[ https://issues.apache.org/jira/browse/SAMZA-1599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16388064#comment-16388064 ]
James Lent commented on SAMZA-1599:
-----------------------------------

Here is a chart that summarizes some testing I did with a small test job. The details are:
* 6 partitions in the Kafka topic.
* Only one partition had messages, and it had 100.
* Each run restarted at the beginning of the topic.
* The time reported is the time between when the first and the last messages were processed.
* The _process_ method just counts events and logs the number processed (i.e. it does basically nothing).
* There were variations in the times for each scenario tested (I ran it on my dev box). I would therefore not read anything into the small variations between "new" and "old".
* "Old" refers to the 0.14.1-SNAPSHOT code (as it existed yesterday: 1dfc5ce).
* "New" refers to that same code with my patch applied.

|*pool.size*|*thread.mode*|*Got Thread Pool Size*|*Got Single Thread Mode*|*Process Thread*|*Run Loop Mode*|*Old Process Time (ms)*|*New Process Time (ms)*|
|not specified|not specified|0|false|ThreadJob|asynchronous|14|14|
|not specified|false|0|false|ThreadJob|asynchronous|16|15|
|not specified|true|0|true|ThreadJob|single thread|12|12|
|1|not specified|1|false|Container Thread|asynchronous|1022|16|
|1|false|1|false|Container Thread|asynchronous|1023|16|
|1|true|1|true|ThreadJob|single thread|13|12|
|6|not specified|6|false|Container Thread|asynchronous|1015|21|
|6|false|6|false|Container Thread|asynchronous|1005|24|
|6|true|6|true|ThreadJob|single thread|11|12|

> AsyncRunLoop Slow When Some Partitions Are Empty
> ------------------------------------------------
>
>                 Key: SAMZA-1599
>                 URL: https://issues.apache.org/jira/browse/SAMZA-1599
>             Project: Samza
>          Issue Type: Bug
>          Components: container
>    Affects Versions: 0.12.0, 0.14.0
>            Reporter: James Lent
>            Priority: Major
>         Attachments: slow-async-run-loop.patch
>
>
> During performance testing of a relatively simple multi-threaded job we noted that as
> the Kafka partitions started to empty, the individual partition tasks that normally processed several messages per ms started to slow down significantly and head towards a throughput of only one message every 10 ms per task. Investigation indicated that the AsyncRunLoop was often blocking for 10 ms looking for new work to choose from the empty partitions, and in the process starving the tasks working on partitions with more work to do.
>
> We found this in a private branch based on 0.12.0, and I have reproduced the issue in master. I coded up a potential fix which works for us in 0.12.0. I have re-based the fix on master and tested it there too.
>
> Here is a detailed description of what I think is going on, assuming there is only one partition with messages to process:
> * AsyncRunLoop (main) thread, loop #1:
> ** The Chooser selects a message for that partition and places it in the pendingEnvelopeQueue for the AsyncTaskState object associated with that SSP.
> ** Runs the AsyncTaskWorker associated with that SSP.
> ** Fetches the first message from the pendingEnvelopeQueue.
> ** Submits a request to the AsyncStreamTaskAdapter to process the first message in a worker thread.
> ** Updates the Chooser with a new message from that SSP (if one is available).
> * Worker thread:
> ** Starts processing the message.
> * AsyncRunLoop (main) thread, loop #2:
> ** The Chooser selects a second message for that partition and places it in the pendingEnvelopeQueue for the AsyncTaskState object associated with that SSP.
> ** Runs the AsyncTaskWorker associated with that SSP.
> ** The worker thread is still busy processing the previous message, so the AsyncTaskWorker does nothing.
> ** The message is not "fetched" and therefore the Chooser is NOT updated.
> * AsyncRunLoop (main) thread, loop #3:
> ** The Chooser cannot find a message to process.
> ** Starts a poll with a timeout of 10 ms on all the partitions with no messages (this does not include the one partition with messages).
> * Worker thread:
> ** Completes processing the first message.
> ** Updates the AsyncTaskWorker state.
> * Here is where the throughput suffers:
> ** The AsyncTaskWorker is ready for more work, but the main thread that hands out the work is stuck "polling".
> * AsyncRunLoop (main) thread, loop #3 continues:
> ** Wakes up after 10 ms.
> ** Runs the AsyncTaskWorker associated with that SSP.
> ** The AsyncTaskWorker is now ready for more work.
> ** Fetches the second message from the pendingEnvelopeQueue.
> ** Submits a request to the AsyncStreamTaskAdapter to process the second message in a (new) worker thread.
> ** Updates the Chooser with a new message from that SSP (if one is available).
>
> The key changes in my fix are:
> * In the AsyncRunLoop (multi-threaded) case, don't "poll" when the Chooser returns no messages. Just "drain" any messages that have already arrived.
> * Instead, have the main loop "wait" when the Chooser returns no messages, using the existing synchronization lock that is already used to wait when all the AsyncTaskWorkers are busy.
> * One boolean is added to deal with a race condition which could otherwise trigger "waits" we don't need and therefore impact throughput. It may cause us to occasionally skip a "wait" that would actually have been OK, but I think the performance penalty of that is pretty small (one extra spin of the main loop when there is no work to handle).
>
> The change I made is pretty small, but the code it is changing is fairly complex. I would be very interested in:
> * Your insights on the problem I have described. Maybe there is a setting that could work around this behavior; I could not find one.
> * Feedback on my proposed solution, especially potential dangers.
>
> I have attached a patch file to the issue. I can open a merge request if that would be a better way to get your input.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
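The starvation described in loop #3 above boils down to the main thread sitting in a fixed-timeout blocking poll on an empty source. Here is a minimal, self-contained sketch of that pattern; the class and variable names are hypothetical stand-ins, not Samza's actual AsyncRunLoop code:

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Sketch of the problematic pattern: when the chooser has nothing to hand
// out, the main loop blocks for the full 10 ms timeout, even if a worker
// thread became idle (ready for its next message) right after the choose.
public class TimedPollSketch {
    public static void main(String[] args) throws InterruptedException {
        // Empty "chooser" queue: stands in for the partitions with no messages.
        LinkedBlockingQueue<String> chooser = new LinkedBlockingQueue<>();

        long start = System.nanoTime();
        // The main loop's blocking choose: nothing arrives, so this sleeps
        // the whole timeout while ready workers sit idle.
        String envelope = chooser.poll(10, TimeUnit.MILLISECONDS);
        long elapsedMs = (System.nanoTime() - start) / 1_000_000;

        System.out.println("envelope: " + envelope);
        System.out.println("blocked at least 10 ms: " + (elapsedMs >= 10));
    }
}
```

With 100 messages on one partition and a poll fired between most pairs of messages, this 10 ms stall per message is consistent with the roughly 1000 ms "Old" times in the table above.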
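The wait/notify approach in the fix, including the extra boolean that guards the race between an empty choose and the wait, can be sketched as follows. All names here are hypothetical illustrations of the coordination pattern; the real patch modifies Samza's AsyncRunLoop and its existing lock:

```java
// Sketch: instead of a timed poll, the main loop waits on a coordination
// lock and is notified the moment a worker finishes a message. The boolean
// prevents the lost-wakeup race where a worker completes after the empty
// choose but before the main loop starts waiting.
public class RunLoopWaitSketch {
    private final Object coordinationLock = new Object();
    private boolean workCompletedSinceLastCheck = false;

    // Called by the main loop when the Chooser returned no envelope.
    void blockIfNoWork() throws InterruptedException {
        synchronized (coordinationLock) {
            if (workCompletedSinceLastCheck) {
                // A worker finished while we were choosing: skip the wait
                // and spin the loop once more instead of sleeping.
                workCompletedSinceLastCheck = false;
                return;
            }
            coordinationLock.wait(); // woken by onWorkerComplete; no timeout
        }
    }

    // Called by a worker thread when it finishes processing a message.
    void onWorkerComplete() {
        synchronized (coordinationLock) {
            workCompletedSinceLastCheck = true;
            coordinationLock.notifyAll();
        }
    }

    public static void main(String[] args) throws Exception {
        RunLoopWaitSketch loop = new RunLoopWaitSketch();
        Thread worker = new Thread(() -> {
            try {
                Thread.sleep(50); // simulate processing one message
            } catch (InterruptedException ignored) {
            }
            loop.onWorkerComplete();
        });
        long start = System.nanoTime();
        worker.start();
        loop.blockIfNoWork(); // returns as soon as the worker notifies
        long elapsedMs = (System.nanoTime() - start) / 1_000_000;
        worker.join();
        System.out.println("woke promptly after completion: " + (elapsedMs < 1000));
    }
}
```

Note the flag is deliberately left set when the wait is skipped only after being consumed; as the description says, a stale flag can at worst cost one extra spin of the main loop when there is no work, rather than a 10 ms sleep when there is.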