[
https://issues.apache.org/jira/browse/SAMZA-1599?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Xinyu Liu updated SAMZA-1599:
-----------------------------
Fix Version/s: (was: 0.15.0)
0.14.1
> AsyncRunLoop Slow When Some Partitions Are Empty
> ------------------------------------------------
>
> Key: SAMZA-1599
> URL: https://issues.apache.org/jira/browse/SAMZA-1599
> Project: Samza
> Issue Type: Bug
> Components: container
> Affects Versions: 0.12.0, 0.14.0
> Reporter: James Lent
> Assignee: James Lent
> Priority: Major
> Fix For: 0.14.1
>
> Attachments: slow-async-run-loop.patch
>
>
> During performance testing of a relatively simple multi-threaded job, we
> noted that as the Kafka partitions began to empty, the individual partition
> tasks that normally processed several messages per ms slowed down
> significantly, heading toward a throughput of only one message every 10 ms.
> Investigation indicated that the AsyncRunLoop was often blocking for 10 ms
> polling the empty partitions for new work, and in the process starving the
> tasks working on partitions that still had messages to process.
> We found this in a private branch based on 0.12.0 and I have reproduced the
> issue in master. I coded up a potential fix which works for us in 0.12.0. I
> have re-based the fix on master and tested it there too.
> Here is a detailed description of what I think is going on, assuming there
> is only one partition with messages to process:
> * AsyncRunLoop (main) thread Loop #1
> ** Chooser selects a message for that partition and places it in the
> pendingEnvelopeQueue for the AsyncTaskState object associated with that SSP
> ** Runs the AsyncTaskWorker associated with that SSP.
> ** Fetches the first message from the pendingEnvelopeQueue.
> ** Submits request to the AsyncStreamTaskAdapter to process the first
> message in a worker thread
> ** Updates the Chooser with a new message from that SSP (if one is
> available).
> * Worker thread
> ** Starts processing the message
> * AsyncRunLoop (main) thread Loop #2
> ** Chooser selects a second message for that partition and places it in the
> pendingEnvelopeQueue for the AsyncTaskState object associated with that SSP
> ** Runs the AsyncTaskWorker associated with that SSP:
> ** The Worker thread is still busy processing the previous message, so the
> AsyncTaskWorker does nothing.
> ** Message is not "fetched" and therefore the Chooser is NOT updated.
> * AsyncRunLoop (main) thread Loop #3
> ** Chooser can not find a message to process.
> ** Start a poll with a timeout of 10 ms on all the partitions with no
> messages (this does not include the one partition with messages).
> * Worker thread
> ** Completes processing first message
> ** It updates AsyncTaskWorker state
> * Here is where the throughput suffers
> ** AsyncTaskWorker is ready for more work, but the main thread that hands
> out the work is stuck "polling"
> * AsyncRunLoop (main) thread Loop #3 continues
> ** Wakes up after 10 ms
> ** Runs the AsyncTaskWorker associated with that SSP.
> ** The AsyncTaskWorker is now ready for more work.
> ** Fetches the second message from the pendingEnvelopeQueue.
> ** Submits request to the AsyncStreamTaskAdapter to process the second
> message in a (new) worker thread
> ** Updates the Chooser with a new message from that SSP (if one is
> available).
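> The starvation step above can be sketched as follows. This is a minimal,
> hypothetical illustration (the class and method names are mine, not actual
> Samza classes): when the chooser comes up empty, the main thread blocks in a
> timed poll on the empty partitions, and a worker that finishes during that
> window sits idle for the rest of the timeout.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class PollStarvationSketch {
    // Simulates Loop #3: the chooser found nothing, so the main thread
    // blocks in a timed poll on the empty partitions. Returns roughly how
    // long the poll blocked, in milliseconds.
    static long pollEmptyPartitions(long timeoutMs) {
        BlockingQueue<String> emptyPartitions = new LinkedBlockingQueue<>();
        long start = System.nanoTime();
        try {
            // Nothing ever arrives, so this returns null after the timeout.
            emptyPartitions.poll(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return (System.nanoTime() - start) / 1_000_000;
    }

    public static void main(String[] args) {
        // A worker that completes its message during this window is ready
        // for more work, but the main thread cannot hand any out until the
        // poll expires.
        System.out.println("main thread blocked ~" + pollEmptyPartitions(10) + " ms");
    }
}
```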
> The key changes in my fix are:
> * In the AsyncRunLoop (multi-threaded) case don't "poll" when the Chooser
> returns no messages. Just "drain" any messages that have already arrived.
> * Instead, have the main loop "wait" when the Chooser returns no messages,
> using the existing synchronization lock that is already used to wait when
> all the AsyncTaskWorkers are busy.
> * One boolean is added to deal with a race condition which could otherwise
> trigger "waits" we don't need and therefore hurt throughput. It may cause us
> to occasionally skip a "wait" that would actually have been OK, but I think
> the performance penalty of that is pretty small (one extra spin of the main
> loop when there is no work to handle).
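> The shape of the fix can be sketched like this. Again the names are
> hypothetical, not the actual Samza code: the worker's completion callback
> notifies the lock so the main loop wakes immediately, and the boolean guards
> the race where a worker completes between the chooser returning null and the
> main thread entering wait().

```java
public class WaitNotifySketch {
    private final Object lock = new Object();
    // Race guard: set by a completing worker; if already set when the main
    // loop would wait, skip the wait and spin the loop once more instead.
    private boolean workPending = false;

    // Called by a worker thread when it finishes processing a message.
    void onWorkerComplete() {
        synchronized (lock) {
            workPending = true;
            lock.notifyAll(); // wake the main loop now, not after 10 ms
        }
    }

    // Called by the main loop when the Chooser returns no message.
    // Returns roughly how long it blocked, in milliseconds.
    long blockIfNoWork() {
        long start = System.nanoTime();
        synchronized (lock) {
            if (!workPending) {
                try {
                    lock.wait(); // no timed poll; a spurious wakeup just
                                 // costs one extra spin of the main loop
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
            workPending = false;
        }
        return (System.nanoTime() - start) / 1_000_000;
    }

    public static void main(String[] args) {
        WaitNotifySketch loop = new WaitNotifySketch();
        // A worker finishes its message a couple of ms from now.
        new Thread(() -> {
            try { Thread.sleep(2); } catch (InterruptedException ignored) {}
            loop.onWorkerComplete();
        }).start();
        // The main loop wakes as soon as the worker signals, instead of
        // sleeping out a full 10 ms poll on the empty partitions.
        System.out.println("main thread blocked ~" + loop.blockIfNoWork() + " ms");
    }
}
```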
> The change I made is pretty small, but the code it changes is fairly
> complex. I would be very interested in:
> * Getting your insights on the problem I have described. Maybe there is a
> setting that could work around this behavior. I could not find one.
> * Feedback on my proposed solution. Especially potential dangers.
> I have attached a patch file to this issue. I can open a merge request if
> that would be a better way to get your input.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)