Pramod and I are looking into replacing or enhancing CircularBuffer with SPSC. One challenge is supporting FrozenIterator, as SPSC and MPMC do not support iterators. We may also want to deprecate, or at minimum stop using, UnsafeBlockingQueue: SPSC.size() overestimates the actual number of elements in the collection, so we will not be able to use pollUnsafe()/peekUnsafe() even if size() > 0.

On 10/2/15 18:33, Chetan Narsude wrote:
Excellent points about low-level performance optimizations in SPSC. A
few things to learn there. Thanks for sharing. I wish I had come across
this material earlier. CircularBuffer was built on ad hoc knowledge. It
does not account for CPU cache misses (I would never have guessed that;
truly a silent killer), and it does not use lazy set. CB should be
enhanced to pull these optimizations in. Both are very easy once you know
what these optimizations are.
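
To make the lazy-set point concrete, here is a minimal sketch of a single-producer/single-consumer ring, assuming a power-of-two capacity. The class name SpscRingSketch is hypothetical; this is not the JCTools or CircularBuffer code, and it leaves out the cache-line padding that addresses the cache-miss problem.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReferenceArray;

/** Hypothetical SPSC ring for illustration; not JCTools or CircularBuffer code. */
final class SpscRingSketch<E> {
    private final AtomicReferenceArray<E> slots;
    private final int mask;
    // Only the producer writes tail and only the consumer writes head.
    private final AtomicLong tail = new AtomicLong();
    private final AtomicLong head = new AtomicLong();

    SpscRingSketch(int capacityPowerOfTwo) {
        if (capacityPowerOfTwo <= 0 || Integer.bitCount(capacityPowerOfTwo) != 1) {
            throw new IllegalArgumentException("capacity must be a positive power of two");
        }
        slots = new AtomicReferenceArray<>(capacityPowerOfTwo);
        mask = capacityPowerOfTwo - 1;
    }

    /** Producer thread only. Returns false when the ring is full. */
    boolean offer(E e) {
        long t = tail.get();
        if (t - head.get() == slots.length()) {
            return false;
        }
        slots.lazySet((int) (t & mask), e);
        // lazySet is an ordered store: the element is published before the
        // new tail, without the full fence of a volatile write.
        tail.lazySet(t + 1);
        return true;
    }

    /** Consumer thread only. Returns null when the ring is empty. */
    E poll() {
        long h = head.get();
        if (h == tail.get()) {
            return null;
        }
        int i = (int) (h & mask);
        E e = slots.get(i);
        slots.lazySet(i, null); // release the reference for GC
        head.lazySet(h + 1);    // hand the slot back to the producer
        return e;
    }
}
```

A production version would also pad head and tail onto separate cache lines so that the producer and consumer do not invalidate each other's line on every operation.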

On Wednesday, September 30, 2015, Vlad Rozov <[email protected]>
wrote:

Blog and presentation on algorithms behind JCTools:

http://psy-lob-saw.blogspot.com/p/lock-free-queues.html
https://vimeo.com/100197431

Thank you,

Vlad

On 9/29/15 21:14, Vlad Rozov wrote:

I guess yes, it should show improvement every time there is
consumer/producer contention on a resource from two different threads, so
we should see improvements in the buffer server as well. The current
prototype does not support containers on different nodes.

Thank you,

Vlad

On 9/29/15 20:47, Pramod Immaneni wrote:

Would it show any improvement in the case where the containers are on
different nodes?

On Tue, Sep 29, 2015 at 7:17 PM, Vlad Rozov <[email protected]>
wrote:

By changing QUEUE_CAPACITY to 1200000 I can get around 62 mil tuples for
the case when wordGenerator emits the same tuple and 34 mil when it
generates new tuples each time.

Thank you,

Vlad


On 9/29/15 17:08, Vlad Rozov wrote:

3 mil for container local and 55 mil for thread local.
Thank you,

Vlad



On 9/29/15 16:57, Chetan Narsude wrote:

Vlad, what was the number without this fix?
--
Chetan

On Tue, Sep 29, 2015 at 4:48 PM, Vlad Rozov <[email protected]>
wrote:

I did a quick prototype that uses the http://jctools.github.io/JCTools SPSC
bounded queue instead of CircularBuffer. For container local I now see
13 mil tuples per second.

Thank you,

Vlad


On 9/28/15 12:58, Chetan Narsude wrote:

Let me shed some light on THREAD_LOCAL and CONTAINER_LOCAL.

THREAD_LOCAL at the core is nothing but a function call. When an operator
does emit(tuple), it gets translated into the downstream port's
"process(tuple)" call, which immediately gets invoked in the same thread.
So obviously the performance is going to be a lot faster. The only thing
that's happening in between is setting up the stack and invoking the
function.

With CONTAINER_LOCAL there is a producer thread and a consumer thread
involved. The producer produces (emit(tuple)) and the consumer consumes
(process(tuple)). This scheme is optimal when the rate at which the
producer produces equals the rate at which the consumer consumes. Often
that's not the case, so we have a bounded memory buffer in between (the
implementation is CircularBuffer). Now, in addition to the things that
THREAD_LOCAL does, the CONTAINER_LOCAL pattern requires managing the
circular buffer *and* a thread context switch. The most expensive part of
the thread context switch is the memory synchronization. As you all have
pointed out how expensive it is to use volatile, I need not get into
details of how expensive memory synchronization can get.
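
The two patterns described above can be sketched roughly as follows. This is only an illustration under stated assumptions: LocalitySketch and its methods are hypothetical names, and java.util.concurrent.ArrayBlockingQueue stands in for CircularBuffer, so the costs will not match the platform's.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.function.Consumer;

/** Hypothetical illustration; ArrayBlockingQueue stands in for CircularBuffer. */
final class LocalitySketch {
    /** THREAD_LOCAL style: emit(tuple) is a plain call into the downstream operator. */
    static long runThreadLocal(int tuples) {
        long[] sum = {0};
        Consumer<Integer> downstream = t -> sum[0] += t; // plays the role of process(tuple)
        for (int i = 1; i <= tuples; i++) {
            downstream.accept(i);                         // plays the role of emit(tuple)
        }
        return sum[0];
    }

    /** CONTAINER_LOCAL style: a bounded buffer sits between two threads. */
    static long runContainerLocal(int tuples) {
        BlockingQueue<Integer> buffer = new ArrayBlockingQueue<>(1024);
        long[] sum = {0};
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < tuples; i++) {
                    sum[0] += buffer.take();              // blocks while the buffer is empty
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        try {
            for (int i = 1; i <= tuples; i++) {
                buffer.put(i);                            // blocks while the buffer is full
            }
            consumer.join();                              // join() makes sum[0] visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while producing", e);
        }
        return sum[0];
    }
}
```

Both methods compute the same sum; the second one pays for buffer management and cross-thread memory synchronization on every tuple.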

Long story short: no matter which pattern you use, when you use more than
one thread there are certain memory synchronization penalties which are
unavoidable and slow things down considerably. In 2012, I benchmarked
atomic, volatile and synchronized, and in that benchmark (I think there
are unit tests for it) I found volatile to be the least expensive at that
time. Synchronized was not too far behind (it's very efficient when the
contention is likely to be amongst a single-digit number of threads).
Not sure how those benchmarks will look today, but you get the idea.

In a data-intensive app, most of the time is spent in IO and there is a
lot of CPU idling at individual operators, so you will not see the
difference when you change CONTAINER_LOCAL to THREAD_LOCAL. You will,
however, see some memory optimization, as you are taking away the
intermediate memory-based buffer *and* the delayed garbage collection of
the objects held by this buffer.

Recommendation: Do not bother with these micro-optimizations unless you
notice a problem. Use THREAD_LOCAL for processing
low-throughput/infrequent streams. Use CONTAINER_LOCAL to avoid
serialization/deserialization of objects. Leave the rest to the platform.
I expect that as it matures it will make most of these decisions
automatically.

HTH.

--
Chetan

On Mon, Sep 28, 2015 at 11:44 AM, Vlad Rozov <[email protected]> wrote:

Hi Tim,

I use the benchmark application that is part of the Apache Malhar project.
Please let me know if you need help with compiling or running the
application.

Thank you,

Vlad


On 9/28/15 11:09, Timothy Farkas wrote:

Also sharing a diff



https://github.com/DataTorrent/Netlet/compare/master...ilooner:condVarBuffer

Thanks,
Tim

On Mon, Sep 28, 2015 at 10:07 AM, Timothy Farkas <[email protected]> wrote:

Hi Vlad,

Could you share your benchmarking applications? I'd like to test a change
I made to the CircularBuffer:






https://github.com/ilooner/Netlet/blob/condVarBuffer/src/main/java/com/datatorrent/netlet/util/CircularBuffer.java

Thanks,
Tim

On Mon, Sep 28, 2015 at 9:56 AM, Pramod Immaneni <[email protected]> wrote:

Vlad, what was your mode of interaction/ordering between the two threads
for the 3rd test?

On Mon, Sep 28, 2015 at 10:51 AM, Vlad Rozov <[email protected]> wrote:

I created a simple test to check how quickly Java can count to
Integer.MAX_VALUE. The result that I see is consistent with
CONTAINER_LOCAL behavior:

counting long in a single thread: 0.9 sec
counting volatile long in a single thread: 17.7 sec
counting volatile long shared between two threads: 186.3 sec
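
A test of this shape might look like the sketch below. The class and method names are hypothetical, and the two-thread variant assumes a strict ping-pong hand-off (one thread increments even values, the other odd values); the message does not say how the two threads actually interacted, so treat that part purely as an assumption.

```java
/** Hypothetical counting test; the two-thread hand-off scheme is an assumption. */
final class CountingSketch {
    static long plain;
    static volatile long shared;

    /** Counts with a plain (non-volatile) field in the calling thread. */
    static long countPlain(long limit) {
        plain = 0;
        while (plain < limit) {
            plain++;
        }
        return plain;
    }

    /** Counts with a volatile field in the calling thread. */
    static long countVolatile(long limit) {
        shared = 0;
        while (shared < limit) {
            shared++; // safe here only because a single thread writes
        }
        return shared;
    }

    /** Two threads take turns: each increments only values of its own parity. */
    static long countVolatilePingPong(long limit) {
        shared = 0;
        Thread odd = new Thread(() -> spin(1, limit));
        odd.start();
        spin(0, limit);
        try {
            odd.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while counting", e);
        }
        return shared;
    }

    private static void spin(long parity, long limit) {
        long v;
        while ((v = shared) < limit) {
            if ((v & 1) == parity) {
                shared = v + 1; // only the owner of this parity writes, so no update is lost
            }
        }
    }
}
```

Wrapping each call in System.nanoTime() with a limit of Integer.MAX_VALUE gives the three timings; the plain loop may be optimized heavily by the JIT, so absolute numbers will vary by JVM and hardware.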

I suggest that we look into

https://qconsf.com/sf2012/dl/qcon-sanfran-2012/slides/MartinThompson_LockFreeAlgorithmsForUltimatePerformanceMOVEDTOBALLROOMA.pdf

or a similar algorithm.

Thank you,
Vlad



On 9/28/15 08:19, Vlad Rozov wrote:

Ram,

The stream between operators in the CONTAINER_LOCAL case is InlineStream.
InlineStream extends DefaultReservoir, which extends CircularBuffer.
CircularBuffer does not use synchronized methods or locks; it uses
volatile. I guess that using volatile causes CPU cache invalidation, and
along with memory locality (in the thread local case the tuple is always
local to both threads, while in the container local case the second
operator thread may see data significantly later after the first thread
produced it) these two factors negatively impact CONTAINER_LOCAL
performance. It is still quite surprising that the impact is so
significant.

Thank you,

Vlad

On 9/27/15 16:45, Munagala Ramanath wrote:

Vlad,

That's a fascinating and counter-intuitive result. I wonder if some
internal synchronization is happening (maybe the stream between them is a
shared data structure that is lock protected) to slow down the 2 threads
in the CONTAINER_LOCAL case. If they are both going as fast as possible,
it is likely that they will be frequently blocked by the lock. If that is
indeed the case, some sort of lock striping or a near-lockless protocol
for stream access should tilt the balance in favor of CONTAINER_LOCAL.

In the thread-local case of course there is no need for such locking.

Ram

On Sun, Sep 27, 2015 at 12:17 PM, Vlad Rozov <[email protected]> wrote:

         Changed subject to reflect shift of discussion.

         After I recompiled netlet and hardcoded 0 wait time in the
         CircularBuffer.put() method, I still see the same difference even
         when I increased operator memory to 10 GB and set
         "-D dt.application.*.operator.*.attr.SPIN_MILLIS=0
         -D dt.application.*.operator.*.attr.QUEUE_CAPACITY=1024000".
         CPU % is close to 100% both for thread and container local
         locality settings. Note that in thread local two operators share
         100% CPU, while in container local each gets its own 100% load.
         It sounds like container local will outperform thread local only
         when the number of emitted tuples is (relatively) low, for example
         when it is CPU costly to produce tuples (hash computations,
         compression/decompression, aggregations, filtering with complex
         expressions). In cases where an operator may emit 5 or more
         million tuples per second, thread local may outperform container
         local even when both operators are CPU intensive.




         Thank you,

         Vlad

         On 9/26/15 22:52, Timothy Farkas wrote:

         Hi Vlad,

         I just took a look at the CircularBuffer. Why are threads polling
         the state of the buffer before doing operations? Couldn't polling
         be avoided entirely by using something like Condition variables to
         signal when the buffer is ready for an operation to be performed?
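
For illustration, a bounded buffer that signals with Condition variables instead of polling could be sketched as below. ConditionBufferSketch is a hypothetical name and this is not the code from the condVarBuffer branch; it only shows the general technique using java.util.concurrent.locks.

```java
import java.util.ArrayDeque;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical bounded buffer using Condition variables; not the Netlet code. */
final class ConditionBufferSketch<E> {
    private final ArrayDeque<E> items = new ArrayDeque<>();
    private final int capacity;
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();
    private final Condition notEmpty = lock.newCondition();

    ConditionBufferSketch(int capacity) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be positive");
        }
        this.capacity = capacity;
    }

    /** Blocks, without sleeping or polling, until there is room for the element. */
    void put(E e) throws InterruptedException {
        lock.lock();
        try {
            while (items.size() == capacity) {
                notFull.await();   // woken by take() when a slot frees up
            }
            items.addLast(e);
            notEmpty.signal();     // wake a consumer waiting for data
        } finally {
            lock.unlock();
        }
    }

    /** Blocks until an element is available and returns it. */
    E take() throws InterruptedException {
        lock.lock();
        try {
            while (items.isEmpty()) {
                notEmpty.await();  // woken by put() when data arrives
            }
            E e = items.removeFirst();
            notFull.signal();      // wake a producer waiting for room
            return e;
        } finally {
            lock.unlock();
        }
    }
}
```

This removes the sleep-based polling, but every put() and take() now acquires a lock, so whether it is faster than the volatile-based CircularBuffer would have to be measured.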

         Tim

         On Sat, Sep 26, 2015 at 10:42 PM, Vlad Rozov <[email protected]> wrote:

         After looking at a few stack traces I think that in the benchmark
         application operators compete for the circular buffer that passes
         slices from the emitter output to the consumer input, and the
         sleeps that avoid busy wait are too long for the benchmark
         operators. I don't see a stack similar to the one below every time
         I take a thread dump, but still often enough to suspect that sleep
         is the root cause. I'll recompile with a smaller sleep time and see
         how this will affect performance.

         ----
         "1/wordGenerator:RandomWordInputModule" prio=10 tid=0x00007f78c8b8c000 nid=0x780f waiting on condition [0x00007f78abb17000]
            java.lang.Thread.State: TIMED_WAITING (sleeping)
              at java.lang.Thread.sleep(Native Method)
              at com.datatorrent.netlet.util.CircularBuffer.put(CircularBuffer.java:182)
              at com.datatorrent.stram.stream.InlineStream.put(InlineStream.java:79)
              at com.datatorrent.stram.stream.MuxStream.put(MuxStream.java:117)
              at com.datatorrent.api.DefaultOutputPort.emit(DefaultOutputPort.java:48)
              at com.datatorrent.benchmark.RandomWordInputModule.emitTuples(RandomWordInputModule.java:108)
              at com.datatorrent.stram.engine.InputNode.run(InputNode.java:115)
              at com.datatorrent.stram.engine.StreamingContainer$2.run(StreamingContainer.java:1377)

         "2/counter:WordCountOperator" prio=10 tid=0x00007f78c8c98800 nid=0x780d waiting on condition [0x00007f78abc18000]
            java.lang.Thread.State: TIMED_WAITING (sleeping)
              at java.lang.Thread.sleep(Native Method)
              at com.datatorrent.stram.engine.GenericNode.run(GenericNode.java:519)
              at com.datatorrent.stram.engine.StreamingContainer$2.run(StreamingContainer.java:1377)
         ----

         On 9/26/15 20:59, Amol Kekre wrote:
         A good read -

         http://preshing.com/20111118/locks-arent-slow-lock-contention-is/

         Though it does not explain the order of magnitude difference.

         Amol
         On Sat, Sep 26, 2015 at 4:25 PM, Vlad Rozov <[email protected]> wrote:

         In the benchmark test THREAD_LOCAL outperforms CONTAINER_LOCAL by
         an order of magnitude and both operators compete for CPU. I'll
         take a closer look at why.

         Thank you,

         Vlad


         On 9/26/15 14:52, Thomas Weise wrote:

         THREAD_LOCAL - operators share thread

         CONTAINER_LOCAL - each operator has its own thread

         So as long as operators utilize the CPU sufficiently (compete),
         the latter will perform better.

         There will be cases where a single thread can accommodate multiple
         operators. For example, a socket reader (mostly waiting for IO)
         and a decompress (CPU hungry) can share a thread.

         But to get back to the original question, stream locality does
         generally not reduce the total memory requirement. If you add
         multiple operators into one container, that container will also
         require more memory and that's how the container size is
         calculated in the physical plan. You may get some extra mileage
         when multiple operators share the same heap but the need to
         identify the memory requirement per operator does not go away.

         Thomas
         On Sat, Sep 26, 2015 at 12:41 PM, Munagala Ramanath <[email protected]> wrote:

         Would CONTAINER_LOCAL achieve the same thing and perform a little
         better on a multi-core box?

Ram
         On Sat, Sep 26, 2015 at 12:18 PM, Sandeep Deshmukh <[email protected]> wrote:

         Yes, with this approach only two containers are required: one for
         stram and another for all operators. You can easily fit around 10
         operators in less than 1GB.

         On 27 Sep 2015 00:32, "Timothy Farkas" <[email protected]> wrote:

         Hi Ram,

         You could make all the operators thread local. This cuts down on
         the overhead of separate containers and maximizes the memory
         available to each operator.

         Tim

         On Sat, Sep 26, 2015 at 10:07 AM, Munagala Ramanath <[email protected]> wrote:

             Hi,

         I was running into memory issues when deploying my app on the
         sandbox, where all the operators were stuck forever in the PENDING
         state because they were being continually aborted and restarted
         because of the limited memory on the sandbox. After some
         experimentation, I found that the following config values seem to
         work:

         ------------------------------------------
         <https://datatorrent.slack.com/archives/engineering/p1443263607000010>

         <property>
           <name>dt.attr.MASTER_MEMORY_MB</name>
           <value>500</value>
         </property>
         <property>
           <name>dt.application.*.operator.*.attr.MEMORY_MB</name>
           <value>200</value>
         </property>
         <property>
           <name>dt.application.TopNWordsWithQueries.operator.fileWordCount.attr.MEMORY_MB</name>
           <value>512</value>
         </property>
         ------------------------------------------
         Are these reasonable values? Is there a more systematic way of
         coming up with these values than trial-and-error? Most of my
         operators -- with the exception of fileWordCount -- need very
         little memory; is there a way to cut all values down to the bare
         minimum and maximize available memory for this one operator?

         Thanks.

         Ram






