[ 
https://issues.apache.org/jira/browse/SAMZA-1599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16388064#comment-16388064
 ] 

James Lent commented on SAMZA-1599:
-----------------------------------

 Here is a table that summarizes some testing I did with a small test job.  The 
details are:
 * 6 partitions in the Kafka topic.
 * Only one partition had messages, and it had 100.
 * Each run restarted at the beginning of the topic.
 * The time reported is the time between when the first and the last messages 
were processed.
 * The _process_ method just counts events and logs the number processed (i.e. 
it does basically nothing).
 * There were variations in the times for each scenario tested (I ran it on my 
dev box), so I would not read anything into the small differences between 
"new" and "old".
 * "Old" refers to the 0.14.1-SNAPSHOT code (as it existed yesterday: 1dfc5ce).
 * "New" refers to that same code with my patch applied.

 
|*pool.size*|*thread.mode*| |*Pool Size*|*Thread Mode*|*Process Thread*|*Run Loop Mode*| |*Old Process Time (ms)*|*New Process Time (ms)*|
|not specified|not specified| |0|false|ThreadJob|asynchronous| |14|14|
|not specified|false| |0|false|ThreadJob|asynchronous| |16|15|
|not specified|true| |0|true|ThreadJob|single thread| |12|12|
|1|not specified| |1|false|Container Thread|asynchronous| |1022|16|
|1|false| |1|false|Container Thread|asynchronous| |1023|16|
|1|true| |1|true|ThreadJob|single thread| |13|12|
|6|not specified| |6|false|Container Thread|asynchronous| |1015|21|
|6|false| |6|false|Container Thread|asynchronous| |1005|24|
|6|true| |6|true|ThreadJob|single thread| |11|12|
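
For reference, the pool.size and thread.mode columns in the table correspond to the container run-loop settings. Assuming the property names as they exist in Samza 0.14 (worth verifying against your version), the row with pool size 6 and thread mode false, for example, would be configured as:

```properties
# Multi-threaded run loop: process() calls run on a container-owned thread pool
job.container.thread.pool.size=6
# When true, forces the legacy single-threaded (in-order) run loop instead
job.container.single.thread.mode=false
```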

> AsyncRunLoop Slow When Some Partitions Are Empty
> ------------------------------------------------
>
>                 Key: SAMZA-1599
>                 URL: https://issues.apache.org/jira/browse/SAMZA-1599
>             Project: Samza
>          Issue Type: Bug
>          Components: container
>    Affects Versions: 0.12.0, 0.14.0
>            Reporter: James Lent
>            Priority: Major
>         Attachments: slow-async-run-loop.patch
>
>
> During performance testing of a relatively simple multi-threaded job, we 
> noted that as the Kafka partitions started to exhaust, the individual 
> partition tasks that normally processed several messages per ms slowed down 
> significantly, heading toward a throughput of only one message every 10 ms. 
> Investigation indicated that the AsyncRunLoop was often blocking for 10 ms 
> polling the empty partitions for new work, in the process starving the tasks 
> working on partitions with more work to do.
> We found this in a private branch based on 0.12.0 and I have reproduced the 
> issue in master. I coded up a potential fix which works for us in 0.12.0.  I 
> have re-based the fix on master and tested it there too.
> Here is a detailed description of what I think is going on assuming there is 
> only one partition with messages to process:
>  * AsyncRunLoop (main) thread Loop #1
>  ** Chooser selects a message for that partition and places it in the 
> pendingEnvelopeQueue for the AsyncTaskState object associated with that SSP
>  ** Runs the AsyncTaskWorker associated with that SSP.
>  ** Fetches the first message from the pendingEnvelopeQueue.
>  ** Submits request to the AsyncStreamTaskAdapter to process the first 
> message in a worker thread
>  ** Updates the Chooser with a new message from that SSP (if one is 
> available).
>  * Worker thread
>  ** Starts processing the message
>  * AsyncRunLoop (main) thread Loop #2
>  ** Chooser selects a second message for that partition and places it in the 
> pendingEnvelopeQueue for the AsyncTaskState object associated with that SSP
>  ** Runs the AsyncTaskWorker associated with that SSP:
>  ** The Worker thread is still busy processing the previous message so the 
> AsyncTaskWorker does nothing.
>  ** Message is not "fetched" and therefore the Chooser is NOT updated.
>  * AsyncRunLoop (main) thread Loop #3
>  ** Chooser cannot find a message to process.
>  ** Start a poll with a timeout of 10 ms on all the partitions with no 
> messages (this does not include the one partition with messages).
>  * Worker thread
>  ** Completes processing first message
>  ** It updates AsyncTaskWorker state
>  * Here is where the throughput suffers
>  ** AsyncTaskWorker is ready for more work, but the main thread that hands 
> out the work is stuck "polling".
>  * AsyncRunLoop (main) thread Loop #3 continues
>  ** Wakes up after 10 ms
>  ** Runs the AsyncTaskWorker associated with that SSP.
>  ** The AsyncTaskWorker is now ready for more work.
>  ** Fetches the second message from the pendingEnvelopeQueue.
>  ** Submits request to the AsyncStreamTaskAdapter to process the second 
> message in a (new) worker thread
>  ** Updates the Chooser with a new message from that SSP (if one is 
> available).
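
The stall described in Loop #3 above can be sketched as a small timing simulation (all class and method names here are mine, not Samza's): the worker becomes ready almost immediately, but the main loop is stuck in a blocking poll on the empty partitions for the full timeout before it can hand out the next message.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class PollStallSketch {
    static final long POLL_TIMEOUT_MS = 10;

    // Measures how long a ready worker sits idle: the worker finishes right
    // away, but the main loop (simulated by the sleep) is blocked polling the
    // empty partitions and only notices the ready worker afterwards.
    static long workerIdleMs() throws InterruptedException {
        CountDownLatch workerDone = new CountDownLatch(1);
        Thread worker = new Thread(workerDone::countDown); // finishes instantly
        worker.start();
        workerDone.await();            // worker is now ready for more work
        long readyAt = System.nanoTime();
        Thread.sleep(POLL_TIMEOUT_MS); // stands in for the 10 ms blocking poll
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - readyAt);
    }

    public static void main(String[] args) throws InterruptedException {
        // With ~100 messages on a single busy partition, roughly 100 such
        // stalls accumulate into about a second of idle time, consistent
        // with the ~1000 ms rows in the table above.
        System.out.println("worker idle for ~" + workerIdleMs() + " ms");
    }
}
```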
> The key changes in my fix are:
>  * In the AsyncRunLoop (multi-threaded) case don't "poll" when the Chooser 
> returns no messages.  Just "drain" any messages that have already arrived.
>  * Instead, in the main loop, "wait" when the Chooser returns no messages, 
> using the existing synchronization lock that is already used to wait when 
> all the AsyncTaskWorkers are busy.
>  * There is one boolean added to deal with a race condition which could 
> otherwise trigger "waits" we don't need and therefore impact throughput.  It 
> may cause us to occasionally skip a "wait" that would actually be OK, but I 
> think the performance penalty of that is pretty small (one extra spin of the 
> main loop when there is no work to handle).
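
The change described above can be sketched with standard Java monitor primitives (the class and field names are illustrative, not the patch's): instead of a fixed-length poll, the main loop waits on a shared lock, a completing worker wakes it immediately, and the extra boolean absorbs a notify that arrives before the wait begins.

```java
public class WaitNotifySketch {
    private final Object lock = new Object();
    private boolean workReady = false; // absorbs a notify that races ahead of the wait

    // Worker side: called when a task finishes processing a message.
    public void onTaskComplete() {
        synchronized (lock) {
            workReady = true;
            lock.notifyAll();          // wake the main loop immediately
        }
    }

    // Main-loop side: called when the Chooser returns no message. Returns as
    // soon as a worker signals, instead of sleeping out a fixed 10 ms poll.
    public void awaitWork(long timeoutMs) throws InterruptedException {
        synchronized (lock) {
            if (!workReady) {
                lock.wait(timeoutMs);  // bounded, so we still make progress on timeout
            }
            workReady = false;         // consume the signal; next empty spin may wait again
        }
    }

    public static void main(String[] args) throws InterruptedException {
        WaitNotifySketch loop = new WaitNotifySketch();
        Thread worker = new Thread(loop::onTaskComplete);
        worker.start();                // completes and signals right away
        long start = System.nanoTime();
        loop.awaitWork(10);            // wakes on the signal, not the timeout
        worker.join();
        System.out.println("woke after ~" + (System.nanoTime() - start) / 1_000_000 + " ms");
    }
}
```

If the notify fires before the main loop reaches `awaitWork`, the boolean is already set and the wait is skipped entirely, which is the race the patch description mentions.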
> The change I made is pretty small, but the code it is changing is fairly 
> complex.  I would be very interested in:
>  * Getting your insights on the problem I have described. Maybe there is a 
> setting that could work around this behavior.  I could not find one.
>  * Feedback on my proposed solution. Especially potential dangers.
> I have attached a patch file to this issue. I can open a merge request if 
> that would be a better way to get your input.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
