[ 
https://issues.apache.org/jira/browse/SAMZA-1599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16388851#comment-16388851
 ] 

Jagadish commented on SAMZA-1599:
---------------------------------

Heya [~jwlent55], this is extremely thorough. Impressive work! Will review this 
PR soon.

> AsyncRunLoop Slow When Some Partitions Are Empty
> ------------------------------------------------
>
>                 Key: SAMZA-1599
>                 URL: https://issues.apache.org/jira/browse/SAMZA-1599
>             Project: Samza
>          Issue Type: Bug
>          Components: container
>    Affects Versions: 0.12.0, 0.14.0
>            Reporter: James Lent
>            Priority: Major
>         Attachments: slow-async-run-loop.patch
>
>
> During performance testing of a relatively simple multi-threaded job we noted 
> that, as the Kafka partitions started to become exhausted, the individual 
> partition tasks that normally processed several messages per ms slowed down 
> significantly and headed towards a throughput of only one message every 10 ms. 
> Investigation indicated that the AsyncRunLoop was often blocking for 10 ms 
> polling the empty partitions for new work, and in the process starving the 
> tasks working on partitions that still had messages to process.
> We found this in a private branch based on 0.12.0 and I have reproduced the 
> issue in master. I coded up a potential fix which works for us in 0.12.0. I 
> have rebased the fix on master and tested it there too.
> Here is a detailed description of what I think is going on, assuming there is 
> only one partition with messages to process (a simplified sketch follows the 
> walk-through):
>  * AsyncRunLoop (main) thread Loop #1
>  ** Chooser selects a message for that partition and places it in the 
> pendingEnvelopeQueue for the AsyncTaskState object associated with that SSP
>  ** Runs the AsyncTaskWorker associated with that SSP.
>  ** Fetches the first message from the pendingEnvelopeQueue.
>  ** Submits a request to the AsyncStreamTaskAdapter to process the first 
> message in a worker thread.
>  ** Updates the Chooser with a new message from that SSP (if one is 
> available).
>  * Worker thread
>  ** Starts processing the message
>  * AsyncRunLoop (main) thread Loop #2
>  ** Chooser selects a second message for that partition and places it in the 
> pendingEnvelopeQueue for the AsyncTaskState object associated with that SSP
>  ** Runs the AsyncTaskWorker associated with that SSP.
>  ** The Worker thread is still busy processing the previous message, so the 
> AsyncTaskWorker does nothing.
>  ** The message is not "fetched" and therefore the Chooser is NOT updated.
>  * AsyncRunLoop (main) thread Loop #3
>  ** The Chooser cannot find a message to process.
>  ** Starts a poll with a timeout of 10 ms on all the partitions with no 
> messages (this does not include the one partition with messages).
>  * Worker thread
>  ** Completes processing the first message.
>  ** Updates the AsyncTaskWorker state.
>  * Here is where the throughput suffers:
>  ** The AsyncTaskWorker is ready for more work, but the main thread that 
> hands out the work is stuck "polling".
>  * AsyncRunLoop (main) thread Loop #3 continues
>  ** Wakes up after 10 ms.
>  ** Runs the AsyncTaskWorker associated with that SSP.
>  ** The AsyncTaskWorker is now ready for more work.
>  ** Fetches the second message from the pendingEnvelopeQueue.
>  ** Submits a request to the AsyncStreamTaskAdapter to process the second 
> message in a (new) worker thread.
>  ** Updates the Chooser with a new message from that SSP (if one is 
> available).
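> As a rough illustration of the walk-through above, here is a minimal sketch of 
> the loop as I understand it. The names below (Chooser, TaskWorker, Consumers) 
> are simplified stand-ins, not the actual Samza classes:
> {code:java}
> // Minimal sketch of the described run-loop behavior (hypothetical names).
> interface Chooser { Object choose(); }              // returns null when nothing is buffered
> interface TaskWorker { void enqueue(Object envelope); void runIfReady(); }
> interface Consumers { void poll(long timeoutMs); }  // blocking poll of the empty SSPs
>
> final class RunLoopSketch {
>   private final Chooser chooser;
>   private final java.util.Map<Object, TaskWorker> workersBySsp;
>   private final Consumers emptyPartitionConsumers;
>   private volatile boolean running = true;
>
>   RunLoopSketch(Chooser chooser, java.util.Map<Object, TaskWorker> workersBySsp,
>                 Consumers emptyPartitionConsumers) {
>     this.chooser = chooser;
>     this.workersBySsp = workersBySsp;
>     this.emptyPartitionConsumers = emptyPartitionConsumers;
>   }
>
>   void run() {
>     while (running) {
>       Object envelope = chooser.choose();
>       if (envelope != null) {
>         // Hand the message to the worker for its SSP; the worker may submit it
>         // to a worker thread and, once it fetches from its pending queue,
>         // re-feed the Chooser for that SSP.
>         TaskWorker worker = workersBySsp.get(sspOf(envelope));
>         worker.enqueue(envelope);
>         worker.runIfReady();
>       } else {
>         // Chooser is empty: block for up to 10 ms polling only the partitions
>         // that currently have no messages. A worker that completes during this
>         // window cannot be handed its next message until the poll returns, so
>         // throughput degrades toward one message per 10 ms.
>         emptyPartitionConsumers.poll(10);
>       }
>     }
>   }
>
>   // Placeholder for envelope.getSystemStreamPartition().
>   private Object sspOf(Object envelope) { return envelope; }
> }
> {code}
> The "else" branch above is the 10 ms window during which a ready 
> AsyncTaskWorker sits idle.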
> The key changes in my fix are (see the sketch after this list):
>  * In the AsyncRunLoop (multi-threaded) case don't "poll" when the Chooser 
> returns no messages.  Just "drain" any messages that have already arrived.
>  * Instead in the main loop "wait" when the Chooser returns no messages using 
> the existing synchronization lock that is already used to wait when all the 
> AsyncTaskWorkers are busy.
>  * There is one boolean added to deal with a race condition which could 
> otherwise trigger "waits" we don't need and therefore impact throughput.  It 
> may cause us to occasionally skip a "wait" that would actually be OK, but I 
> think the performance penalty of that is pretty small (one extra spin of the 
> main loop when there is no work to handle).
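> A rough sketch of what those changes look like, again with simplified 
> stand-in names rather than the actual Samza code:
> {code:java}
> // Hypothetical sketch of the proposed fix: drain without blocking, then wait on
> // the same lock that task workers already notify when they finish a message.
> final class CoordinationSketch {
>   interface Consumers { void drain(); }            // non-blocking, i.e. a poll with timeout 0
>
>   private final Object coordinatorLock = new Object();
>   private boolean workReadySinceLastCheck = false; // guards the race described above
>
>   // Called from the main loop when the Chooser returns no messages.
>   void onChooserEmpty(Consumers consumers) throws InterruptedException {
>     consumers.drain();                             // pick up anything that has already arrived
>     synchronized (coordinatorLock) {
>       // A worker may have completed between the Chooser returning null and us
>       // taking the lock; the boolean lets us skip a wait we no longer need.
>       // Worst case we skip a wait that would have been fine, costing one extra
>       // spin of the main loop with nothing to do.
>       if (!workReadySinceLastCheck) {
>         coordinatorLock.wait(10);                  // woken as soon as a worker finishes;
>       }                                            // the timeout only bounds the idle spin
>       workReadySinceLastCheck = false;
>     }
>   }
>
>   // Called by a worker thread when it finishes processing a message.
>   void onWorkerComplete() {
>     synchronized (coordinatorLock) {
>       workReadySinceLastCheck = true;
>       coordinatorLock.notifyAll();
>     }
>   }
> }
> {code}
> This keeps the main thread responsive to worker completions, while the next 
> spin of the loop still drains any newly arrived messages.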
> The change I made is pretty small, but the code it is changing is fairly 
> complex. I would be very interested in:
>  * Getting your insights on the problem I have described. Maybe there is a 
> setting that could work around this behavior.  I could not find one.
>  * Feedback on my proposed solution, especially potential dangers.
> I have attached a patch file to this issue. I can open a merge request if 
> that would be a better way to get your input.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
