Chris Riccomini created SAMZA-384:
-------------------------------------

             Summary: TaskInstance.send is slow with high task count
                 Key: SAMZA-384
                 URL: https://issues.apache.org/jira/browse/SAMZA-384
             Project: Samza
          Issue Type: Sub-task
          Components: container
    Affects Versions: 0.8.0
            Reporter: Chris Riccomini
            Assignee: Chris Riccomini
             Fix For: 0.8.0


I have a job that runs with ~235 TaskInstances per container. The 
SamzaContainer is spending about 20% of its CPU time on a trace() logging call 
inside TaskInstance.send. The code for this method is:

{code}
  def send {
    if (collector.envelopes.size > 0) {
      trace("Sending messages for taskName: %s, %s" format (taskName, collector.envelopes.size))

      metrics.sends.inc
      metrics.messagesSent.inc(collector.envelopes.size)

      collector.envelopes.foreach(envelope => producerMultiplexer.send(metrics.source, envelope))

      trace("Resetting collector for taskName: %s" format taskName)

      collector.reset
    } else {
      trace("Skipping send for taskName %s because no messages were collected." format taskName)

      metrics.sendsSkipped.inc
    }
  }
{code}

This method is invoked from the RunLoop.send method:

{code}
  private def send {
    updateTimer(metrics.sendMs) {
      trace("Triggering send in task instances.")
      metrics.sends.inc
      taskInstances.values.foreach(_.send)
    }
  }
{code}

So, I believe the problem here is that every send() invocation in the RunLoop 
ends up running 235 send() calls on my 235 TaskInstances.

Since the RunLoop doesn't know which TaskInstances actually have messages to 
send, it has to call send() on all of them. My metrics show that the vast 
majority of the time, TaskInstance.send just skips the send 
(metrics.sendsSkipped.inc), so the time spent there is wasted.

The easiest solution here is to remove the trace call when there are no 
outgoing messages in TaskInstance.send.
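
A minimal sketch of this option, using simplified stand-ins rather than the 
real classes: Counter, SketchMetrics, and SketchTaskInstance are hypothetical 
names, envelopes are plain strings, and the producer is reduced to a function. 
The else branch no longer calls trace(); the trace calls on the sending path 
are left out only to keep the sketch short.

```scala
// Hypothetical stand-ins for the real classes; only the shape of send() matters.
class Counter {
  private var count = 0L
  def inc(): Unit = count += 1
  def inc(n: Long): Unit = count += n
  def getCount: Long = count
}

class SketchMetrics {
  val sends = new Counter
  val messagesSent = new Counter
  val sendsSkipped = new Counter
}

class SketchTaskInstance(val taskName: String, forward: String => Unit) {
  val metrics = new SketchMetrics
  private val envelopes = scala.collection.mutable.ArrayBuffer[String]()

  // Buffers an outgoing message until the next send().
  def collect(envelope: String): Unit = envelopes.append(envelope)

  def send(): Unit = {
    if (envelopes.nonEmpty) {
      metrics.sends.inc()
      metrics.messagesSent.inc(envelopes.size)
      envelopes.foreach(forward)
      envelopes.clear()
    } else {
      // No trace() here: this branch runs on nearly every call, so it
      // only bumps the counter.
      metrics.sendsSkipped.inc()
    }
  }
}
```

The sendsSkipped counter still records how often the skip branch runs, so 
the information from the removed log line is not lost entirely.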

Another solution would be to modify the RunLoop/TaskInstance in such a way that 
the RunLoop would know which TaskInstances it needs to call send() on, and 
*only* call send() on those TaskInstances. Presumably, this would have to be 
done with a callback or something.
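
A rough sketch of what the callback approach might look like. Everything 
here is hypothetical (CallbackTaskInstance, CallbackRunLoop, onFirstMessage, 
and the pending set are illustrative names, not existing Samza code), and it 
assumes the run loop and its tasks all execute on a single thread. Each task 
notifies the loop when its collector goes from empty to non-empty, and send() 
then visits only the tasks that registered themselves.

```scala
import scala.collection.mutable

// Hypothetical sketch; none of these names exist in Samza.
class CallbackTaskInstance(
    val taskName: String,
    onFirstMessage: CallbackTaskInstance => Unit) {
  private val envelopes = mutable.ArrayBuffer[String]()
  var sendCalls = 0 // Exposed only so the sketch can be checked.

  def collect(envelope: String): Unit = {
    // Notify the run loop only on the empty -> non-empty transition.
    if (envelopes.isEmpty) onFirstMessage(this)
    envelopes.append(envelope)
  }

  def send(forward: String => Unit): Unit = {
    sendCalls += 1
    envelopes.foreach(forward)
    envelopes.clear()
  }
}

class CallbackRunLoop(taskNames: Seq[String], forward: String => Unit) {
  // Tasks that have collected at least one message since their last send.
  private val pending = mutable.LinkedHashSet[CallbackTaskInstance]()

  val taskInstances: Map[String, CallbackTaskInstance] =
    taskNames.map { name =>
      name -> new CallbackTaskInstance(name, t => { pending.add(t); () })
    }.toMap

  def send(): Unit = {
    // Cost is proportional to the number of tasks with messages,
    // not to the total number of tasks.
    pending.foreach(_.send(forward))
    pending.clear()
  }
}
```

The trade-off is extra bookkeeping on every collect, plus tighter coupling 
between the RunLoop and the TaskInstances.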

I took a look at the RunLoop, and of the four tight-loop methods (process, 
send, window, and commit), the only one that iterates over all TaskInstances on 
every invocation is send(). The rest are either time-bounded (e.g. once every 60 
seconds), or only call methods on a single TaskInstance (process). My 
inclination is to just remove this log line in TaskInstance.send then, rather 
than refactoring the code.



--
This message was sent by Atlassian JIRA
(v6.2#6252)
