[
https://issues.apache.org/jira/browse/SAMZA-384?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14101441#comment-14101441
]
Chris Riccomini commented on SAMZA-384:
---------------------------------------
[~closeuris], yah, exactly. Essentially, rather than delaying sending the
message to the producer multiplexer until the RunLoop.send method, we just send
immediately. This means we can fully remove RunLoop.send, and should also drop
our memory pressure slightly, since the message will immediately be put into
the producer multiplexer, rather than waiting in a buffer temporarily.
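
To illustrate the difference, here is a minimal sketch of the two styles. All of the names below (Envelope, Multiplexer, RecordingMultiplexer, BufferingCollector, ImmediateCollector) are hypothetical stand-ins for this example, not the actual Samza classes or the contents of the attached patch.

```scala
import scala.collection.mutable

// Hypothetical stand-ins; these are NOT the real Samza classes.
final case class Envelope(stream: String, message: String)

trait Multiplexer {
  def send(source: String, envelope: Envelope): Unit
}

// Records everything it receives so the behavior can be inspected.
final class RecordingMultiplexer extends Multiplexer {
  val sent = mutable.ArrayBuffer.empty[(String, Envelope)]
  def send(source: String, envelope: Envelope): Unit = {
    sent += ((source, envelope))
  }
}

// Buffered style: envelopes wait in a local buffer until flush() is called,
// so something has to visit every collector to drain it.
final class BufferingCollector(source: String, mux: Multiplexer) {
  private val envelopes = mutable.ArrayBuffer.empty[Envelope]
  def send(envelope: Envelope): Unit = {
    envelopes += envelope
  }
  def flush(): Unit = {
    envelopes.foreach(e => mux.send(source, e))
    envelopes.clear()
  }
}

// Immediate style: each envelope is handed to the multiplexer right away,
// so there is no buffer to hold and no separate flush step to run.
final class ImmediateCollector(source: String, mux: Multiplexer) {
  def send(envelope: Envelope): Unit = mux.send(source, envelope)
}
```

With the immediate style, nothing needs to loop over all of the task instances just to find out that most of them have nothing to send.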
> TaskInstance.send is slow with high task count
> ----------------------------------------------
>
> Key: SAMZA-384
> URL: https://issues.apache.org/jira/browse/SAMZA-384
> Project: Samza
> Issue Type: Sub-task
> Components: container
> Affects Versions: 0.8.0
> Reporter: Chris Riccomini
> Assignee: Chris Riccomini
> Fix For: 0.8.0
>
> Attachments: SAMZA-384-0.patch
>
>
> I have a job that runs with ~235 TaskInstances per-container. The behavior
> that I'm seeing is that the SamzaContainer is spending about 20% of its CPU
> time on a trace() logging call inside TaskInstance.send. The code for this
> method is:
> {code}
> def send {
>   if (collector.envelopes.size > 0) {
>     trace("Sending messages for taskName: %s, %s" format (taskName, collector.envelopes.size))
>     metrics.sends.inc
>     metrics.messagesSent.inc(collector.envelopes.size)
>     collector.envelopes.foreach(envelope => producerMultiplexer.send(metrics.source, envelope))
>     trace("Resetting collector for taskName: %s" format taskName)
>     collector.reset
>   } else {
>     trace("Skipping send for taskName %s because no messages were collected." format taskName)
>     metrics.sendsSkipped.inc
>   }
> }
> {code}
> This method is invoked from the RunLoop.send method:
> {code}
> private def send {
>   updateTimer(metrics.sendMs) {
>     trace("Triggering send in task instances.")
>     metrics.sends.inc
>     taskInstances.values.foreach(_.send)
>   }
> }
> {code}
> So, I believe the problem here is that every send() invocation in the RunLoop
> ends up running 235 send() calls on my 235 TaskInstances.
> Since the RunLoop doesn't know which TaskInstances actually have messages to
> send, it has to call send() on all of them. I took a look at my metrics, and
> the vast vast vast majority of the time, the TaskInstance.send method is just
> skipping the send call (metrics.sendsSkipped.inc), so this is totally wasted
> time.
> The easiest solution here is to remove the trace call when there are no
> outgoing messages in TaskInstance.send.
> Another solution would be to modify the RunLoop/TaskInstance in such a way
> that the RunLoop would know which TaskInstances it needs to call send() on,
> and *only* call send() on those TaskInstances. Presumably, this would have to
> be done with a callback or something.
> I took a look at the RunLoop, and of the four tight-loop methods (process,
> send, window, and commit), the only one that iterates over all TaskInstances
> on every invocation is send(). The rest are either time-bounded (e.g. once
> every 60 seconds), or only call methods on a single TaskInstance (process). My
> inclination is to just remove this log line in TaskInstance.send then, rather
> than refactoring the code.
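
For reference, the callback-based alternative described in the issue could look roughly like the sketch below. This is only an illustration under assumed names (Task, Loop, onPending, flushes, and delivered are all hypothetical); it is not the real RunLoop or TaskInstance code.

```scala
import scala.collection.mutable

// Hypothetical sketch; names do not correspond to the real Samza classes.
final class Task(val name: String, onPending: Task => Unit) {
  private val pending = mutable.ArrayBuffer.empty[String]
  var flushes = 0 // number of times send() was invoked on this task

  def collect(message: String): Unit = {
    // Notify the loop only on the empty -> non-empty transition.
    if (pending.isEmpty) onPending(this)
    pending += message
  }

  def send(deliver: String => Unit): Unit = {
    flushes += 1
    pending.foreach(deliver)
    pending.clear()
  }
}

final class Loop {
  // Tasks that have reported at least one pending message since the last send().
  private val dirty = mutable.LinkedHashSet.empty[Task]
  val delivered = mutable.ArrayBuffer.empty[String]

  def newTask(name: String): Task = new Task(name, task => { dirty += task; () })

  // Visits only the tasks that reported pending messages, not every task.
  def send(): Unit = {
    dirty.foreach(task => task.send(msg => { delivered += msg; () }))
    dirty.clear()
  }
}
```

With 235 tasks of which only one has collected messages, Loop.send() touches that single task and skips the other 234. The cost is extra bookkeeping in the loop, which is the refactoring the issue description prefers to avoid.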