[https://issues.apache.org/jira/browse/SAMZA-245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14071059#comment-14071059]
Chris Riccomini commented on SAMZA-245:
---------------------------------------
My latest patch has an unprocessedMessages map, which it uses as a buffer. When
the SystemConsumers class needs more messages for a set of
SystemStreamPartitions, it asks the underlying SystemConsumer for *all*
messages for these SystemStreamPartitions. The messages are then buffered in
the unprocessedMessages map in SystemConsumers, and doled out to the
MessageChooser over time.
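To make the current patch's design concrete, here is a minimal sketch of SystemConsumers-side buffering with an unprocessedMessages map. It assumes hypothetical, simplified stand-ins (Ssp, Envelope, BatchConsumer, BufferingSystemConsumers) rather than Samza's real SystemStreamPartition, IncomingMessageEnvelope, and SystemConsumer classes, and it reduces the hand-off to the MessageChooser to a single next(ssp) call. The underlying consumer is polled only when a requested partition's buffer runs dry, and then for every partition whose buffer is empty.

```java
import java.util.*;

// Hypothetical, simplified stand-ins for Samza's SystemStreamPartition and
// IncomingMessageEnvelope (not the real classes).
record Ssp(String stream, int partition) {}
record Envelope(Ssp ssp, long offset) {}

// Hypothetical consumer contract as in the current patch: poll() returns
// *all* available messages for the requested partitions.
interface BatchConsumer {
  List<Envelope> poll(Set<Ssp> ssps);
}

// Sketch of SystemConsumers-side buffering with an unprocessedMessages map.
class BufferingSystemConsumers {
  private final BatchConsumer consumer;
  private final Map<Ssp, Deque<Envelope>> unprocessedMessages = new HashMap<>();

  BufferingSystemConsumers(BatchConsumer consumer, Set<Ssp> ssps) {
    this.consumer = consumer;
    for (Ssp ssp : ssps) {
      unprocessedMessages.put(ssp, new ArrayDeque<>());
    }
  }

  // Doles out the next buffered envelope for a registered ssp, refilling from
  // the consumer only when that buffer is empty. Returns null if none is available.
  Envelope next(Ssp ssp) {
    Deque<Envelope> buffer = unprocessedMessages.get(ssp);
    if (buffer.isEmpty()) {
      refill();
    }
    return buffer.poll();
  }

  // Asks the consumer for all messages for every partition whose buffer is empty.
  private void refill() {
    Set<Ssp> drained = new HashSet<>();
    for (Map.Entry<Ssp, Deque<Envelope>> entry : unprocessedMessages.entrySet()) {
      if (entry.getValue().isEmpty()) {
        drained.add(entry.getKey());
      }
    }
    for (Envelope envelope : consumer.poll(drained)) {
      unprocessedMessages.get(envelope.ssp()).add(envelope);
    }
  }
}
```

The point of this shape is that one poll() call can feed many chooser requests, so the cost of polling is amortized once, inside SystemConsumers, rather than separately in each system.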
An alternative implementation would be to shove all of the buffering logic into
the SystemConsumer implementations. In this case, you'd have the
SystemConsumer.poll API look like this:
{code}
List<IncomingMessageEnvelope> poll(Set<SystemStreamPartition> systemStreamPartitions, long timeout) throws InterruptedException;
{code}
Rather than returning all available messages for the supplied
SystemStreamPartitions, the poll method would return at most one
IncomingMessageEnvelope per SystemStreamPartition.
This tweak simplifies the SystemConsumers implementation slightly (from 315 to
280 lines of code) because it no longer needs an unprocessedMessages buffer.
We simply call SystemConsumer.poll() immediately whenever the MessageChooser
needs a new message, rather than first taking from the unprocessedMessages
buffer. It also allows us to eliminate the "update" method.
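For comparison, a minimal sketch of SystemConsumers under this alternative, again using hypothetical, simplified stand-ins (Ssp, Envelope, OneAtATimeConsumer, DirectSystemConsumers) in place of the real Samza classes. The timeout parameter and InterruptedException from the proposed poll signature are omitted for brevity. With no unprocessedMessages buffer, every request for a message turns directly into one poll() call.

```java
import java.util.*;

// Hypothetical, simplified stand-ins for Samza's SystemStreamPartition and
// IncomingMessageEnvelope (not the real classes).
record Ssp(String stream, int partition) {}
record Envelope(Ssp ssp, long offset) {}

// Hypothetical contract from the proposal: poll() returns at most one
// envelope per requested partition (timeout omitted in this sketch).
interface OneAtATimeConsumer {
  List<Envelope> poll(Set<Ssp> ssps);
}

// Sketch of SystemConsumers with no buffer: each time the chooser needs a
// message for a partition, the underlying consumer is polled immediately.
class DirectSystemConsumers {
  private final OneAtATimeConsumer consumer;

  DirectSystemConsumers(OneAtATimeConsumer consumer) {
    this.consumer = consumer;
  }

  // Returns the next envelope for ssp, or null if the consumer has none.
  Envelope next(Ssp ssp) {
    List<Envelope> polled = consumer.poll(Set.of(ssp));
    return polled.isEmpty() ? null : polled.get(0);
  }
}
```

Because each message costs one poll() call here, any per-call overhead in the underlying system is paid once per message unless that system buffers internally.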
The trade-off for this slightly simpler SystemConsumers implementation is that
it pushes the buffering complexity into individual SystemConsumer
implementations. This is a double-edged sword. On the one hand, it allows us to
potentially write different buffering strategies for different systems. I can't
think of any use cases for this off the top of my head, but there might be
some. On the other hand, it means that each individual system has its own way
of configuring its buffer (and buffer refresh rates), which seems like it would
be confusing to end-users. It also seems a bit scary, since poll() might have
high overhead for some system implementations. If we poll aggressively every
time we need a message, this could lead to poor performance. To get around
this, the underlying system would, again, have to buffer messages to avoid the
cost of polling frequently.
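A minimal sketch of what that consumer-side buffering might look like: the consumer keeps its own per-partition buffer and calls an expensive batch fetch only when that buffer is drained, while poll() still returns at most one envelope per partition. Fetcher, BufferedConsumer, fetchSize, and the stand-in types are all hypothetical; fetchSize represents the kind of per-system buffer setting that each implementation would end up exposing.

```java
import java.util.*;

// Hypothetical, simplified stand-ins for Samza's SystemStreamPartition and
// IncomingMessageEnvelope (not the real classes).
record Ssp(String stream, int partition) {}
record Envelope(Ssp ssp, long offset) {}

// Hypothetical expensive backend call (e.g. one network fetch) that returns
// up to maxMessages messages for a partition.
interface Fetcher {
  List<Envelope> fetch(Ssp ssp, int maxMessages);
}

// Sketch of a consumer that hides its own buffer behind a poll() returning at
// most one envelope per partition. fetchSize is a hypothetical per-system knob.
class BufferedConsumer {
  private final Fetcher fetcher;
  private final int fetchSize;
  private final Map<Ssp, Deque<Envelope>> buffers = new HashMap<>();

  BufferedConsumer(Fetcher fetcher, int fetchSize) {
    this.fetcher = fetcher;
    this.fetchSize = fetchSize;
  }

  List<Envelope> poll(Set<Ssp> ssps) {
    List<Envelope> result = new ArrayList<>();
    for (Ssp ssp : ssps) {
      Deque<Envelope> buffer = buffers.computeIfAbsent(ssp, k -> new ArrayDeque<>());
      // Only hit the expensive backend when this partition's buffer is drained.
      if (buffer.isEmpty()) {
        buffer.addAll(fetcher.fetch(ssp, fetchSize));
      }
      Envelope next = buffer.poll();
      if (next != null) {
        result.add(next);
      }
    }
    return result;
  }
}
```

With fetchSize set to 5, five consecutive poll() calls for one partition trigger a single fetch, and the sixth triggers the next one. This is essentially the unprocessedMessages logic again, just living inside one system's consumer.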
Right now, the trade-off doesn't matter much, since all known implementations
use BlockingEnvelopeMap, which already contains the buffering logic. When we
convert to the [0.9 new Kafka consumer|https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer+Rewrite+Design],
this won't be the case anymore. In that case, we'll have to re-implement the
unprocessedMessages logic inside the Kafka consumer, since the consumer will
fetch N messages at a time but only be able to return one at a time via the
poll() method.
After implementing a bit of this alternative approach, I don't think the
trade-offs are worth it, so I'm sticking with the existing patch. I didn't get
as far as running any performance tests with the tweaked patch because I'm not
comfortable with the trade-offs.
> Improve SystemConsumers performance
> -----------------------------------
>
> Key: SAMZA-245
> URL: https://issues.apache.org/jira/browse/SAMZA-245
> Project: Samza
> Issue Type: Bug
> Components: container
> Affects Versions: 0.6.0
> Reporter: Chris Riccomini
> Assignee: Chris Riccomini
> Fix For: 0.8.0
>
> Attachments: SAMZA-245-1.patch, SAMZA-245-3.patch, SAMZA-245-4.patch,
> SAMZA-245.0.patch,
> org.apache.samza.test.performance.TestSamzaContainerPerformance.SAMZA-245-3.html,
> org.apache.samza.test.performance.TestSamzaContainerPerformance.master.html
>
>
> As part of SAMZA-220, a more radical patch was proposed. This patch appears
> to improve SystemConsumers' performance pretty significantly, while also
> reducing its complexity. The decision was made to move this change into the
> 0.8.0 release, rather than the 0.7.0 release, since it's a fairly risky
> change.
> This ticket is to explore updating SystemConsumers to eliminate almost all
> loops in order to increase performance in the Samza container.