[ 
https://issues.apache.org/jira/browse/SAMZA-100?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13838482#comment-13838482
 ] 

Jakob Homan commented on SAMZA-100:
-----------------------------------

This presupposes that retrying is the right approach when dealing with a 
refused connection, rather than just abdicating for all the topics and letting 
Kafka's failover do its stuff.  However, abdicating would require more 
sophisticated cluster membership tracking code than we have, so that we can 
re-grow the cluster as well as shrink it when the nodes (or their 
replacements) show up.

> Add a delay while recreating and retrying fetch requests
> --------------------------------------------------------
>
>                 Key: SAMZA-100
>                 URL: https://issues.apache.org/jira/browse/SAMZA-100
>             Project: Samza
>          Issue Type: Bug
>          Components: container
>    Affects Versions: 0.6.0
>            Reporter: Jakob Homan
>             Fix For: 0.7.0
>
>
> Currently for any not-handled-elsewhere errors during message fetching with 
> Kafka we immediately re-create a new simple consumer and re-try the fetch.  
> For some types of exception, such as connection refused when the host is 
> gone, this causes a lot of extra logging and wasted work. It'd be better to 
> add a small delay (perhaps exponential in its backoff) before re-trying:
> {code:title=BrokerProxy.scala}
>       while (!Thread.currentThread.isInterrupted) {
>         if (nextOffsets.size == 0) {
>           debug("No TopicPartitions to fetch. Sleeping.")
>           Thread.sleep(sleepMSWhileNoTopicPartitions)
>         } else {
>           try {
>             fetchMessages()
>           } catch {
>             // If we're interrupted, don't try and reconnect. We should shut down.
>             case e: InterruptedException =>
>               warn("Shutting down due to interrupt exception.")
>               Thread.currentThread.interrupt
>             case e: ClosedByInterruptException =>
>               warn("Shutting down due to closed by interrupt exception.")
>               Thread.currentThread.interrupt
>             case e: Throwable => {
>               warn("Recreating simple consumer and retrying connection")
>               warn("Stack trace for fetchMessages exception.", e)
>               simpleConsumer.close()
>               // <------- Now would be a good time to take a break, maybe?
>               simpleConsumer = createSimpleConsumer()
>               metrics.reconnects(host, port).inc
>             }
>           }
>         }
>       }
> {code}
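
For illustration, the "small delay (perhaps exponential in its backoff)"
suggested in the description could be sketched roughly as below. This is only
a sketch under stated assumptions, not the Samza implementation:
ExponentialBackoff, initialDelayMs, maxDelayMs and multiplier are hypothetical
names that do not appear in BrokerProxy.scala.

```scala
// Hypothetical helper (not part of Samza): hands out retry delays that grow
// by a constant factor on each call, up to a fixed cap.
final class ExponentialBackoff(
    initialDelayMs: Long,
    maxDelayMs: Long,
    multiplier: Double = 2.0) {

  require(initialDelayMs > 0, "initialDelayMs must be positive")
  require(maxDelayMs >= initialDelayMs, "maxDelayMs must be >= initialDelayMs")
  require(multiplier >= 1.0, "multiplier must be >= 1.0")

  private var currentDelayMs: Long = initialDelayMs

  /** Returns the delay to wait now, then grows the stored delay (capped). */
  def nextDelayMs(): Long = {
    val delay = currentDelayMs
    currentDelayMs = math.min((currentDelayMs * multiplier).toLong, maxDelayMs)
    delay
  }

  /** Call after a successful fetch so the next failure starts small again. */
  def reset(): Unit = {
    currentDelayMs = initialDelayMs
  }
}

// Assumed usage inside the retry loop quoted above:
//   fetchMessages(); backoff.reset()              // on success
//   Thread.sleep(backoff.nextDelayMs())           // between close() and
//                                                 // createSimpleConsumer()
```

With new ExponentialBackoff(100, 1000), successive failures would wait 100,
200, 400, 800, 1000, 1000 ms. One thing to watch: Thread.sleep can throw
InterruptedException, and an exception thrown inside a catch handler is not
handled by the sibling cases of the same try, so the sleep would need its own
interrupt handling for the loop to keep shutting down cleanly.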



--
This message was sent by Atlassian JIRA
(v6.1#6144)
