Jakob Homan created SAMZA-100:
---------------------------------

             Summary: Add a delay while recreating and retrying fetch requests
                 Key: SAMZA-100
                 URL: https://issues.apache.org/jira/browse/SAMZA-100
             Project: Samza
          Issue Type: Bug
          Components: container
    Affects Versions: 0.6.0
            Reporter: Jakob Homan
             Fix For: 0.7.0


Currently, for any errors during message fetching with Kafka that are not
handled elsewhere, we immediately re-create a new simple consumer and retry the
fetch. For some types of exception, such as connection refused when the host is
gone, this causes a lot of extra logging and wasted work. It'd be better to add
a small delay (perhaps with exponential backoff) before retrying:

{code:title=BrokerProxy.scala}
import java.nio.channels.ClosedByInterruptException

while (!Thread.currentThread.isInterrupted) {
  if (nextOffsets.size == 0) {
    debug("No TopicPartitions to fetch. Sleeping.")
    Thread.sleep(sleepMSWhileNoTopicPartitions)
  } else {
    try {
      fetchMessages()
    } catch {
      // If we're interrupted, don't try and reconnect. We should shut down.
      case e: InterruptedException =>
        warn("Shutting down due to interrupt exception.")
        Thread.currentThread.interrupt()
      case e: ClosedByInterruptException =>
        warn("Shutting down due to closed by interrupt exception.")
        Thread.currentThread.interrupt()
      // Any other error: close and recreate the consumer, then retry immediately.
      case e: Throwable => {
        warn("Recreating simple consumer and retrying connection")
        warn("Stack trace for fetchMessages exception.", e)
        simpleConsumer.close()
        // <------- Now would be a good time to take a break, maybe?
        simpleConsumer = createSimpleConsumer()
        metrics.reconnects(host, port).inc
      }
    }
  }
}
{code}
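A minimal sketch of the suggested delay, assuming a hypothetical `ExponentialBackoff` helper. The class name, its parameters (`initialDelayMs`, `maxDelayMs`, `multiplier`), and the default values are illustrative assumptions, not existing Samza code. Each failed fetch doubles the wait up to a cap, and a successful fetch resets it.

```scala
// Hypothetical helper: the name, parameters, and defaults are illustrative
// assumptions, not part of Samza.
class ExponentialBackoff(
    initialDelayMs: Long = 10L,
    maxDelayMs: Long = 10000L,
    multiplier: Double = 2.0) {
  require(initialDelayMs > 0 && maxDelayMs >= initialDelayMs && multiplier >= 1.0)

  private var currentDelayMs: Long = initialDelayMs

  // Returns the delay to use for this retry, then grows the delay for the
  // next retry, capped at maxDelayMs.
  def nextDelayMs(): Long = {
    val delay = currentDelayMs
    currentDelayMs = math.min(maxDelayMs, (currentDelayMs * multiplier).toLong)
    delay
  }

  // Call after a successful fetch so the next failure starts small again.
  def reset(): Unit = {
    currentDelayMs = initialDelayMs
  }

  // Sleeps for the next delay. If interrupted while sleeping, restores the
  // interrupt flag so the loop's isInterrupted check ends the loop, matching
  // the existing InterruptedException handling.
  def sleepBeforeRetry(): Unit = {
    try {
      Thread.sleep(nextDelayMs())
    } catch {
      case _: InterruptedException => Thread.currentThread.interrupt()
    }
  }
}
```

In the loop above, the `case e: Throwable` branch would call `backoff.sleepBeforeRetry()` between `simpleConsumer.close()` and `createSimpleConsumer()`, and a successful `fetchMessages()` would be followed by `backoff.reset()`. Catching the interrupt inside the helper matters because a bare `Thread.sleep` in the catch handler would otherwise let `InterruptedException` escape the loop instead of shutting down cleanly.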



--
This message was sent by Atlassian JIRA
(v6.1#6144)