Hi Tathagata,

> > I'm personally curious about this point. I could investigate by creating
> > a simplified test scenario that isolates the data accumulator case and
> > compare the performance of both models (actors vs threads with proper
> > locking) under different levels of concurrency.
> > Do you think this could be helpful for the project? I'm looking to
> > contribute and this could be an interesting starting point.
>
> Yes! actor vs threads with locking is a great test to do, since for the
> kafka (and who knows what other sources in future), the block generator
> has to support multiple-thread ingestion. I think one also needs to
> compare with single thread without locking (the current model). If single
> thread without locking is the fastest and thread with locking is not so
> bad compared to actors, then it may be better to leave the ingestion
> without locks for maximum throughput for single-thread sources (e.g.
> Socket, and most others) and add a lock for multi-thread sources like
> Kafka.
I spent a good part of the weekend working on this. So far, I've isolated
the BlockGenerator with its helper threads, and I've built some testing
infrastructure to generate records under high load in a controlled way and
inject them into the BlockGenerator using the += method. I've compared the
performance of the original, non-synchronized version with a simple
synchronization around the append method [1]. Here are my findings so far:

Test Scenario:
  BlockGenerator: Synchronized
  1 data generator: TimeBoxedSeqDataGen(blockGen, 10000000)
  produced: Totals: records=430000000, time=31247, rec/ms=13761.32
  received: 429999961
  diff: 39
  (note: this data generator creates data blocks of the given size, so the
  total received should be a multiple of 10000000; this applies to all
  scenarios below)

Test Scenario:
  BlockGenerator: Synchronized
  5 data generators: TimeBoxedSeqDataGen(blockGen, 10000000)
  produced: Totals: records=920000000, time=421744, rec/ms=2181.42
  received: 919999201
  diff: 799

Test Scenario:
  BlockGenerator: Original (not synchronized)
  5 data generators: TimeBoxedSeqDataGen(blockGen, 10000000)
  produced: Totals: records=1180000000, time=67062, rec/ms=17595.66
  received: 1181328348
  diff: -1328348

Three initial findings:

1st - the obvious: the lack of synchronization is an issue. The actual
number of records produced is inconsistent with the number of records
received.

2nd - the impact of synchronization is high: from 17595.66 rec/ms without
sync (the current situation), to 13761.32 rec/ms for a single-threaded,
uncontended append (synchronized, only 1 contributing thread), to 2181.42
rec/ms with 5 threads fighting to put data in. These numbers should only be
read relative to each other (I'm working on an Ubuntu VM with 4 cores
assigned and 6GB of RAM).

3rd - there's also a concurrency issue with the way the blocks are being
'shipped'. Note the records missing even in the synchronized case? Also,
there are more records received than created in the last scenario. This is
not due to the += operator, but to something in the pushBlock mechanism. I
haven't gotten to the root cause of this yet. This last point is of high
impact, as it would mean that all subclasses of NetworkInputDStream have a
data consistency issue.

What's next? I'm going to create an actor-based version of the
BlockGenerator logic and use the same test infrastructure to compare
performance and consistency (see [2] below for a rough sketch of the
direction I'm thinking of). I will also put the code on GitHub so that you
can reproduce any findings. (This is mostly weekend work, so the next
chapter will be in a week or two from now.)

Hope this helps.

-kr, Gerard.

[1] BlockGenerator synchronized:

      def += (obj: T) {
        currentBuffer.synchronized {
          currentBuffer += obj
        }
      }
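[2] A rough sketch of the actor-based variant I plan to benchmark. To be
clear, this is only my own sketch, not existing code: it assumes Akka
actors, and the names (BufferActor, Append, SwitchBuffer) are placeholders
I invented, not part of the current BlockGenerator API. The idea is that
the actor's mailbox serializes all appends, so no explicit lock is needed
around currentBuffer; what the test should show is whether the mailbox
overhead is better or worse than the lock contention in [1].

      import scala.collection.mutable.ArrayBuffer
      import akka.actor.{Actor, ActorSystem, Props}

      // Messages understood by the buffering actor (placeholder names).
      case class Append(obj: Any)
      case object SwitchBuffer

      class BufferActor extends Actor {
        private var currentBuffer = new ArrayBuffer[Any]

        def receive = {
          case Append(obj) =>
            // Appends arrive through the mailbox one at a time, so only
            // this actor's thread ever touches the buffer: no lock needed.
            currentBuffer += obj
          case SwitchBuffer =>
            // Analogous to the timer-driven buffer switch in BlockGenerator:
            // hand off the filled buffer and start a fresh one.
            val filledBlock = currentBuffer
            currentBuffer = new ArrayBuffer[Any]
            // pushing filledBlock into the block queue is omitted here
        }
      }

      // Producers send messages instead of calling +=, e.g.:
      //   val system = ActorSystem("BlockGenTest")
      //   val gen = system.actorOf(Props[BufferActor], "bufferActor")
      //   gen ! Append(record)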