Jakob Homan created SAMZA-100:
---------------------------------
Summary: Add a delay while recreating and retrying fetch requests
Key: SAMZA-100
URL: https://issues.apache.org/jira/browse/SAMZA-100
Project: Samza
Issue Type: Bug
Components: container
Affects Versions: 0.6.0
Reporter: Jakob Homan
Fix For: 0.7.0
Currently, for any error during message fetching from Kafka that is not handled
elsewhere, we immediately re-create the simple consumer and retry the fetch. For
some types of exception, such as connection refused when the host is gone, this
causes a lot of extra logging and wasted work. It'd be better to add a small
delay (perhaps with exponential backoff) before retrying:
{code:title=BrokerProxy.scala}
while (!Thread.currentThread.isInterrupted) {
  if (nextOffsets.size == 0) {
    debug("No TopicPartitions to fetch. Sleeping.")
    Thread.sleep(sleepMSWhileNoTopicPartitions)
  } else {
    try {
      fetchMessages()
    } catch {
      // If we're interrupted, don't try and reconnect. We should shut down.
      case e: InterruptedException =>
        warn("Shutting down due to interrupt exception.")
        Thread.currentThread.interrupt
      case e: ClosedByInterruptException =>
        warn("Shutting down due to closed by interrupt exception.")
        Thread.currentThread.interrupt
      case e: Throwable => {
        warn("Recreating simple consumer and retrying connection")
        warn("Stack trace for fetchMessages exception.", e)
        simpleConsumer.close()
        // <------- Now would be a good time to take a break, maybe?
        simpleConsumer = createSimpleConsumer()
        metrics.reconnects(host, port).inc
      }
    }
  }
}
{code}
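One possible shape for the delay is sketched below. It is only an illustration
of capped exponential backoff: the class ExponentialBackoff, its parameters and
the method names nextDelayMs and reset are hypothetical and do not exist in
Samza or Kafka.

```scala
// Hypothetical helper (not part of Samza): tracks the delay between retries.
class ExponentialBackoff(initialDelayMs: Long,
                         maxDelayMs: Long,
                         multiplier: Double = 2.0) {
  require(initialDelayMs > 0, "initialDelayMs must be positive")
  require(maxDelayMs >= initialDelayMs, "maxDelayMs must be >= initialDelayMs")
  require(multiplier >= 1.0, "multiplier must be >= 1.0")

  // Delay that the next call to nextDelayMs() will return.
  private var currentDelayMs: Long = initialDelayMs

  /** Returns the delay to use now, then grows the stored delay up to maxDelayMs. */
  def nextDelayMs(): Long = {
    val delay = currentDelayMs
    currentDelayMs = math.min((currentDelayMs * multiplier).toLong, maxDelayMs)
    delay
  }

  /** Restores the initial delay, e.g. after a successful fetch. */
  def reset(): Unit = {
    currentDelayMs = initialDelayMs
  }
}
```

Under these assumptions, the marked spot in the catch-all case could call
Thread.sleep(backoff.nextDelayMs()) before createSimpleConsumer(), and a
successful fetchMessages() could be followed by backoff.reset(). Note that
Thread.sleep can itself throw InterruptedException, so a sleep placed inside
that handler would need its own interrupt handling to keep the existing
shutdown behaviour.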
--
This message was sent by Atlassian JIRA
(v6.1#6144)