Hi Tathagata,

> > I'm personally curious about this point. I could investigate by creating
> > a simplified test scenario that isolates the data accumulator case and
> > compares the performance of both models (actors vs threads with proper
> > locking) under different levels of concurrency.
> > Do you think this could be helpful for the project? I'm looking to
> > contribute and this could be an interesting starting point.
>
> Yes! Actors vs threads with locking is a great test to do, since for Kafka
> (and who knows what other sources in the future) the block generator has
> to support multi-threaded ingestion. I think one also needs to compare
> with a single thread without locking (the current model). If a single
> thread without locking is the fastest, and threads with locking are not
> much worse than actors, then it may be better to leave the ingestion
> without locks for maximum throughput on single-thread sources (e.g. Socket
> and most others) and add a lock for multi-thread sources like Kafka.


I spent a good part of the weekend working on this. So far, I've isolated
the BlockGenerator with its helper threads, and I've built some test
infrastructure that generates records under high load in a controlled way
and injects them into the BlockGenerator through the += method.
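
For context, here's a sketch of what one of these data generators looks
like. TimeBoxedSeqDataGen is my test class; the code below is a simplified
approximation of it (the Appendable trait and the runMillis parameter are
stand-ins for this sketch, not the real signatures):

import java.util.concurrent.atomic.AtomicLong

// Stand-in for the isolated BlockGenerator: only the method under test.
trait Appendable[T] { def +=(obj: T): Unit }

// Each generator thread appends records in batches of batchSize, so the
// total number of records produced is always a multiple of batchSize
// (10000000 in the scenarios below).
class TimeBoxedSeqDataGen(blockGen: Appendable[Int], batchSize: Int,
                          runMillis: Long) extends Runnable {
  val produced = new AtomicLong(0)
  def run() {
    val deadline = System.currentTimeMillis + runMillis  // "time-boxed"
    while (System.currentTimeMillis < deadline) {
      var i = 0
      while (i < batchSize) { blockGen += i; i += 1 }  // inject via +=
      produced.addAndGet(batchSize)
    }
  }
}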

I've compared the performance of the original, non-synchronized version
against one with a simple synchronized block around the append method [1].

Here are my findings so far:

Test Scenario:
BlockGenerator: Synchronized
1 data generator: TimeBoxedSeqDataGen(blockGen, 10000000)
produced: Totals: records=430000000, time=31247, rec/ms=13761.32
received: 429999961 // note: this data generator creates blocks of the
given size, so the total received should be a multiple of 10000000
diff: 39

Test Scenario:
BlockGenerator: Synchronized
5 data generators: TimeBoxedSeqDataGen(blockGen, 10000000)
produced: Totals: records=920000000, time=421744, rec/ms=2181.42
received: 919999201 // same note: the total received should be a multiple
of 10000000
diff: 799

Test Scenario:
BlockGenerator: Original (not synchronized)
5 data generators: TimeBoxedSeqDataGen(blockGen, 10000000)
produced: Totals: records=1180000000, time=67062, rec/ms=17595.66
received: 1181328348
diff: -1328348

[1] BlockGenerator synchronized:
def += (obj: T) {
  // serialize appends from concurrent ingestion threads on the buffer's monitor
  currentBuffer.synchronized {
    currentBuffer += obj
  }
}
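
One design note on this variant: if I read the BlockGenerator code right,
its timer thread replaces currentBuffer with a fresh buffer every time it
cuts a block, so synchronizing on currentBuffer itself means the monitor
object changes out from under contending threads at each swap. The
alternative I plan to compare against uses one dedicated lock shared by
the append and the swap (LockedBuffer, bufferLock and swapBuffer are my
names for this sketch, not Spark's):

import scala.collection.mutable.ArrayBuffer

class LockedBuffer[T] {
  // a single lock guards both the append path and the buffer swap, so
  // an append can never land in a buffer that is being shipped
  private val bufferLock = new Object
  private var currentBuffer = new ArrayBuffer[T]

  def += (obj: T) {
    bufferLock.synchronized { currentBuffer += obj }
  }

  // called by the timer thread when it cuts a block
  def swapBuffer(): ArrayBuffer[T] = bufferLock.synchronized {
    val full = currentBuffer
    currentBuffer = new ArrayBuffer[T]
    full
  }
}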

Three initial findings:

1st - the obvious: the lack of synchronization is an issue. The actual
number of records produced is inconsistent with the number of records
received.

2nd - the impact of synchronization is high: from 17595.66 rec/ms without
sync (the current situation), to 13761.32 rec/ms for a single-threaded,
uncontended append (synchronized, only 1 contributing thread), down to
2181.42 rec/ms with 5 threads fighting to put data in. These numbers are
only meaningful relative to each other. (I'm working on an Ubuntu VM with
4 cores assigned and 6GB of RAM.)

3rd - there's also a concurrency issue in the way blocks are 'shipped'.
Note the records missing even in the synchronized cases? And in the last
scenario there are more records received than were created. This is not
due to the += operator but to something in the pushBlock mechanism; I
haven't gotten to the root cause of this yet.

This last point is high-impact, as it would mean that all subclasses of
NetworkInputDStream have a data consistency issue.
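
One mechanism that could produce diffs of both signs is an unguarded
buffer swap. As an illustration of the idea only (not the actual Spark
code):

appender thread                    timer thread (every block interval)
---------------                    ------------------------------------
val buf = currentBuffer
                                   val full = currentBuffer
                                   currentBuffer = new ArrayBuffer[T]
                                   push full as a block  // count read here
buf += record  // lands in the already-shipped buffer

Depending on whether the block is counted before or after the late append,
the record is either counted twice or never counted, which would match
both the positive and negative diffs above.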

What's next? I'm going to create an actor-based version of the
BlockGenerator logic and use the same test infrastructure to compare
performance and consistency. I will also put the code on GitHub so that
you can reproduce the findings. (This is mostly weekend work, so the next
chapter will come in a week or two.)
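
For reference, this is the rough shape I have in mind for the actor-based
version, assuming Akka (all names here are placeholders for the sketch):

import akka.actor.{Actor, ActorSystem, Props}
import scala.collection.mutable.ArrayBuffer

// All appends are funneled through a single actor's mailbox, so the
// buffer is only ever touched by one thread and needs no lock.
class AppendActor extends Actor {
  private val currentBuffer = new ArrayBuffer[Any]
  def receive = {
    case obj => currentBuffer += obj
  }
}

object ActorAppendDemo extends App {
  val system = ActorSystem("blockgen-test")
  val appender = system.actorOf(Props[AppendActor], "appender")
  // data generators would send records instead of calling +=:
  appender ! 42
}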

Hope this helps.

-kr, Gerard.
