3 mil for container local and 55 mil for thread local.

Thank you,

Vlad



On 9/29/15 16:57, Chetan Narsude wrote:
Vlad, what was the number without this fix?

--
Chetan

On Tue, Sep 29, 2015 at 4:48 PM, Vlad Rozov <[email protected]> wrote:

I did a quick prototype that uses http://jctools.github.io/JCTools SPSC
bounded queue instead of CircularBuffer. For container local I now see 13
mil tuples per second.

Thank you,

Vlad <http://jctools.github.io/JCTools>


On 9/28/15 12:58, Chetan Narsude wrote:

Let me shed some light on THREAD_LOCAL and CONTAINER_LOCAL.

THREAD_LOCAL at the core is nothing but a function call. When an operator
does emit(tuple), it gets translated in  downstream ports "process(tuple)"
call which immediately gets invoked in the same thread. So obviously the
performance is going to be a lot faster. The only thing that's happening
in
between is setting up the stack and invoking the function.

With CONTAINER_LOCAL - there is a producer thread and  a consumer thread
involved. Producer produces (emit(tuple)) and consumer
consumes(process(tuple)). This scheme is the most optimal when the rate at
which producer produces is equal to the rate at which consumer consumes.
Often that's not the case - so we have a bounded memory buffer in between
(the implementation is CircularBuffer). Now in addition to the things that
THREAD_LOCAL does, CONTAINER_LOCAL pattern requires managing the circular
buffer *and* thread context switch. The most expensive of the thread
context switch is the memory synchronization. As you all have pointed out
how expensive it is to use volatile, I need not get into details of how
expensive memory synchronization can get.

Long story short - no matter which pattern you use, when you use more than
1 thread there are certain memory synchronization penalties which are
unavoidable and slow the things down considerably. In 2012, I had
benchmarked atomic, volatile, synchronized and for the benchmark (I think
there are unit tests for it), I found volatile to be least expensive at
that time. Synchronized was not too much behind (it's very efficient when
the contention is likely to be amongst a single digit number of threads).
Not sure how those benchmark will look today but you get the idea.

In a data intensive app, most of the time is spent in IO and there is a
lot
of CPU idling at individual operator so you will not see the difference
when you change CONTAINER_LOCAL to THREAD_LOCAL yet you will see some
memory optimization as you are taking away intermediate memory based
buffer
*and* delayed garbage collection of the objects held by this buffer.

Recommendation: Do not bother with these micro optimizations unless you
notice a problem. Use THREAD_LOCAL for processing
low-throughput/infrequent
streams. Use CONTAINER_LOCAL to avoid serialization/deserialization of
objects. Leave the rest to the platform. I expect that as it matures it
will make most of these decisions automatically.

HTH.

--
Chetan

On Mon, Sep 28, 2015 at 11:44 AM, Vlad Rozov <[email protected]>
wrote:

Hi Tim,
I use benchmark application that is part of Apache Malhar project. Please
let me know if you need help with compiling or running the application.

Thank you,

Vlad


On 9/28/15 11:09, Timothy Farkas wrote:

Also sharing a diff


https://github.com/DataTorrent/Netlet/compare/master...ilooner:condVarBuffer

Thanks,
Tim

On Mon, Sep 28, 2015 at 10:07 AM, Timothy Farkas <[email protected]>
wrote:

Hi Vlad,

Could you share your benchmarking applications? I'd like to test a
change
I made to the Circular Buffer




https://github.com/ilooner/Netlet/blob/condVarBuffer/src/main/java/com/datatorrent/netlet/util/CircularBuffer.java

Thanks,
Tim

On Mon, Sep 28, 2015 at 9:56 AM, Pramod Immaneni <
[email protected]
wrote:

Vlad what was your mode of interaction/ordering between the two threads

for
the 3rd test.

On Mon, Sep 28, 2015 at 10:51 AM, Vlad Rozov <[email protected]
wrote:

I created a simple test to check how quickly java can count to

Integer.MAX_INTEGER. The result that I see is consistent with
CONTAINER_LOCAL behavior:

counting long in a single thread: 0.9 sec
counting volatile long in a single thread: 17.7 sec
counting volatile long shared between two threads: 186.3 sec

I suggest that we look into



