[ 
https://issues.apache.org/jira/browse/SAMZA-384?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14101491#comment-14101491
 ] 

Chris Riccomini commented on SAMZA-384:
---------------------------------------

You know, I wrote that, and I honestly can't remember. It's very, very old
code. I *think* that at the time we didn't know what we were going to do
for exactly-once messaging, so I was being conservative and not flushing
unless it was in the SamzaContainer code (before RunLoop existed). It no longer
seems to be required, though. :)

> TaskInstance.send is slow with high task count
> ----------------------------------------------
>
>                 Key: SAMZA-384
>                 URL: https://issues.apache.org/jira/browse/SAMZA-384
>             Project: Samza
>          Issue Type: Sub-task
>          Components: container
>    Affects Versions: 0.8.0
>            Reporter: Chris Riccomini
>            Assignee: Chris Riccomini
>             Fix For: 0.8.0
>
>         Attachments: SAMZA-384-0.patch
>
>
> I have a job that runs with ~235 TaskInstances per container. I'm seeing the
> SamzaContainer spend about 20% of its CPU time on a trace() logging call
> inside TaskInstance.send. The code for this method is:
> {code}
>   def send {
>     if (collector.envelopes.size > 0) {
>       trace("Sending messages for taskName: %s, %s" format (taskName, collector.envelopes.size))
>       metrics.sends.inc
>       metrics.messagesSent.inc(collector.envelopes.size)
>       collector.envelopes.foreach(envelope => producerMultiplexer.send(metrics.source, envelope))
>       trace("Resetting collector for taskName: %s" format taskName)
>       collector.reset
>     } else {
>       trace("Skipping send for taskName %s because no messages were collected." format taskName)
>       metrics.sendsSkipped.inc
>     }
>   }
> {code}
> This method is invoked from the RunLoop.send method:
> {code}
>   private def send {
>     updateTimer(metrics.sendMs) {
>       trace("Triggering send in task instances.")
>       metrics.sends.inc
>       taskInstances.values.foreach(_.send)
>     }
>   }
> {code}
> So, I believe the problem here is that every send() invocation in the RunLoop 
> ends up running 235 send() calls on my 235 TaskInstances.
> Since the RunLoop doesn't know which TaskInstances actually have messages to
> send, it has to call send() on all of them. I took a look at my metrics, and
> the vast majority of the time, the TaskInstance.send method just skips the
> send call (metrics.sendsSkipped.inc), so this is totally wasted time.
> The easiest solution here is to remove the trace call when there are no 
> outgoing messages in TaskInstance.send.
> Another solution would be to modify the RunLoop/TaskInstance in such a way 
> that the RunLoop would know which TaskInstances it needs to call send() on, 
> and *only* call send() on those TaskInstances. Presumably, this would have to 
> be done with a callback or something.
> I took a look at the RunLoop, and of the four tight-loop methods (process, 
> send, window, and commit), the only one that iterates over all TaskInstances 
> on every invocation is send(). The rest are either time-bounded (e.g. once
> every 60 seconds), or only call methods on a single TaskInstance (process). My
> inclination is to just remove this log line in TaskInstance.send then, rather 
> than refactoring the code.
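
For illustration only, here is a rough sketch of the callback-based alternative mentioned in the description, where the RunLoop only calls send() on TaskInstances that actually have collected messages. The Collector, TaskInstance, and RunLoop classes below are simplified, hypothetical stand-ins (envelopes are plain strings, and the `markPending` callback, `drain`, and `sendCalls` names are invented for this example); they are not Samza's actual classes or signatures.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a task's message collector (not Samza's class).
final class Collector(onFirstEnvelope: () => Unit) {
  private val envelopes = mutable.ArrayBuffer.empty[String]

  def send(envelope: String): Unit = {
    // Notify the loop only on the empty -> non-empty transition.
    if (envelopes.isEmpty) onFirstEnvelope()
    envelopes += envelope
  }

  // Return everything collected so far and reset the buffer.
  def drain(): Seq[String] = {
    val out = envelopes.toList
    envelopes.clear()
    out
  }
}

// Hypothetical stand-in for a task instance; markPending is the callback.
final class TaskInstance(val taskName: String, markPending: String => Unit) {
  val collector = new Collector(() => markPending(taskName))
  var sendCalls = 0 // counts how often the loop actually visited this task

  def send(sink: String => Unit): Unit = {
    sendCalls += 1
    collector.drain().foreach(sink)
  }
}

// Hypothetical stand-in for the run loop; sink plays the producer's role.
final class RunLoop(taskNames: Seq[String], sink: String => Unit) {
  // Names of the tasks that have collected at least one envelope.
  private val pending = mutable.LinkedHashSet.empty[String]

  val taskInstances: Map[String, TaskInstance] =
    taskNames.map { n =>
      n -> new TaskInstance(n, name => { pending += name; () })
    }.toMap

  def send(): Unit = {
    // Visit only the tasks that registered themselves via the callback.
    val names = pending.toList
    pending.clear()
    names.foreach(n => taskInstances(n).send(sink))
  }
}
```

With 235 tasks of which one has output, a send() pass in this sketch touches a single TaskInstance instead of all 235. The cost is extra bookkeeping on every collected message, which is part of why simply dropping the log line may still be the more attractive fix.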



--
This message was sent by Atlassian JIRA
(v6.2#6252)
