[ https://issues.apache.org/jira/browse/SAMZA-245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14071059#comment-14071059 ]

Chris Riccomini commented on SAMZA-245:
---------------------------------------

My latest patch has an unprocessedMessages map, which it uses as a buffer. When 
the SystemConsumers class needs more messages for a set of 
SystemStreamPartitions, it asks the underlying SystemConsumer for *all* 
messages for these SystemStreamPartitions. The messages are then buffered in 
the unprocessedMessages map in SystemConsumers, and doled out to the 
MessageChooser over time.

An alternative implementation would be to shove all of the buffering logic into 
the SystemConsumer implementations. In this case, you'd have the 
SystemConsumer.poll API look like this:

{code}
List<IncomingMessageEnvelope> poll(Set<SystemStreamPartition> systemStreamPartitions, long timeout) throws InterruptedException;
{code}

Rather than returning all available messages for the supplied 
SystemStreamPartitions, poll would return at most one IncomingMessageEnvelope 
per SystemStreamPartition.

This tweak simplifies the SystemConsumers implementation slightly (LoC goes 
from 315 to 280) because we no longer need the unprocessedMessages buffer in it. 
We simply call SystemConsumer.poll() immediately whenever the MessageChooser 
needs a new message, rather than first taking from the unprocessedMessages 
buffer. It also allows us to eliminate the "update" method.

The tradeoff for this slightly simpler SystemConsumers implementation is that 
it pushes the buffering complexity into individual SystemConsumer 
implementations. This is a double-edged sword. On the one hand, it allows us to 
potentially write different buffering strategies for different systems. I can't 
think of any use cases for this off the top of my head, but there might be 
some. On the other hand, it means that each individual system has its own way 
of configuring its buffer (and buffer refresh rates), which seems like it would 
be confusing to end-users. It also seems a bit scary, since poll() might be 
high-overhead for some system implementations. If we poll aggressively every 
time we need a message, this could lead to bad performance. To get around this, 
again, the underlying system would have to buffer messages to avoid the cost of 
polling frequently.
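To illustrate the polling-cost concern, here is a toy model, not a benchmark: FakeBroker, BatchingReader, and FetchCostDemo are hypothetical, and each fetch() call stands in for one expensive round trip to the underlying system. Fetching one message per request costs a round trip per message, while buffering a batch locally amortizes that cost.

{code:java}
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical remote log: every fetch() models one expensive round trip.
class FakeBroker {
  private final int totalMessages;
  private int nextOffset = 0;
  int fetchCalls = 0;

  FakeBroker(int totalMessages) {
    this.totalMessages = totalMessages;
  }

  // Returns up to maxMessages messages (offsets), fewer at the end of the log.
  List<Integer> fetch(int maxMessages) {
    fetchCalls++;
    List<Integer> batch = new ArrayList<>();
    while (batch.size() < maxMessages && nextOffset < totalMessages) {
      batch.add(nextOffset++);
    }
    return batch;
  }
}

// Hands out one message per call, but refills its local buffer in batches.
class BatchingReader {
  private final FakeBroker broker;
  private final int batchSize;
  private final Deque<Integer> buffer = new ArrayDeque<>();

  BatchingReader(FakeBroker broker, int batchSize) {
    this.broker = broker;
    this.batchSize = batchSize;
  }

  Integer next() {
    if (buffer.isEmpty()) {
      buffer.addAll(broker.fetch(batchSize));
    }
    return buffer.poll(); // null once the log is exhausted
  }
}

class FetchCostDemo {
  // Reads every message and reports how many round trips it took.
  static int drain(int totalMessages, int batchSize) {
    FakeBroker broker = new FakeBroker(totalMessages);
    BatchingReader reader = new BatchingReader(broker, batchSize);
    while (reader.next() != null) {
      // consume and discard
    }
    return broker.fetchCalls;
  }

  public static void main(String[] args) {
    System.out.println(drain(1000, 1));   // 1001: one per message, plus a final empty fetch
    System.out.println(drain(1000, 100)); // 11: ten full batches, plus a final empty fetch
  }
}
{code}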

Right now, the tradeoff doesn't matter much, since all known implementations 
use BlockingEnvelopeMap, which already contains the buffering logic. When we 
convert to the [0.9 new Kafka consumer|https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer+Rewrite+Design], 
this won't be the case anymore. In that case, we'll have to re-implement the 
unprocessedMessages logic inside the Kafka consumer, since the consumer will 
fetch N messages at a time but only be able to return one at a time via the 
poll() method.

After implementing a bit of this alternative approach, I don't think the 
tradeoffs are worth it, so I'm sticking with the existing patch. I didn't get 
as far as doing any performance tests with the tweaked patch because I'm not 
comfortable with the tradeoffs.

> Improve SystemConsumers performance
> -----------------------------------
>
>                 Key: SAMZA-245
>                 URL: https://issues.apache.org/jira/browse/SAMZA-245
>             Project: Samza
>          Issue Type: Bug
>          Components: container
>    Affects Versions: 0.6.0
>            Reporter: Chris Riccomini
>            Assignee: Chris Riccomini
>             Fix For: 0.8.0
>
>         Attachments: SAMZA-245-1.patch, SAMZA-245-3.patch, SAMZA-245-4.patch, 
> SAMZA-245.0.patch, 
> org.apache.samza.test.performance.TestSamzaContainerPerformance.SAMZA-245-3.html,
>  org.apache.samza.test.performance.TestSamzaContainerPerformance.master.html
>
>
> As part of SAMZA-220, a more radical patch was proposed. This patch appears 
> to improve SystemConsumers' performance pretty significantly, while also 
> reducing its complexity. The decision was made to move this change into the 
> 0.8.0 release, rather than the 0.7.0 release, since it's a fairly risky 
> change.
> This ticket is to explore updating SystemConsumers to eliminate almost all 
> loops in order to increase performance in the Samza container.



--
This message was sent by Atlassian JIRA
(v6.2#6252)
