Repository: incubator-samza Updated Branches: refs/heads/master df6e11afc -> f2fcb26a3
SAMZA-174: General-purpose implementation of a retry loop. Reviewed by Chris Riccomini. Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/f2fcb26a Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/f2fcb26a Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/f2fcb26a Branch: refs/heads/master Commit: f2fcb26a33b59da58a852fe81564d60407aa758c Parents: df6e11a Author: Martin Kleppmann <[email protected]> Authored: Fri Mar 7 16:17:38 2014 +0000 Committer: Martin Kleppmann <[email protected]> Committed: Thu Mar 20 23:43:45 2014 +0000 ---------------------------------------------------------------------- .../samza/util/ExponentialSleepStrategy.scala | 130 +++++++++- .../util/TestExponentialSleepStrategy.scala | 161 ++++++++++-- .../kafka/KafkaCheckpointManager.scala | 248 +++++++++---------- .../apache/samza/system/kafka/BrokerProxy.scala | 59 ++--- .../samza/system/kafka/KafkaSystemAdmin.scala | 85 +++---- .../system/kafka/KafkaSystemConsumer.scala | 31 +-- .../samza/system/kafka/KafkaSystemFactory.scala | 5 +- .../system/kafka/KafkaSystemProducer.scala | 42 ++-- .../system/kafka/TestKafkaSystemAdmin.scala | 19 +- .../system/kafka/TestKafkaSystemProducer.scala | 16 +- 10 files changed, 482 insertions(+), 314 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/f2fcb26a/samza-core/src/main/scala/org/apache/samza/util/ExponentialSleepStrategy.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/util/ExponentialSleepStrategy.scala b/samza-core/src/main/scala/org/apache/samza/util/ExponentialSleepStrategy.scala index b3c9263..376b277 100644 --- a/samza-core/src/main/scala/org/apache/samza/util/ExponentialSleepStrategy.scala +++ 
b/samza-core/src/main/scala/org/apache/samza/util/ExponentialSleepStrategy.scala @@ -21,6 +21,19 @@ package org.apache.samza.util +import java.nio.channels.ClosedByInterruptException +import org.apache.samza.util.ExponentialSleepStrategy.RetryLoop + +/** + * Encapsulates the pattern of retrying an operation until it succeeds. + * Before every retry there is a delay, which starts short and gets exponentially + * longer on each retry, up to a configurable maximum. There is no limit to the + * number of retries. + * + * @param backOffMultiplier The factor by which the delay increases on each retry. + * @param initialDelayMs Time in milliseconds to wait after the first attempt failed. + * @param maximumDelayMs Cap up to which we will increase the delay. + */ class ExponentialSleepStrategy( backOffMultiplier: Double = 2.0, initialDelayMs: Long = 100, @@ -30,16 +43,117 @@ class ExponentialSleepStrategy( require(initialDelayMs > 0, "initialDelayMs must be positive") require(maximumDelayMs >= initialDelayMs, "maximumDelayMs must be >= initialDelayMs") - var previousDelay = 0L - - def sleep() = { - val nextDelay = getNextDelay(previousDelay) - Thread.sleep(nextDelay) - previousDelay = nextDelay - } - + /** + * Given the delay before the last retry, calculate what the delay before the + * next retry should be. + */ def getNextDelay(previousDelay: Long): Long = { val nextDelay = (previousDelay * backOffMultiplier).asInstanceOf[Long] math.min(math.max(initialDelayMs, nextDelay), maximumDelayMs) } + + /** Can be overridden by subclasses to customize looping behavior. */ + def startLoop: RetryLoop = new RetryLoopState + + /** + * Starts a retryable operation with the delay properties that were configured + * when the object was created. Every call to run is independent, so the same + * ExponentialSleepStrategy object can be used for several different retry loops. + * + * loopOperation is called on every attempt, and given as parameter a RetryLoop + * object. 
By default it is assumed that the operation failed. If the operation + * succeeded, you must call <code>done</code> on the RetryLoop object to indicate + * success. This method returns the return value of the successful loopOperation. + * + * If an exception is thrown during the execution of loopOperation, the onException + * handler is called. You can choose to re-throw the exception (so that it aborts + * the run loop and bubbles up), or ignore it (the operation will be retried), + * or call <code>done</code> (give up, don't retry). + * + * @param loopOperation The operation that should be attempted and may fail. + * @param onException Handler function that determines what to do with an exception. + * @return If loopOperation succeeded, an option containing the return value of + * the last invocation. If done was called in the exception handler, None. + */ + def run[A](loopOperation: RetryLoop => A, onException: (Exception, RetryLoop) => Unit): Option[A] = { + val loop = startLoop + while (!loop.isDone && !Thread.currentThread.isInterrupted) { + try { + val result = loopOperation(loop) + if (loop.isDone) return Some(result) + } catch { + case e: InterruptedException => throw e + case e: ClosedByInterruptException => throw e + case e: Exception => onException(e, loop) + } + if (!loop.isDone && !Thread.currentThread.isInterrupted) loop.sleep + } + None + } + + private[util] class RetryLoopState extends RetryLoop { + var previousDelay = 0L + var isDone = false + var sleepCount = 0 + + def sleep { + sleepCount += 1 + val nextDelay = getNextDelay(previousDelay) + previousDelay = nextDelay + Thread.sleep(nextDelay) + } + + def reset { + previousDelay = 0 + isDone = false + } + + def done { + isDone = true + } + } +} + +object ExponentialSleepStrategy { + /** + * State of the retry loop, passed to every invocation of the loopOperation + * or the exception handler. + */ + trait RetryLoop { + /** Let the current thread sleep for the backoff time (called by run method).
*/ + def sleep + + /** Tell the retry loop to revert to initialDelayMs for the next retry. */ + def reset + + /** Tell the retry loop to stop trying (success or giving up). */ + def done + + /** Check whether <code>done</code> was called (used by the run method). */ + def isDone: Boolean + + /** Returns the number of times that the retry loop has called <code>sleep</code>. */ + def sleepCount: Int + } + + /** For tests using ExponentialSleepStrategy.Mock */ + class CallLimitReached extends Exception + + /** + * For writing tests of retryable code. Doesn't actually sleep, so that tests + * are quick to run. + * + * @param maxCalls The maximum number of retries to allow before throwing CallLimitReached. + */ + class Mock(maxCalls: Int) extends ExponentialSleepStrategy { + override def startLoop = new MockRetryLoop + + class MockRetryLoop extends RetryLoop { + var isDone = false + var sleepCount = 0 + def sleep { sleepCount += 1; if (sleepCount > maxCalls) throw new CallLimitReached } + def reset { isDone = false } + def done { isDone = true } + } + } } http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/f2fcb26a/samza-core/src/test/scala/org/apache/samza/util/TestExponentialSleepStrategy.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/util/TestExponentialSleepStrategy.scala b/samza-core/src/test/scala/org/apache/samza/util/TestExponentialSleepStrategy.scala index 3036da9..6cea6a2 100644 --- a/samza-core/src/test/scala/org/apache/samza/util/TestExponentialSleepStrategy.scala +++ b/samza-core/src/test/scala/org/apache/samza/util/TestExponentialSleepStrategy.scala @@ -23,34 +23,143 @@ package org.apache.samza.util import org.junit.Assert._ import org.junit.Test +import org.apache.samza.util.ExponentialSleepStrategy.RetryLoop class TestExponentialSleepStrategy { - @Test def testGetNextDelayReturnsIncrementalDelay() = { - val st = new ExponentialSleepStrategy - var nextDelay = 
st.getNextDelay(0L) - assertEquals(nextDelay, 100L) - nextDelay = st.getNextDelay(nextDelay) - assertEquals(nextDelay, 200L) - nextDelay = st.getNextDelay(nextDelay) - assertEquals(nextDelay, 400L) - } - - @Test def testGetNextDelayReturnsMaximumDelayWhenDelayCapReached() = { - val st = new ExponentialSleepStrategy - var nextDelay = st.getNextDelay(6400L) - assertEquals(nextDelay, 10000L) - nextDelay = st.getNextDelay(nextDelay) - assertEquals(nextDelay, 10000L) - } - - @Test def testSleepStrategyIsConfigurable() = { - val st = new ExponentialSleepStrategy(backOffMultiplier = 3.0, initialDelayMs = 10) - var nextDelay = st.getNextDelay(0L) - assertEquals(nextDelay, 10L) - nextDelay = st.getNextDelay(nextDelay) - assertEquals(nextDelay, 30L) - nextDelay = st.getNextDelay(nextDelay) - assertEquals(nextDelay, 90L) + @Test def testGetNextDelayReturnsIncrementalDelay { + val strategy = new ExponentialSleepStrategy + assertEquals(100, strategy.getNextDelay(0)) + assertEquals(200, strategy.getNextDelay(100)) + assertEquals(400, strategy.getNextDelay(200)) + assertEquals(800, strategy.getNextDelay(400)) + } + + @Test def testGetNextDelayReturnsMaximumDelayWhenDelayCapReached { + val strategy = new ExponentialSleepStrategy + assertEquals(10000, strategy.getNextDelay(6400)) + assertEquals(10000, strategy.getNextDelay(10000)) + } + + @Test def testSleepStrategyIsConfigurable { + val strategy = new ExponentialSleepStrategy(backOffMultiplier = 3.0, initialDelayMs = 10) + assertEquals(10, strategy.getNextDelay(0)) + assertEquals(30, strategy.getNextDelay(10)) + assertEquals(90, strategy.getNextDelay(30)) + assertEquals(270, strategy.getNextDelay(90)) + } + + @Test def testResetToInitialDelay { + val strategy = new ExponentialSleepStrategy + val loop = strategy.startLoop.asInstanceOf[ExponentialSleepStrategy#RetryLoopState] + loop.previousDelay = strategy.getNextDelay(loop.previousDelay) + assertEquals(100, loop.previousDelay) + loop.previousDelay = 
strategy.getNextDelay(loop.previousDelay) + loop.previousDelay = strategy.getNextDelay(loop.previousDelay) + assertEquals(400, loop.previousDelay) + loop.reset + loop.previousDelay = strategy.getNextDelay(loop.previousDelay) + assertEquals(100, loop.previousDelay) + } + + @Test def testRetryWithoutException { + val strategy = new ExponentialSleepStrategy(initialDelayMs = 1) + var iterations = 0 + var loopObject: RetryLoop = null + val result = strategy.run( + loop => { + loopObject = loop + iterations += 1 + if (iterations == 3) loop.done + iterations + }, + (exception, loop) => throw exception + ) + assertEquals(Some(3), result) + assertEquals(3, iterations) + assertEquals(2, loopObject.sleepCount) + } + + @Test def testRetryWithException { + val strategy = new ExponentialSleepStrategy(initialDelayMs = 1) + var iterations = 0 + var loopObject: RetryLoop = null + strategy.run( + loop => { throw new IllegalArgumentException("boom") }, + (exception, loop) => { + assertEquals("boom", exception.getMessage) + loopObject = loop + iterations += 1 + if (iterations == 3) loop.done + } + ) + assertEquals(3, iterations) + assertEquals(2, loopObject.sleepCount) + } + + @Test def testReThrowingException { + val strategy = new ExponentialSleepStrategy(initialDelayMs = 1) + var iterations = 0 + var loopObject: RetryLoop = null + try { + strategy.run( + loop => { + loopObject = loop + iterations += 1 + throw new IllegalArgumentException("boom") + }, + (exception, loop) => throw exception + ) + fail("expected exception to be thrown") + } catch { + case e: IllegalArgumentException => assertEquals("boom", e.getMessage) + case e: Throwable => throw e + } + assertEquals(1, iterations) + assertEquals(0, loopObject.sleepCount) + } + + def interruptedThread(operation: => Unit) = { + var exception: Option[Throwable] = None + val interruptee = new Thread(new Runnable { + def run { + try { operation } catch { case e: Throwable => exception = Some(e) } + } + }) + interruptee.start + 
Thread.sleep(10) // give the thread a chance to make some progress before we interrupt it + interruptee.interrupt + interruptee.join + exception + } + + @Test def testThreadInterruptInRetryLoop { + val strategy = new ExponentialSleepStrategy + var iterations = 0 + var loopObject: RetryLoop = null + val exception = interruptedThread { + strategy.run( + loop => { iterations += 1; loopObject = loop }, + (exception, loop) => throw exception + ) + } + assertEquals(1, iterations) + assertEquals(1, loopObject.sleepCount) + assertEquals(classOf[InterruptedException], exception.get.getClass) + } + + @Test def testThreadInterruptInOperationSleep { + val strategy = new ExponentialSleepStrategy + var iterations = 0 + var loopObject: RetryLoop = null + val exception = interruptedThread { + strategy.run( + loop => { iterations += 1; loopObject = loop; Thread.sleep(1000) }, + (exception, loop) => throw exception + ) + } + assertEquals(1, iterations) + assertEquals(0, loopObject.sleepCount) + assertEquals(classOf[InterruptedException], exception.get.getClass) } } http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/f2fcb26a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala index 27b38b2..a1d2ffe 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala @@ -20,7 +20,6 @@ package org.apache.samza.checkpoint.kafka import org.I0Itec.zkclient.ZkClient - import grizzled.slf4j.Logging import kafka.admin.AdminUtils import kafka.api.FetchRequestBuilder @@ -45,6 +44,7 @@ import org.apache.samza.serializers.CheckpointSerde import 
org.apache.samza.serializers.Serde import org.apache.samza.system.kafka.TopicMetadataCache import org.apache.samza.util.TopicMetadataStore +import org.apache.samza.util.ExponentialSleepStrategy /** * Kafka checkpoint manager is used to store checkpoints in a Kafka topic that @@ -66,7 +66,7 @@ class KafkaCheckpointManager( metadataStore: TopicMetadataStore, connectProducer: () => Producer[Partition, Array[Byte]], connectZk: () => ZkClient, - failureRetryMs: Long = 10000, + retryBackoff: ExponentialSleepStrategy = new ExponentialSleepStrategy, serde: Serde[Checkpoint] = new CheckpointSerde) extends CheckpointManager with Logging { var partitions = Set[Partition]() @@ -75,39 +75,31 @@ class KafkaCheckpointManager( info("Creating KafkaCheckpointManager with: clientId=%s, stateTopic=%s, systemName=%s" format (clientId, stateTopic, systemName)) def writeCheckpoint(partition: Partition, checkpoint: Checkpoint) { - var done = false - - while (!done) { - try { + retryBackoff.run( + loop => { if (producer == null) { producer = connectProducer() } - producer.send(new KeyedMessage(stateTopic, null, partition, serde.toBytes(checkpoint))) - done = true - } catch { - case e: Throwable => - warn("Failed to send checkpoint %s for partition %s. Retrying." format (checkpoint, partition), e) - - if (producer != null) { - producer.close - } - - producer = null - - Thread.sleep(failureRetryMs) + loop.done + }, + + (exception, loop) => { + warn("Failed to send checkpoint %s for partition %s: %s. Retrying." format (checkpoint, partition, exception)) + debug(exception) + if (producer != null) { + producer.close + } + producer = null } - } + ) } def readLastCheckpoint(partition: Partition): Checkpoint = { - var checkpoint: Option[Checkpoint] = None - var consumer: SimpleConsumer = null - info("Reading checkpoint for partition %s." 
format partition.getPartitionId) - while (!checkpoint.isDefined) { - try { + val checkpoint = retryBackoff.run( + loop => { // Assume state topic exists with correct partitions, since it should be verified on start. // Fetch the metadata for this state topic/partition pair. val metadataMap = TopicMetadataCache.getTopicMetadata(Set(stateTopic), systemName, (topics: Set[String]) => metadataStore.getTopicInfo(topics)) @@ -123,81 +115,82 @@ class KafkaCheckpointManager( info("Connecting to leader %s:%d for topic %s and partition %s to fetch last checkpoint message." format (leader.host, leader.port, stateTopic, partitionId)) - consumer = new SimpleConsumer( + val consumer = new SimpleConsumer( leader.host, leader.port, socketTimeout, bufferSize, clientId) - val topicAndPartition = new TopicAndPartition(stateTopic, partitionId) - val offsetResponse = consumer.getOffsetsBefore(new OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 1)))) - .partitionErrorAndOffsets - .get(topicAndPartition) - .getOrElse(throw new KafkaCheckpointException("Unable to find offset information for %s:%d" format (stateTopic, partitionId))) - - // Fail or retry if there was an an issue with the offset request. - ErrorMapping.maybeThrowException(offsetResponse.error) + try { + val topicAndPartition = new TopicAndPartition(stateTopic, partitionId) + val offsetResponse = consumer.getOffsetsBefore(new OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 1)))) + .partitionErrorAndOffsets + .get(topicAndPartition) + .getOrElse(throw new KafkaCheckpointException("Unable to find offset information for %s:%d" format (stateTopic, partitionId))) - val offset = offsetResponse - .offsets - .headOption - .getOrElse(throw new KafkaCheckpointException("Got response, but no offsets defined for %s:%d" format (stateTopic, partitionId))) + // Fail or retry if there was an an issue with the offset request. 
+ ErrorMapping.maybeThrowException(offsetResponse.error) - info("Got offset %s for topic %s and partition %s. Attempting to fetch message." format (offset, stateTopic, partitionId)) + val offset = offsetResponse + .offsets + .headOption + .getOrElse(throw new KafkaCheckpointException("Got response, but no offsets defined for %s:%d" format (stateTopic, partitionId))) - if (offset <= 0) { - info("Got offset 0 (no messages in state topic) for topic %s and partition %s, so returning null. If you expected the state topic to have messages, you're probably going to lose data." format (stateTopic, partition)) - return null - } + info("Got offset %s for topic %s and partition %s. Attempting to fetch message." format (offset, stateTopic, partitionId)) - val request = new FetchRequestBuilder() - // Kafka returns 1 greater than the offset of the last message in - // the topic, so subtract one to fetch the last message. - .addFetch(stateTopic, partitionId, offset - 1, fetchSize) - .maxWait(500) - .minBytes(1) - .clientId(clientId) - .build - val messageSet = consumer.fetch(request) - if (messageSet.hasError) { - warn("Got error code from broker for %s: %s" format (stateTopic, messageSet.errorCode(stateTopic, partitionId))) - val errorCode = messageSet.errorCode(stateTopic, partitionId) - if (ErrorMapping.OffsetOutOfRangeCode.equals(errorCode)) { - warn("Got an offset out of range exception while getting last checkpoint for topic %s and partition %s, so returning a null offset to the KafkaConsumer. Let it decide what to do based on its autooffset.reset setting." format (stateTopic, partitionId)) + if (offset <= 0) { + info("Got offset 0 (no messages in state topic) for topic %s and partition %s, so returning null. If you expected the state topic to have messages, you're probably going to lose data." 
format (stateTopic, partition)) return null } - ErrorMapping.maybeThrowException(errorCode) - } - val messages = messageSet.messageSet(stateTopic, partitionId).toList - - if (messages.length != 1) { - throw new KafkaCheckpointException("Something really unexpected happened. Got %s " - + "messages back when fetching from state checkpoint topic %s and partition %s. " - + "Expected one message. It would be unsafe to go on without the latest checkpoint, " - + "so failing." format (messages.length, stateTopic, partition)) - } - - // Some back bending to go from message to checkpoint. - checkpoint = Some(serde.fromBytes(Utils.readBytes(messages(0).message.payload))) - consumer.close - } catch { - case e: KafkaCheckpointException => - throw e - case e: Throwable => - warn("Got exception while trying to read last checkpoint for topic %s and partition %s. Retrying." format (stateTopic, partition), e) + val request = new FetchRequestBuilder() + // Kafka returns 1 greater than the offset of the last message in + // the topic, so subtract one to fetch the last message. + .addFetch(stateTopic, partitionId, offset - 1, fetchSize) + .maxWait(500) + .minBytes(1) + .clientId(clientId) + .build + val messageSet = consumer.fetch(request) + if (messageSet.hasError) { + warn("Got error code from broker for %s: %s" format (stateTopic, messageSet.errorCode(stateTopic, partitionId))) + val errorCode = messageSet.errorCode(stateTopic, partitionId) + if (ErrorMapping.OffsetOutOfRangeCode.equals(errorCode)) { + warn("Got an offset out of range exception while getting last checkpoint for topic %s and partition %s, so returning a null offset to the KafkaConsumer. Let it decide what to do based on its autooffset.reset setting." 
format (stateTopic, partitionId)) + return null + } + ErrorMapping.maybeThrowException(errorCode) + } + val messages = messageSet.messageSet(stateTopic, partitionId).toList - if (consumer != null) { - consumer.close + if (messages.length != 1) { + throw new KafkaCheckpointException("Something really unexpected happened. Got %s " + + "messages back when fetching from state checkpoint topic %s and partition %s. " + + "Expected one message. It would be unsafe to go on without the latest checkpoint, " + + "so failing." format (messages.length, stateTopic, partition)) } - Thread.sleep(failureRetryMs) + // Some back bending to go from message to checkpoint. + val checkpoint = serde.fromBytes(Utils.readBytes(messages(0).message.payload)) + loop.done + checkpoint + } finally { + consumer.close + } + }, + + (exception, loop) => { + exception match { + case e: KafkaCheckpointException => throw e + case e: Exception => + warn("While trying to read last checkpoint for topic %s and partition %s: %s. Retrying." format (stateTopic, partition, e)) + debug(e) + } } - } + ).getOrElse(throw new SamzaException("Failed to get checkpoint for partition %s" format partition.getPartitionId)) info("Got checkpoint state for partition %s: %s" format (partition.getPartitionId, checkpoint)) - - checkpoint.get + checkpoint } def start { @@ -215,76 +208,63 @@ class KafkaCheckpointManager( def stop = producer.close private def createTopic { - var done = false - var zkClient: ZkClient = null - info("Attempting to create state topic %s with %s partitions." format (stateTopic, totalPartitions)) - - while (!done) { - try { - zkClient = connectZk() - - AdminUtils.createTopic( - zkClient, - stateTopic, - totalPartitions, - replicationFactor) + retryBackoff.run( + loop => { + val zkClient = connectZk() + try { + AdminUtils.createTopic( + zkClient, + stateTopic, + totalPartitions, + replicationFactor) + } finally { + zkClient.close + } info("Created state topic %s." 
format stateTopic) - - done = true - } catch { - case e: TopicExistsException => - info("State topic %s already exists." format stateTopic) - - done = true - case e: Throwable => - warn("Failed to create topic %s. Retrying." format stateTopic, e) - - if (zkClient != null) { - zkClient.close - } - - Thread.sleep(failureRetryMs) + loop.done + }, + + (exception, loop) => { + exception match { + case e: TopicExistsException => + info("State topic %s already exists." format stateTopic) + loop.done + case e: Exception => + warn("Failed to create topic %s: %s. Retrying." format (stateTopic, e)) + debug(e) + } } - } - - zkClient.close + ) } private def validateTopic { - var done = false - info("Validating state topic %s." format stateTopic) - - while (!done) { - try { + retryBackoff.run( + loop => { val topicMetadataMap = TopicMetadataCache.getTopicMetadata(Set(stateTopic), systemName, metadataStore.getTopicInfo) val topicMetadata = topicMetadataMap(stateTopic) - val errorCode = topicMetadata.errorCode - - if (errorCode != ErrorMapping.NoError) { - throw new SamzaException("State topic validation failed for topic %s because we got error code %s from Kafka." format (stateTopic, errorCode)) - } + ErrorMapping.maybeThrowException(topicMetadata.errorCode) val partitionCount = topicMetadata.partitionsMetadata.length - if (partitionCount != totalPartitions) { throw new KafkaCheckpointException("State topic validation failed for topic %s because partition count %s did not match expected partition count %s." format (stateTopic, topicMetadata.partitionsMetadata.length, totalPartitions)) } info("Successfully validated state topic %s." format stateTopic) - - done = true - } catch { - case e: KafkaCheckpointException => - throw e - case e: Throwable => - warn("Got exception while trying to read validate topic %s. Retrying." 
format stateTopic, e) - - Thread.sleep(failureRetryMs) + loop.done + }, + + (exception, loop) => { + exception match { + case e: KafkaCheckpointException => throw e + case e: Exception => + warn("While trying to validate topic %s: %s. Retrying." format (stateTopic, e)) + debug(e) + } } - } + ) } } http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/f2fcb26a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala index c5ad1c8..f240d69 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala @@ -122,38 +122,39 @@ class BrokerProxy( } } - val thread: Thread = new Thread(new Runnable() { - def run() { - info("Initialising sleep strategy"); - val sleepStrategy = new ExponentialSleepStrategy - info("Starting thread for BrokerProxy") - - while (!Thread.currentThread.isInterrupted) { - if (nextOffsets.size == 0) { - debug("No TopicPartitions to fetch. Sleeping.") - Thread.sleep(sleepMSWhileNoTopicPartitions) - } else { - try { - fetchMessages() - } catch { - // If we're interrupted, don't try and reconnect. We should shut down. 
- case e: InterruptedException => - warn("Shutting down due to interrupt exception.") - Thread.currentThread.interrupt - case e: ClosedByInterruptException => - warn("Shutting down due to closed by interrupt exception.") - Thread.currentThread.interrupt - case e: Throwable => { - warn("Recreating simple consumer and retrying connection") - warn("Stack trace for fetchMessages exception.", e) - simpleConsumer.close() - sleepStrategy.sleep() - simpleConsumer = createSimpleConsumer() - metrics.reconnects(host, port).inc + val thread = new Thread(new Runnable { + def run { + var reconnect = false + (new ExponentialSleepStrategy).run( + loop => { + if (reconnect) { + metrics.reconnects(host, port).inc + simpleConsumer.close() + simpleConsumer = createSimpleConsumer() + } + + while (!Thread.currentThread.isInterrupted) { + if (nextOffsets.size == 0) { + debug("No TopicPartitions to fetch. Sleeping.") + Thread.sleep(sleepMSWhileNoTopicPartitions) + } else { + fetchMessages + + // If we got here, fetchMessages didn't throw an exception, i.e. it was successful. + // In that case, reset the loop delay, so that the next time an error occurs, + // we start with a short retry delay. + loop.reset } } + }, + + (exception, loop) => { + warn("Restarting consumer due to %s. Turn on debugging to get a full stack trace." 
format exception) + debug(exception) + reconnect = true } - } + ) + if (Thread.currentThread.isInterrupted) info("Shutting down due to interrupt") } }, "BrokerProxy thread pointed at %s:%d for client %s" format (host, port, clientID)) http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/f2fcb26a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala index dafc980..2a23652 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala @@ -132,17 +132,9 @@ class KafkaSystemAdmin( * retry indefinitely until it gets a successful response from Kafka. */ def getSystemStreamMetadata(streams: java.util.Set[String], retryBackoff: ExponentialSleepStrategy) = { - var partitions = Map[String, Set[Partition]]() - var oldestOffsets = Map[SystemStreamPartition, String]() - var newestOffsets = Map[SystemStreamPartition, String]() - var upcomingOffsets = Map[SystemStreamPartition, String]() - var done = false - var consumer: SimpleConsumer = null - debug("Fetching system stream metadata for: %s" format streams) - - while (!done) { - try { + retryBackoff.run( + loop => { val metadata = TopicMetadataCache.getTopicMetadata( streams.toSet, systemName, @@ -151,52 +143,49 @@ class KafkaSystemAdmin( debug("Got metadata for streams: %s" format metadata) val brokersToTopicPartitions = getTopicsAndPartitionsByBroker(metadata) + var partitions = Map[String, Set[Partition]]() + var oldestOffsets = Map[SystemStreamPartition, String]() + var newestOffsets = Map[SystemStreamPartition, String]() + var upcomingOffsets = Map[SystemStreamPartition, String]() // Get oldest, newest, and upcoming offsets for each 
topic and partition. for ((broker, topicsAndPartitions) <- brokersToTopicPartitions) { debug("Fetching offsets for %s:%s: %s" format (broker.host, broker.port, topicsAndPartitions)) - consumer = new SimpleConsumer(broker.host, broker.port, timeout, bufferSize, clientId) - oldestOffsets ++= getOffsets(consumer, topicsAndPartitions, OffsetRequest.EarliestTime) - upcomingOffsets ++= getOffsets(consumer, topicsAndPartitions, OffsetRequest.LatestTime) - // Kafka's "latest" offset is always last message in stream's offset + - // 1, so get newest message in stream by subtracting one. this is safe - // even for key-deduplicated streams, since the last message will - // never be deduplicated. - newestOffsets = upcomingOffsets.mapValues(offset => (offset.toLong - 1).toString) - // Keep only oldest/newest offsets where there is a message. Should - // return null offsets for empty streams. - upcomingOffsets.foreach { - case (topicAndPartition, offset) => - if (offset.toLong <= 0) { - debug("Stripping oldest/newest offsets for %s because the topic appears empty." format topicAndPartition) - oldestOffsets -= topicAndPartition - newestOffsets -= topicAndPartition - } + val consumer = new SimpleConsumer(broker.host, broker.port, timeout, bufferSize, clientId) + try { + oldestOffsets ++= getOffsets(consumer, topicsAndPartitions, OffsetRequest.EarliestTime) + upcomingOffsets ++= getOffsets(consumer, topicsAndPartitions, OffsetRequest.LatestTime) + // Kafka's "latest" offset is always last message in stream's offset + + // 1, so get newest message in stream by subtracting one. this is safe + // even for key-deduplicated streams, since the last message will + // never be deduplicated. + newestOffsets = upcomingOffsets.mapValues(offset => (offset.toLong - 1).toString) + // Keep only oldest/newest offsets where there is a message. Should + // return null offsets for empty streams. 
+ upcomingOffsets.foreach { + case (topicAndPartition, offset) => + if (offset.toLong <= 0) { + debug("Stripping oldest/newest offsets for %s because the topic appears empty." format topicAndPartition) + oldestOffsets -= topicAndPartition + newestOffsets -= topicAndPartition + } + } + } finally { + consumer.close } - - debug("Shutting down consumer for %s:%s." format (broker.host, broker.port)) - - consumer.close } - done = true - } catch { - case e: InterruptedException => - info("Interrupted while fetching last offsets, so forwarding.") - if (consumer != null) { - consumer.close - } - throw e - case e: Exception => - // Retry. - warn("Unable to fetch last offsets for streams due to: %s, %s. Retrying. Turn on debugging to get a full stack trace." format (e.getMessage, streams)) - debug(e) - retryBackoff.sleep - } - } + val result = assembleMetadata(oldestOffsets, newestOffsets, upcomingOffsets) + loop.done + result + }, - assembleMetadata(oldestOffsets, newestOffsets, upcomingOffsets) + (exception, loop) => { + warn("Unable to fetch last offsets for streams %s due to %s. Retrying." 
format (streams, exception)) + debug(exception) + } + ).getOrElse(throw new SamzaException("Failed to get system stream metadata")) } /** @@ -270,4 +259,4 @@ class KafkaSystemAdmin( offsets } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/f2fcb26a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala index afbd7cd..8ad97df 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala @@ -33,6 +33,7 @@ import org.apache.samza.util.BlockingEnvelopeMap import org.apache.samza.system.SystemStreamPartition import org.apache.samza.system.IncomingMessageEnvelope import kafka.consumer.ConsumerConfig +import org.apache.samza.util.ExponentialSleepStrategy object KafkaSystemConsumer { def toTopicAndPartition(systemStreamPartition: SystemStreamPartition) = { @@ -56,11 +57,11 @@ private[kafka] class KafkaSystemConsumer( fetchSize:Int = ConsumerConfig.MaxFetchSize, consumerMinSize:Int = ConsumerConfig.MinFetchBytes, consumerMaxWait:Int = ConsumerConfig.MaxFetchWaitMs, - brokerMetadataFailureRefreshMs: Long = 10000, fetchThreshold: Int = 0, offsetGetter: GetOffset = new GetOffset("fail"), deserializer: Decoder[Object] = new DefaultDecoder().asInstanceOf[Decoder[Object]], keyDeserializer: Decoder[Object] = new DefaultDecoder().asInstanceOf[Decoder[Object]], + retryBackoff: ExponentialSleepStrategy = new ExponentialSleepStrategy, clock: () => Long = { System.currentTimeMillis }) extends BlockingEnvelopeMap(metrics.registry, new Clock { def currentTimeMillis = clock() }, classOf[KafkaSystemConsumerMetrics].getName) with Toss with Logging { 
@@ -95,8 +96,8 @@ private[kafka] class KafkaSystemConsumer( def refreshBrokers(topicPartitionsAndOffsets: Map[TopicAndPartition, String]) { var tpToRefresh = topicPartitionsAndOffsets.keySet.toList - while (!tpToRefresh.isEmpty) { - try { + retryBackoff.run( + loop => { val getTopicMetadata = (topics: Set[String]) => { new ClientUtilTopicMetadataStore(brokerListString, clientId).getTopicInfo(topics) } @@ -128,25 +129,17 @@ private[kafka] class KafkaSystemConsumer( rest } - - while(!tpToRefresh.isEmpty) { + while (!tpToRefresh.isEmpty) { tpToRefresh = refresh(tpToRefresh) } - } catch { - case e: Throwable => - warn("An exception was thrown while refreshing brokers for %s. Waiting a bit and retrying, since we can't continue without broker metadata." format tpToRefresh.head) - debug("Exception while refreshing brokers", e) - - try { - Thread.sleep(brokerMetadataFailureRefreshMs) - } catch { - case e: InterruptedException => - info("Interrupted while waiting to retry metadata refresh, so shutting down.") - - stop - } + loop.done + }, + + (exception, loop) => { + warn("While refreshing brokers for %s: %s. Retrying." 
format (tpToRefresh.head, exception)) + debug(exception) } - } + ) } val sink = new MessageSink { http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/f2fcb26a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala index b09ade2..feecc58 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala @@ -26,6 +26,7 @@ import org.apache.samza.config.KafkaConfig.Config2Kafka import org.apache.samza.SamzaException import kafka.producer.Producer import org.apache.samza.system.SystemFactory +import org.apache.samza.util.ExponentialSleepStrategy class KafkaSystemFactory extends SystemFactory { def getConsumer(systemName: String, config: Config, registry: MetricsRegistry) = { @@ -69,7 +70,7 @@ class KafkaSystemFactory extends SystemFactory { val batchSize = Option(producerConfig.batchNumMessages) .getOrElse(1000) val reconnectIntervalMs = Option(producerConfig.retryBackoffMs) - .getOrElse(10000) + .getOrElse(1000) val getProducer = () => { new Producer[Object, Object](producerConfig) } val metrics = new KafkaSystemProducerMetrics(systemName, registry) @@ -80,7 +81,7 @@ class KafkaSystemFactory extends SystemFactory { new KafkaSystemProducer( systemName, batchSize, - reconnectIntervalMs, + new ExponentialSleepStrategy(initialDelayMs = reconnectIntervalMs), getProducer, metrics) } http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/f2fcb26a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala ---------------------------------------------------------------------- diff --git 
a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala index a419783..2de8cea 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala @@ -30,11 +30,12 @@ import org.apache.samza.config.Config import org.apache.samza.util.KafkaUtil import org.apache.samza.system.SystemProducer import org.apache.samza.system.OutgoingMessageEnvelope +import org.apache.samza.util.ExponentialSleepStrategy class KafkaSystemProducer( systemName: String, batchSize: Int, - reconnectIntervalMs: Long, + retryBackoff: ExponentialSleepStrategy = new ExponentialSleepStrategy, getProducer: () => Producer[Object, Object], metrics: KafkaSystemProducerMetrics) extends SystemProducer with Logging { @@ -74,14 +75,11 @@ class KafkaSystemProducer( def flush(source: String) { val buffer = sourceBuffers(source) - var done = false - debug("Flushing buffer with size: %s." format buffer.size) - metrics.flushes.inc - while (!done) { - try { + retryBackoff.run( + loop => { if (producer == null) { info("Creating a new producer for system %s." format systemName) producer = getProducer() @@ -89,27 +87,21 @@ class KafkaSystemProducer( } producer.send(buffer: _*) - done = true + loop.done metrics.flushSizes.inc(buffer.size) - } catch { - case e: Throwable => - warn("Triggering a reconnect for %s because connection failed: %s" format (systemName, e.getMessage)) - debug("Exception while producing to %s." 
format systemName, e) - - metrics.reconnects.inc - - if (producer != null) { - producer.close - producer = null - } - - try { - Thread.sleep(reconnectIntervalMs) - } catch { - case e: InterruptedException => None - } + }, + + (exception, loop) => { + warn("Triggering a reconnect for %s because connection failed: %s" format (systemName, exception)) + debug(exception) + metrics.reconnects.inc + + if (producer != null) { + producer.close + producer = null + } } - } + ) buffer.clear debug("Flushed buffer.") http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/f2fcb26a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala index cd9d926..e43970c 100644 --- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala +++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala @@ -299,27 +299,16 @@ class TestKafkaSystemAdmin { } } - class CallLimitReached extends Exception - - class MockSleepStrategy(maxCalls: Int) extends ExponentialSleepStrategy { - var countCalls = 0 - - override def sleep() = { - if (countCalls >= maxCalls) throw new CallLimitReached - countCalls += 1 - } - } - @Test def testShouldRetryOnTopicMetadataError { val systemAdmin = new KafkaSystemAdminWithTopicMetadataError - val retryBackoff = new MockSleepStrategy(maxCalls = 3) + val retryBackoff = new ExponentialSleepStrategy.Mock(maxCalls = 3) try { systemAdmin.getSystemStreamMetadata(Set("quux"), retryBackoff) + fail("expected CallLimitReached to be thrown") } catch { - case e: CallLimitReached => () // this would be less ugly if we were using scalatest + case e: ExponentialSleepStrategy.CallLimitReached => () case e: Throwable => throw e } - 
assertEquals(retryBackoff.countCalls, 3) } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/f2fcb26a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemProducer.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemProducer.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemProducer.scala index cd0942a..3684db5 100644 --- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemProducer.scala +++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemProducer.scala @@ -51,13 +51,13 @@ class TestKafkaSystemProducer { val props = getProps @volatile var msgsSent = new CountDownLatch(1) - val producer = new KafkaSystemProducer("test", 1, 100, () => { + val producer = new KafkaSystemProducer(systemName = "test", batchSize = 1, getProducer = (() => { new Producer[Object, Object](new ProducerConfig(props)) { override def send(messages: KeyedMessage[Object, Object]*) { msgsSent.countDown } } - }, new KafkaSystemProducerMetrics) + }), metrics = new KafkaSystemProducerMetrics) producer.register("test") producer.start @@ -72,13 +72,13 @@ class TestKafkaSystemProducer { val props = getProps @volatile var msgsSent = 0 - val producer = new KafkaSystemProducer("test", 2, 100, () => { + val producer = new KafkaSystemProducer(systemName = "test", batchSize = 2, getProducer = (() => { new Producer[Object, Object](new ProducerConfig(props)) { override def send(messages: KeyedMessage[Object, Object]*) { msgsSent += 1 } } - }, new KafkaSystemProducerMetrics) + }), metrics = new KafkaSystemProducerMetrics) // second message should trigger the count down producer.register("test") @@ -97,13 +97,13 @@ class TestKafkaSystemProducer { val msg1 = new OutgoingMessageEnvelope(new SystemStream("test", "test"), "a") val msg2 = new OutgoingMessageEnvelope(new 
SystemStream("test", "test"), "b") - val producer = new KafkaSystemProducer("test", 3, 100, () => { + val producer = new KafkaSystemProducer(systemName = "test", batchSize = 3, getProducer = (() => { new Producer[Object, Object](new ProducerConfig(props)) { override def send(messages: KeyedMessage[Object, Object]*) { msgs ++= messages.map(_.message.asInstanceOf[String]) } } - }, new KafkaSystemProducerMetrics) + }), metrics = new KafkaSystemProducerMetrics) // flush should trigger the count down producer.register("test") @@ -128,7 +128,7 @@ class TestKafkaSystemProducer { props.put("producer.type", "sync") var failCount = 0 - val producer = new KafkaSystemProducer("test", 1, 100, () => { + val producer = new KafkaSystemProducer(systemName = "test", batchSize = 1, getProducer = (() => { failCount += 1 if (failCount <= 5) { throw new RuntimeException("Pretend to fail in factory") @@ -144,7 +144,7 @@ class TestKafkaSystemProducer { } } } - }, new KafkaSystemProducerMetrics) + }), metrics = new KafkaSystemProducerMetrics) producer.register("test") producer.start