https://qconsf.com/sf2012/dl/qcon-sanfran-2012/slides/MartinThompson_LockFreeAlgorithmsForUltimatePerformanceMOVEDTOBALLROOMA.pdf

or similar algorithm.
Thank you,

Vlad



On 9/28/15 08:19, Vlad Rozov wrote:

Ram,

The stream between operators in case of CONTAINER_LOCAL is

InlineStream.
InlineStream extends DefaultReservoir that extends CircularBuffer.

CircularBuffer does not use synchronized methods or locks, it uses
volatile. I guess that using volatile causes CPU cache invalidation
and
along with memory locality (in thread local case tuple is always
local

to
both threads, while in container local case the second operator
thread
may
see data significantly later after the first thread produced it)
these
two
factors negatively impact CONTAINER_LOCAL performance. It is still
quite
surprising that the impact is so significant.

Thank you,

Vlad

On 9/27/15 16:45, Munagala Ramanath wrote:

Vlad,

That's a fascinating and counter-intuitive result. I wonder if some
internal synchronization is happening
(maybe the stream between them is a shared data structure that is
lock
protected) to
slow down the 2 threads in the CONTAINER_LOCAL case. If they are
both
going as fast as possible
it is likely that they will be frequently blocked by the lock. If
that
is indeed the case, some sort of lock
striping or a near-lockless protocol for stream access should tilt
the
balance in favor of CONTAINER_LOCAL.

In the thread-local case of course there is no need for such
locking.

Ram

On Sun, Sep 27, 2015 at 12:17 PM, Vlad Rozov <
[email protected]
<mailto:[email protected]>> wrote:

       Changed subject to reflect shift of discussion.

       After I recompiled netlet and hardcoded 0 wait time in the
       CircularBuffer.put() method, I still see the same difference
even
       when I increased operator memory to 10 GB and set "-D
       dt.application.*.operator.*.attr.SPIN_MILLIS=0 -D
       dt.application.*.operator.*.attr.QUEUE_CAPACITY=1024000".
CPU %
       is close to 100% both for thread and container local locality
       settings. Note that in thread local two operators share 100%
CPU,
       while in container local each gets its own 100% load. It
sounds
       that container local will outperform thread local only when
       number of emitted tuples is (relatively) low, for example
when
it
       is CPU costly to produce tuples (hash computations,
       compression/decompression, aggregations, filtering with
complex
       expressions). In cases where operator may emit 5 or more
million
       tuples per second, thread local may outperform container
local
       even when both operators are CPU intensive.




       Thank you,

       Vlad

       On 9/26/15 22:52, Timothy Farkas wrote:

       Hi Vlad,

       I just took a look at the CircularBuffer. Why are threads
polling
the state
       of the buffer before doing operations? Couldn't polling be

avoided
entirely
       by using something like Condition variables to signal when the
buffer is
       ready for an operation to be performed?

       Tim

       On Sat, Sep 26, 2015 at 10:42 PM, Vlad Rozov<
[email protected]> <mailto:[email protected]>
       wrote:

       After looking at few stack traces I think that in the
benchmark

       application operators compete for the circular buffer that
passes
slices
       from the emitter output to the consumer input and sleeps that
avoid busy
       wait are too long for the benchmark operators. I don't see
the
stack
       similar to the one below all the time I take the threads
dump,

but
still
       quite often to suspect that sleep is the root cause. I'll
recompile with
       smaller sleep time and see how this will affect
performance.

       ----
       "1/wordGenerator:RandomWordInputModule" prio=10
tid=0x00007f78c8b8c000
       nid=0x780f waiting on condition [0x00007f78abb17000]
           java.lang.Thread.State: TIMED_WAITING (sleeping)
            at java.lang.Thread.sleep(Native Method)
            at




com.datatorrent.netlet.util.CircularBuffer.put(CircularBuffer.java:182)

            at
com.datatorrent.stram.stream.InlineStream.put(InlineStream.java:79)
            at
com.datatorrent.stram.stream.MuxStream.put(MuxStream.java:117)
            at




com.datatorrent.api.DefaultOutputPort.emit(DefaultOutputPort.java:48)

            at

com.datatorrent.benchmark.RandomWordInputModule.emitTuples(RandomWordInputModule.java:108)

            at
