[ 
https://issues.apache.org/jira/browse/SAMZA-220?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Riccomini updated SAMZA-220:
----------------------------------

    Attachment: SAMZA-220.2.patch
                without-SAMZA-220.2.png
                with-SAMZA-220.2.png

Attaching the diff from my last comment. I'm also attaching two screenshots to 
show the dramatic drop in SystemConsumers.choose CPU usage. You can see that 
after the patch, it actually uses less CPU than the trace() logging call.

I also ran TestSamzaContainerPerformance against both SAMZA-220.2.patch and 
master.

With the patch (~270,000 msgs/sec):

39752 [ThreadJob] INFO org.apache.samza.test.performance.TestPerformanceTask - 
Processed 10000000 messages in 37 seconds.

Without the patch (~110,000 msgs/sec):

101555 [ThreadJob] INFO org.apache.samza.test.performance.TestPerformanceTask - 
Processed 10000000 messages in 90 seconds.

Once I update the tests, I'm going to run this against our large internal job 
to see how it performs.

As a side note, I was actually able to get the container to do 500,000+ 
msgs/sec by doing two more things:

# Commenting out the trace() calls in TaskInstance (a cheaper alternative is 
sketched below)
# Eliminating the for loops in SamzaContainer's window, send, and commit 
methods.

I'm not including them in this patch because there's already enough going on 
here.
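
For what it's worth, the trace() overhead can also be cut without deleting 
the calls outright, by guarding them so the log message is never built when 
TRACE is off. A minimal sketch using plain SLF4J (not the container's actual 
logging trait, and not part of this patch):

{code}
import org.slf4j.LoggerFactory

class TraceGuardSketch {
  private val logger = LoggerFactory.getLogger(classOf[TraceGuardSketch])

  def process(offset: Long): Unit = {
    // The guard means the log string below is only built when TRACE is
    // actually enabled, so the call costs almost nothing otherwise.
    if (logger.isTraceEnabled) {
      logger.trace("Processing incoming message envelope at offset " + offset)
    }
    // ... actual processing ...
  }
}
{code}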

> SystemConsumers is slow when consuming from a large number of partitions
> ------------------------------------------------------------------------
>
>                 Key: SAMZA-220
>                 URL: https://issues.apache.org/jira/browse/SAMZA-220
>             Project: Samza
>          Issue Type: Bug
>          Components: container
>    Affects Versions: 0.6.0
>            Reporter: Chris Riccomini
>            Assignee: Chris Riccomini
>         Attachments: SAMZA-220.0.patch, SAMZA-220.1.patch, SAMZA-220.2.patch, 
> with-SAMZA-220.2.png, without-SAMZA-220.2.png
>
>
> We have observed poor throughput when a SamzaContainer is consuming many 
> partitions (100s). The more partitions, the worse the performance gets.
> When hooking up VisualVM, two operations take up more than 65% of the CPU in 
> SystemConsumers:
> {code}
>     refresh.maybeCall()
>     updateMessageChooser
> {code}
> The problem is that we run each of these operations once before every 
> process() call to a StreamTask. Both of these operations iterate over *all* 
> SystemStreamPartitions that the SystemConsumers is consuming from. If you 
> have hundreds of partitions, it means you do two loops of 100+ items for 
> every message you process. This is true even if the SystemConsumers buffer 
> has a lot of messages (10,000+), and also true even if most 
> systemStreamPartitions have no messages available.
> I have two proposed solutions to this problem:
> 1. Only call refresh.maybeCall() when the total number of buffered messages 
> in the SystemConsumers has dropped below some low watermark.
> 2. Only have updateMessageChooser call messageChooser.update for 
> systemStreamPartitions that actually *have* a message.
> I have implemented this and deployed it on a few jobs, and I am seeing a 
> significant performance improvement: from 10k-20k msgs/sec to 50k+.
> The trade-off, as I see it, is really around (1), which will introduce a 
> little latency for low-volume topics. In such a case, the time from when a 
> message arrives to when it gets refreshed into the buffer and updated in 
> the chooser increases.



--
This message was sent by Atlassian JIRA
(v6.2#6252)