com.datatorrent.stram.engine.InputNode.run(InputNode.java:115)
            at




com.datatorrent.stram.engine.StreamingContainer$2.run(StreamingContainer.java:1377)

       "2/counter:WordCountOperator" prio=10 tid=0x00007f78c8c98800
nid=0x780d
       waiting on condition [0x00007f78abc18000]
           java.lang.Thread.State: TIMED_WAITING (sleeping)
            at java.lang.Thread.sleep(Native Method)
            at

com.datatorrent.stram.engine.GenericNode.run(GenericNode.java:519)
            at




com.datatorrent.stram.engine.StreamingContainer$2.run(StreamingContainer.java:1377)

       ----
       On 9/26/15 20:59, Amol Kekre wrote:

       A good read -

http://preshing.com/20111118/locks-arent-slow-lock-contention-is/
       Though it does not explain order of magnitude difference.
       Amol

       On Sat, Sep 26, 2015 at 4:25 PM, Vlad Rozov<
[email protected]> <mailto:[email protected]>
       wrote:

       In the benchmark test THREAD_LOCAL outperforms
CONTAINER_LOCAL

by
an order
       of magnitude and both operators compete for CPU. I'll take a
closer look
       why.

       Thank you,

       Vlad


       On 9/26/15 14:52, Thomas Weise wrote:

       THREAD_LOCAL - operators share thread

       CONTAINER_LOCAL - each operator has its own thread

       So as long as operators utilize the CPU sufficiently

(compete),
the
       latter
       will perform better.

       There will be cases where a single thread can
accommodate
multiple
       operators. For example, a socket reader (mostly waiting
for

IO)
and a
       decompress (CPU hungry) can share a thread.
       But to get back to the original question, stream
locality

does
generally
       not reduce the total memory requirement. If you add
multiple
operators
       into
       one container, that container will also require more
memory

and
that's
       how
       the container size is calculated in the physical plan.
You

may
get some
       extra mileage when multiple operators share the same heap
but
the need
       to
       identify the memory requirement per operator does not go

away.
       Thomas
       On Sat, Sep 26, 2015 at 12:41 PM, Munagala Ramanath <
       [email protected] <mailto:[email protected]>>
       wrote:

       Would CONTAINER_LOCAL achieve the same thing and
perform a
little better

       on

       a multi-core box ?
       Ram

       On Sat, Sep 26, 2015 at 12:18 PM, Sandeep Deshmukh <
       [email protected] <mailto:
[email protected]>>
       wrote:

       Yes, with this approach only two containers are
required:

one
for stram
       and
       another for all operators. You can easily fit around 10
operators in

       less

       than 1GB.
       On 27 Sep 2015 00:32, "Timothy Farkas"<
[email protected]
<mailto:[email protected]>  wrote:

       Hi Ram,

       You could make all the operators thread local. This
cuts

down
on the
       overhead of separate containers and maximizes the memory
available to

       each

       operator.

       Tim

       On Sat, Sep 26, 2015 at 10:07 AM, Munagala Ramanath <

       [email protected] <mailto:[email protected]>

       wrote:

           Hi,

       I was running into memory issues when deploying my
app
on

the
       sandbox

       where all the operators were stuck forever in the

PENDING

state
       because

       they were being continually aborted and restarted
because

of
the
       limited
       memory on the sandbox. After some experimentation, I
found
that the

       following config values seem to work:

       ------------------------------------------
       <





https://datatorrent.slack.com/archives/engineering/p1443263607000010

       *<property>    <name>dt.attr.MASTER_MEMORY_MB</name>
       <value>500</value>
           </property>  <property>

<name>dt.application.​.operator.*

       *​.attr.MEMORY_MB</name>    <value>200</value>
</property>

       <property>
<name>dt.application.TopNWordsWithQueries.operator.fileWordCount.attr.MEMORY_MB</name>

             <value>512</value>  </property>*
       ------------------------------------------------
       Are these reasonable values ? Is there a more
systematic

way of

       coming

       up

       with these values than trial-and-error ? Most of my
operators
-- with
       the
       exception of fileWordCount -- need very little
memory;
is
there a way
       to
       cut all values down to the bare minimum and maximize
available memory
       for
       this one operator ?

       Thanks.

       Ram






Reply via email to