Repository: incubator-samza
Updated Branches:
  refs/heads/master df6e11afc -> f2fcb26a3


SAMZA-174: General-purpose implementation of a retry loop. Reviewed by Chris 
Riccomini.


Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/f2fcb26a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/f2fcb26a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/f2fcb26a

Branch: refs/heads/master
Commit: f2fcb26a33b59da58a852fe81564d60407aa758c
Parents: df6e11a
Author: Martin Kleppmann <[email protected]>
Authored: Fri Mar 7 16:17:38 2014 +0000
Committer: Martin Kleppmann <[email protected]>
Committed: Thu Mar 20 23:43:45 2014 +0000

----------------------------------------------------------------------
 .../samza/util/ExponentialSleepStrategy.scala   | 130 +++++++++-
 .../util/TestExponentialSleepStrategy.scala     | 161 ++++++++++--
 .../kafka/KafkaCheckpointManager.scala          | 248 +++++++++----------
 .../apache/samza/system/kafka/BrokerProxy.scala |  59 ++---
 .../samza/system/kafka/KafkaSystemAdmin.scala   |  85 +++----
 .../system/kafka/KafkaSystemConsumer.scala      |  31 +--
 .../samza/system/kafka/KafkaSystemFactory.scala |   5 +-
 .../system/kafka/KafkaSystemProducer.scala      |  42 ++--
 .../system/kafka/TestKafkaSystemAdmin.scala     |  19 +-
 .../system/kafka/TestKafkaSystemProducer.scala  |  16 +-
 10 files changed, 482 insertions(+), 314 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/f2fcb26a/samza-core/src/main/scala/org/apache/samza/util/ExponentialSleepStrategy.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/scala/org/apache/samza/util/ExponentialSleepStrategy.scala
 
b/samza-core/src/main/scala/org/apache/samza/util/ExponentialSleepStrategy.scala
index b3c9263..376b277 100644
--- 
a/samza-core/src/main/scala/org/apache/samza/util/ExponentialSleepStrategy.scala
+++ 
b/samza-core/src/main/scala/org/apache/samza/util/ExponentialSleepStrategy.scala
@@ -21,6 +21,19 @@
 
 package org.apache.samza.util
 
+import java.nio.channels.ClosedByInterruptException
+import org.apache.samza.util.ExponentialSleepStrategy.RetryLoop
+
+/**
+ * Encapsulates the pattern of retrying an operation until it succeeds.
+ * Before every retry there is a delay, which starts short and gets 
exponentially
+ * longer on each retry, up to a configurable maximum. There is no limit to the
+ * number of retries.
+ *
+ * @param backOffMultiplier The factor by which the delay increases on each 
retry.
+ * @param initialDelayMs Time in milliseconds to wait after the first attempt 
failed.
+ * @param maximumDelayMs Cap up to which we will increase the delay.
+ */
 class ExponentialSleepStrategy(
     backOffMultiplier: Double = 2.0,
     initialDelayMs: Long = 100,
@@ -30,16 +43,117 @@ class ExponentialSleepStrategy(
   require(initialDelayMs > 0, "initialDelayMs must be positive")
   require(maximumDelayMs >= initialDelayMs, "maximumDelayMs must be >= 
initialDelayMs")
 
-  var previousDelay = 0L
-
-  def sleep() = {
-    val nextDelay = getNextDelay(previousDelay)
-    Thread.sleep(nextDelay)
-    previousDelay = nextDelay
-  }
-
+  /**
+   * Given the delay before the last retry, calculate what the delay before the
+   * next retry should be.
+   */
   def getNextDelay(previousDelay: Long): Long = {
     val nextDelay = (previousDelay * backOffMultiplier).asInstanceOf[Long]
     math.min(math.max(initialDelayMs, nextDelay), maximumDelayMs)
   }
+
+  /** Can be overridden by subclasses to customize looping behavior. */
+  def startLoop: RetryLoop = new RetryLoopState
+
+  /**
+   * Starts a retryable operation with the delay properties that were 
configured
+   * when the object was created. Every call to run is independent, so the same
+   * ExponentialSleepStrategy object can be used for several different retry 
loops.
+   *
+   * loopOperation is called on every attempt, and given as parameter a 
RetryLoop
+   * object. By default it is assumed that the operation failed. If the 
operation
+   * succeeded, you must call <code>done</code> on the RetryLoop object to 
indicate
+   * success. This method returns the return value of the successful 
loopOperation.
+   *
+   * If an exception is thrown during the execution of loopOperation, the 
onException
+   * handler is called. You can choose to re-throw the exception (so that it 
aborts
+   * the run loop and bubbles up), or ignore it (the operation will be 
retried),
+   * or call <code>done</code> (give up, don't retry).
+   *
+   * @param loopOperation The operation that should be attempted and may fail.
+   * @param onException Handler function that determines what to do with an 
exception.
+   * @return If loopOperation succeeded, an option containing the return value 
of
+   *         the last invocation. If done was called in the exception handler, 
None.
+   */
+  def run[A](loopOperation: RetryLoop => A, onException: (Exception, 
RetryLoop) => Unit): Option[A] = {
+    val loop = startLoop
+    while (!loop.isDone && !Thread.currentThread.isInterrupted) {
+      try {
+        val result = loopOperation(loop)
+        if (loop.isDone) return Some(result)
+      } catch {
+        case e: InterruptedException       => throw e
+        case e: ClosedByInterruptException => throw e
+        case e: Exception                  => onException(e, loop)
+      }
+      if (!loop.isDone && !Thread.currentThread.isInterrupted) loop.sleep
+    }
+    None
+  }
+
+  private[util] class RetryLoopState extends RetryLoop {
+    var previousDelay = 0L
+    var isDone = false
+    var sleepCount = 0
+
+    def sleep {
+      sleepCount += 1
+      val nextDelay = getNextDelay(previousDelay)
+      previousDelay = nextDelay
+      Thread.sleep(nextDelay)
+    }
+
+    def reset {
+      previousDelay = 0
+      isDone = false
+    }
+
+    def done {
+      isDone = true
+    }
+  }
+}
+
+object ExponentialSleepStrategy {
+  /**
+   * State of the retry loop, passed to every invocation of the loopOperation
+   * or the exception handler.
+   */
+  trait RetryLoop {
+    /** Let the current thread sleep for the backoff time (called by run 
method). */
+    def sleep
+
+    /** Tell the retry loop to revert to initialDelayMs for the next retry. */
+    def reset
+
+    /** Tell the retry loop to stop trying (success or giving up). */
+    def done
+
+    /** Check whether <code>done</code> was called (used by the run method). */
+    def isDone: Boolean
+
+    /** Returns the number of times that the retry loop has called 
<code>sleep</code>. */
+    def sleepCount: Int
+  }
+
+  /** For tests using ExponentialSleepStrategy.Mock */
+  class CallLimitReached extends Exception
+
+  /**
+   * For writing tests of retryable code. Doesn't actually sleep, so that tests
+   * are quick to run.
+   *
+   * @param maxCalls The maximum number of retries to allow before throwing 
CallLimitReached.
+   */
+  class Mock(maxCalls: Int) extends ExponentialSleepStrategy {
+    override def startLoop = new MockRetryLoop
+
+    class MockRetryLoop extends RetryLoop {
+      var isDone = false
+      var sleepCount = 0
+      def sleep { sleepCount += 1; if (sleepCount > maxCalls) throw new 
CallLimitReached }
+      def reset { isDone = false }
+      def done  { isDone = true  }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/f2fcb26a/samza-core/src/test/scala/org/apache/samza/util/TestExponentialSleepStrategy.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/scala/org/apache/samza/util/TestExponentialSleepStrategy.scala
 
b/samza-core/src/test/scala/org/apache/samza/util/TestExponentialSleepStrategy.scala
index 3036da9..6cea6a2 100644
--- 
a/samza-core/src/test/scala/org/apache/samza/util/TestExponentialSleepStrategy.scala
+++ 
b/samza-core/src/test/scala/org/apache/samza/util/TestExponentialSleepStrategy.scala
@@ -23,34 +23,143 @@ package org.apache.samza.util
 
 import org.junit.Assert._
 import org.junit.Test
+import org.apache.samza.util.ExponentialSleepStrategy.RetryLoop
 
 class TestExponentialSleepStrategy {
 
-  @Test def testGetNextDelayReturnsIncrementalDelay() = {
-    val st = new ExponentialSleepStrategy
-    var nextDelay = st.getNextDelay(0L)
-    assertEquals(nextDelay, 100L)
-    nextDelay = st.getNextDelay(nextDelay)
-    assertEquals(nextDelay, 200L)
-    nextDelay = st.getNextDelay(nextDelay)
-    assertEquals(nextDelay, 400L)
-  }
-
-  @Test def testGetNextDelayReturnsMaximumDelayWhenDelayCapReached() = {
-    val st = new ExponentialSleepStrategy
-    var nextDelay = st.getNextDelay(6400L)
-    assertEquals(nextDelay, 10000L)
-    nextDelay = st.getNextDelay(nextDelay)
-    assertEquals(nextDelay, 10000L)
-  }
-
-  @Test def testSleepStrategyIsConfigurable() = {
-    val st = new ExponentialSleepStrategy(backOffMultiplier = 3.0, 
initialDelayMs = 10)
-    var nextDelay = st.getNextDelay(0L)
-    assertEquals(nextDelay, 10L)
-    nextDelay = st.getNextDelay(nextDelay)
-    assertEquals(nextDelay, 30L)
-    nextDelay = st.getNextDelay(nextDelay)
-    assertEquals(nextDelay, 90L)
+  @Test def testGetNextDelayReturnsIncrementalDelay {
+    val strategy = new ExponentialSleepStrategy
+    assertEquals(100, strategy.getNextDelay(0))
+    assertEquals(200, strategy.getNextDelay(100))
+    assertEquals(400, strategy.getNextDelay(200))
+    assertEquals(800, strategy.getNextDelay(400))
+  }
+
+  @Test def testGetNextDelayReturnsMaximumDelayWhenDelayCapReached {
+    val strategy = new ExponentialSleepStrategy
+    assertEquals(10000, strategy.getNextDelay(6400))
+    assertEquals(10000, strategy.getNextDelay(10000))
+  }
+
+  @Test def testSleepStrategyIsConfigurable {
+    val strategy = new ExponentialSleepStrategy(backOffMultiplier = 3.0, 
initialDelayMs = 10)
+    assertEquals(10, strategy.getNextDelay(0))
+    assertEquals(30, strategy.getNextDelay(10))
+    assertEquals(90, strategy.getNextDelay(30))
+    assertEquals(270, strategy.getNextDelay(90))
+  }
+
+  @Test def testResetToInitialDelay {
+    val strategy = new ExponentialSleepStrategy
+    val loop = 
strategy.startLoop.asInstanceOf[ExponentialSleepStrategy#RetryLoopState]
+    loop.previousDelay = strategy.getNextDelay(loop.previousDelay)
+    assertEquals(100, loop.previousDelay)
+    loop.previousDelay = strategy.getNextDelay(loop.previousDelay)
+    loop.previousDelay = strategy.getNextDelay(loop.previousDelay)
+    assertEquals(400, loop.previousDelay)
+    loop.reset
+    loop.previousDelay = strategy.getNextDelay(loop.previousDelay)
+    assertEquals(100, loop.previousDelay)
+  }
+
+  @Test def testRetryWithoutException {
+    val strategy = new ExponentialSleepStrategy(initialDelayMs = 1)
+    var iterations = 0
+    var loopObject: RetryLoop = null
+    val result = strategy.run(
+      loop => {
+        loopObject = loop
+        iterations += 1
+        if (iterations == 3) loop.done
+        iterations
+      },
+      (exception, loop) => throw exception
+    )
+    assertEquals(Some(3), result)
+    assertEquals(3, iterations)
+    assertEquals(2, loopObject.sleepCount)
+  }
+
+  @Test def testRetryWithException {
+    val strategy = new ExponentialSleepStrategy(initialDelayMs = 1)
+    var iterations = 0
+    var loopObject: RetryLoop = null
+    strategy.run(
+      loop => { throw new IllegalArgumentException("boom") },
+      (exception, loop) => {
+        assertEquals("boom", exception.getMessage)
+        loopObject = loop
+        iterations += 1
+        if (iterations == 3) loop.done
+      }
+    )
+    assertEquals(3, iterations)
+    assertEquals(2, loopObject.sleepCount)
+  }
+
+  @Test def testReThrowingException {
+    val strategy = new ExponentialSleepStrategy(initialDelayMs = 1)
+    var iterations = 0
+    var loopObject: RetryLoop = null
+    try {
+      strategy.run(
+        loop => {
+          loopObject = loop
+          iterations += 1
+          throw new IllegalArgumentException("boom")
+        },
+        (exception, loop) => throw exception
+      )
+      fail("expected exception to be thrown")
+    } catch {
+      case e: IllegalArgumentException => assertEquals("boom", e.getMessage)
+      case e: Throwable => throw e
+    }
+    assertEquals(1, iterations)
+    assertEquals(0, loopObject.sleepCount)
+  }
+
+  def interruptedThread(operation: => Unit) = {
+    var exception: Option[Throwable] = None
+    val interruptee = new Thread(new Runnable {
+      def run {
+        try { operation } catch { case e: Throwable => exception = Some(e) }
+      }
+    })
+    interruptee.start
+    Thread.sleep(10) // give the thread a chance to make some progress before 
we interrupt it
+    interruptee.interrupt
+    interruptee.join
+    exception
+  }
+
+  @Test def testThreadInterruptInRetryLoop {
+    val strategy = new ExponentialSleepStrategy
+    var iterations = 0
+    var loopObject: RetryLoop = null
+    val exception = interruptedThread {
+      strategy.run(
+        loop => { iterations += 1; loopObject = loop },
+        (exception, loop) => throw exception
+      )
+    }
+    assertEquals(1, iterations)
+    assertEquals(1, loopObject.sleepCount)
+    assertEquals(classOf[InterruptedException], exception.get.getClass)
+  }
+
+  @Test def testThreadInterruptInOperationSleep {
+    val strategy = new ExponentialSleepStrategy
+    var iterations = 0
+    var loopObject: RetryLoop = null
+    val exception = interruptedThread {
+      strategy.run(
+        loop => { iterations += 1; loopObject = loop; Thread.sleep(1000) },
+        (exception, loop) => throw exception
+      )
+    }
+    assertEquals(1, iterations)
+    assertEquals(0, loopObject.sleepCount)
+    assertEquals(classOf[InterruptedException], exception.get.getClass)
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/f2fcb26a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
----------------------------------------------------------------------
diff --git 
a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
 
b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
index 27b38b2..a1d2ffe 100644
--- 
a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
+++ 
b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
@@ -20,7 +20,6 @@
 package org.apache.samza.checkpoint.kafka
 
 import org.I0Itec.zkclient.ZkClient
-
 import grizzled.slf4j.Logging
 import kafka.admin.AdminUtils
 import kafka.api.FetchRequestBuilder
@@ -45,6 +44,7 @@ import org.apache.samza.serializers.CheckpointSerde
 import org.apache.samza.serializers.Serde
 import org.apache.samza.system.kafka.TopicMetadataCache
 import org.apache.samza.util.TopicMetadataStore
+import org.apache.samza.util.ExponentialSleepStrategy
 
 /**
  * Kafka checkpoint manager is used to store checkpoints in a Kafka topic that
@@ -66,7 +66,7 @@ class KafkaCheckpointManager(
   metadataStore: TopicMetadataStore,
   connectProducer: () => Producer[Partition, Array[Byte]],
   connectZk: () => ZkClient,
-  failureRetryMs: Long = 10000,
+  retryBackoff: ExponentialSleepStrategy = new ExponentialSleepStrategy,
   serde: Serde[Checkpoint] = new CheckpointSerde) extends CheckpointManager 
with Logging {
 
   var partitions = Set[Partition]()
@@ -75,39 +75,31 @@ class KafkaCheckpointManager(
   info("Creating KafkaCheckpointManager with: clientId=%s, stateTopic=%s, 
systemName=%s" format (clientId, stateTopic, systemName))
 
   def writeCheckpoint(partition: Partition, checkpoint: Checkpoint) {
-    var done = false
-
-    while (!done) {
-      try {
+    retryBackoff.run(
+      loop => {
         if (producer == null) {
           producer = connectProducer()
         }
-
         producer.send(new KeyedMessage(stateTopic, null, partition, 
serde.toBytes(checkpoint)))
-        done = true
-      } catch {
-        case e: Throwable =>
-          warn("Failed to send checkpoint %s for partition %s. Retrying." 
format (checkpoint, partition), e)
-
-          if (producer != null) {
-            producer.close
-          }
-
-          producer = null
-
-          Thread.sleep(failureRetryMs)
+        loop.done
+      },
+
+      (exception, loop) => {
+        warn("Failed to send checkpoint %s for partition %s: %s. Retrying." 
format (checkpoint, partition, exception))
+        debug(exception)
+        if (producer != null) {
+          producer.close
+        }
+        producer = null
       }
-    }
+    )
   }
 
   def readLastCheckpoint(partition: Partition): Checkpoint = {
-    var checkpoint: Option[Checkpoint] = None
-    var consumer: SimpleConsumer = null
-
     info("Reading checkpoint for partition %s." format 
partition.getPartitionId)
 
-    while (!checkpoint.isDefined) {
-      try {
+    val checkpoint = retryBackoff.run(
+      loop => {
         // Assume state topic exists with correct partitions, since it should 
be verified on start.
         // Fetch the metadata for this state topic/partition pair.
         val metadataMap = TopicMetadataCache.getTopicMetadata(Set(stateTopic), 
systemName, (topics: Set[String]) => metadataStore.getTopicInfo(topics))
@@ -123,81 +115,82 @@ class KafkaCheckpointManager(
 
         info("Connecting to leader %s:%d for topic %s and partition %s to 
fetch last checkpoint message." format (leader.host, leader.port, stateTopic, 
partitionId))
 
-        consumer = new SimpleConsumer(
+        val consumer = new SimpleConsumer(
           leader.host,
           leader.port,
           socketTimeout,
           bufferSize,
           clientId)
-        val topicAndPartition = new TopicAndPartition(stateTopic, partitionId)
-        val offsetResponse = consumer.getOffsetsBefore(new 
OffsetRequest(Map(topicAndPartition -> 
PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 1))))
-          .partitionErrorAndOffsets
-          .get(topicAndPartition)
-          .getOrElse(throw new KafkaCheckpointException("Unable to find offset 
information for %s:%d" format (stateTopic, partitionId)))
-
-        // Fail or retry if there was an an issue with the offset request.
-        ErrorMapping.maybeThrowException(offsetResponse.error)
+        try {
+          val topicAndPartition = new TopicAndPartition(stateTopic, 
partitionId)
+          val offsetResponse = consumer.getOffsetsBefore(new 
OffsetRequest(Map(topicAndPartition -> 
PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 1))))
+            .partitionErrorAndOffsets
+            .get(topicAndPartition)
+            .getOrElse(throw new KafkaCheckpointException("Unable to find 
offset information for %s:%d" format (stateTopic, partitionId)))
 
-        val offset = offsetResponse
-          .offsets
-          .headOption
-          .getOrElse(throw new KafkaCheckpointException("Got response, but no 
offsets defined for %s:%d" format (stateTopic, partitionId)))
+          // Fail or retry if there was an issue with the offset request.
+          ErrorMapping.maybeThrowException(offsetResponse.error)
 
-        info("Got offset %s for topic %s and partition %s. Attempting to fetch 
message." format (offset, stateTopic, partitionId))
+          val offset = offsetResponse
+            .offsets
+            .headOption
+            .getOrElse(throw new KafkaCheckpointException("Got response, but 
no offsets defined for %s:%d" format (stateTopic, partitionId)))
 
-        if (offset <= 0) {
-          info("Got offset 0 (no messages in state topic) for topic %s and 
partition %s, so returning null. If you expected the state topic to have 
messages, you're probably going to lose data." format (stateTopic, partition))
-          return null
-        }
+          info("Got offset %s for topic %s and partition %s. Attempting to 
fetch message." format (offset, stateTopic, partitionId))
 
-        val request = new FetchRequestBuilder()
-          // Kafka returns 1 greater than the offset of the last message in 
-          // the topic, so subtract one to fetch the last message.
-          .addFetch(stateTopic, partitionId, offset - 1, fetchSize)
-          .maxWait(500)
-          .minBytes(1)
-          .clientId(clientId)
-          .build
-        val messageSet = consumer.fetch(request)
-        if (messageSet.hasError) {
-          warn("Got error code from broker for %s: %s" format (stateTopic, 
messageSet.errorCode(stateTopic, partitionId)))
-          val errorCode = messageSet.errorCode(stateTopic, partitionId)
-          if (ErrorMapping.OffsetOutOfRangeCode.equals(errorCode)) {
-            warn("Got an offset out of range exception while getting last 
checkpoint for topic %s and partition %s, so returning a null offset to the 
KafkaConsumer. Let it decide what to do based on its autooffset.reset setting." 
format (stateTopic, partitionId))
+          if (offset <= 0) {
+            info("Got offset 0 (no messages in state topic) for topic %s and 
partition %s, so returning null. If you expected the state topic to have 
messages, you're probably going to lose data." format (stateTopic, partition))
             return null
           }
-          ErrorMapping.maybeThrowException(errorCode)
-        }
-        val messages = messageSet.messageSet(stateTopic, partitionId).toList
-
-        if (messages.length != 1) {
-          throw new KafkaCheckpointException("Something really unexpected 
happened. Got %s "
-            + "messages back when fetching from state checkpoint topic %s and 
partition %s. "
-            + "Expected one message. It would be unsafe to go on without the 
latest checkpoint, "
-            + "so failing." format (messages.length, stateTopic, partition))
-        }
-
-        // Some back bending to go from message to checkpoint.
-        checkpoint = 
Some(serde.fromBytes(Utils.readBytes(messages(0).message.payload)))
 
-        consumer.close
-      } catch {
-        case e: KafkaCheckpointException =>
-          throw e
-        case e: Throwable =>
-          warn("Got exception while trying to read last checkpoint for topic 
%s and partition %s. Retrying." format (stateTopic, partition), e)
+          val request = new FetchRequestBuilder()
+            // Kafka returns 1 greater than the offset of the last message in
+            // the topic, so subtract one to fetch the last message.
+            .addFetch(stateTopic, partitionId, offset - 1, fetchSize)
+            .maxWait(500)
+            .minBytes(1)
+            .clientId(clientId)
+            .build
+          val messageSet = consumer.fetch(request)
+          if (messageSet.hasError) {
+            warn("Got error code from broker for %s: %s" format (stateTopic, 
messageSet.errorCode(stateTopic, partitionId)))
+            val errorCode = messageSet.errorCode(stateTopic, partitionId)
+            if (ErrorMapping.OffsetOutOfRangeCode.equals(errorCode)) {
+              warn("Got an offset out of range exception while getting last 
checkpoint for topic %s and partition %s, so returning a null offset to the 
KafkaConsumer. Let it decide what to do based on its autooffset.reset setting." 
format (stateTopic, partitionId))
+              return null
+            }
+            ErrorMapping.maybeThrowException(errorCode)
+          }
+          val messages = messageSet.messageSet(stateTopic, partitionId).toList
 
-          if (consumer != null) {
-            consumer.close
+          if (messages.length != 1) {
+            throw new KafkaCheckpointException("Something really unexpected 
happened. Got %s "
+              + "messages back when fetching from state checkpoint topic %s 
and partition %s. "
+              + "Expected one message. It would be unsafe to go on without the 
latest checkpoint, "
+              + "so failing." format (messages.length, stateTopic, partition))
           }
 
-          Thread.sleep(failureRetryMs)
+          // Some back bending to go from message to checkpoint.
+          val checkpoint = 
serde.fromBytes(Utils.readBytes(messages(0).message.payload))
+          loop.done
+          checkpoint
+        } finally {
+          consumer.close
+        }
+      },
+
+      (exception, loop) => {
+        exception match {
+          case e: KafkaCheckpointException => throw e
+          case e: Exception =>
+            warn("While trying to read last checkpoint for topic %s and 
partition %s: %s. Retrying." format (stateTopic, partition, e))
+            debug(e)
+        }
       }
-    }
+    ).getOrElse(throw new SamzaException("Failed to get checkpoint for 
partition %s" format partition.getPartitionId))
 
     info("Got checkpoint state for partition %s: %s" format 
(partition.getPartitionId, checkpoint))
-
-    checkpoint.get
+    checkpoint
   }
 
   def start {
@@ -215,76 +208,63 @@ class KafkaCheckpointManager(
   def stop = producer.close
 
   private def createTopic {
-    var done = false
-    var zkClient: ZkClient = null
-
     info("Attempting to create state topic %s with %s partitions." format 
(stateTopic, totalPartitions))
-
-    while (!done) {
-      try {
-        zkClient = connectZk()
-
-        AdminUtils.createTopic(
-          zkClient,
-          stateTopic,
-          totalPartitions,
-          replicationFactor)
+    retryBackoff.run(
+      loop => {
+        val zkClient = connectZk()
+        try {
+          AdminUtils.createTopic(
+            zkClient,
+            stateTopic,
+            totalPartitions,
+            replicationFactor)
+        } finally {
+          zkClient.close
+        }
 
         info("Created state topic %s." format stateTopic)
-
-        done = true
-      } catch {
-        case e: TopicExistsException =>
-          info("State topic %s already exists." format stateTopic)
-
-          done = true
-        case e: Throwable =>
-          warn("Failed to create topic %s. Retrying." format stateTopic, e)
-
-          if (zkClient != null) {
-            zkClient.close
-          }
-
-          Thread.sleep(failureRetryMs)
+        loop.done
+      },
+
+      (exception, loop) => {
+        exception match {
+          case e: TopicExistsException =>
+            info("State topic %s already exists." format stateTopic)
+            loop.done
+          case e: Exception =>
+            warn("Failed to create topic %s: %s. Retrying." format 
(stateTopic, e))
+            debug(e)
+        }
       }
-    }
-
-    zkClient.close
+    )
   }
 
   private def validateTopic {
-    var done = false
-
     info("Validating state topic %s." format stateTopic)
-
-    while (!done) {
-      try {
+    retryBackoff.run(
+      loop => {
         val topicMetadataMap = 
TopicMetadataCache.getTopicMetadata(Set(stateTopic), systemName, 
metadataStore.getTopicInfo)
         val topicMetadata = topicMetadataMap(stateTopic)
-        val errorCode = topicMetadata.errorCode
-
-        if (errorCode != ErrorMapping.NoError) {
-          throw new SamzaException("State topic validation failed for topic %s 
because we got error code %s from Kafka." format (stateTopic, errorCode))
-        }
+        ErrorMapping.maybeThrowException(topicMetadata.errorCode)
 
         val partitionCount = topicMetadata.partitionsMetadata.length
-
         if (partitionCount != totalPartitions) {
           throw new KafkaCheckpointException("State topic validation failed 
for topic %s because partition count %s did not match expected partition count 
%s." format (stateTopic, topicMetadata.partitionsMetadata.length, 
totalPartitions))
         }
 
         info("Successfully validated state topic %s." format stateTopic)
-
-        done = true
-      } catch {
-        case e: KafkaCheckpointException =>
-          throw e
-        case e: Throwable =>
-          warn("Got exception while trying to read validate topic %s. 
Retrying." format stateTopic, e)
-
-          Thread.sleep(failureRetryMs)
+        loop.done
+      },
+
+      (exception, loop) => {
+        exception match {
+          case e: KafkaCheckpointException => throw e
+          case e: Exception =>
+            warn("While trying to validate topic %s: %s. Retrying." format 
(stateTopic, e))
+            debug(e)
+        }
       }
-    }
+    )
   }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/f2fcb26a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
----------------------------------------------------------------------
diff --git 
a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala 
b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
index c5ad1c8..f240d69 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
@@ -122,38 +122,39 @@ class BrokerProxy(
     }
   }
 
-  val thread: Thread = new Thread(new Runnable() {
-    def run() {
-      info("Initialising sleep strategy");
-      val sleepStrategy = new ExponentialSleepStrategy
-      info("Starting thread for BrokerProxy")
-
-      while (!Thread.currentThread.isInterrupted) {
-        if (nextOffsets.size == 0) {
-          debug("No TopicPartitions to fetch. Sleeping.")
-          Thread.sleep(sleepMSWhileNoTopicPartitions)
-        } else {
-          try {
-            fetchMessages()
-          } catch {
-            // If we're interrupted, don't try and reconnect. We should shut 
down.
-            case e: InterruptedException =>
-              warn("Shutting down due to interrupt exception.")
-              Thread.currentThread.interrupt
-            case e: ClosedByInterruptException =>
-              warn("Shutting down due to closed by interrupt exception.")
-              Thread.currentThread.interrupt
-            case e: Throwable => {
-              warn("Recreating simple consumer and retrying connection")
-              warn("Stack trace for fetchMessages exception.", e)
-              simpleConsumer.close()
-              sleepStrategy.sleep()
-              simpleConsumer = createSimpleConsumer()
-              metrics.reconnects(host, port).inc
+  val thread = new Thread(new Runnable {
+    def run {
+      var reconnect = false
+      (new ExponentialSleepStrategy).run(
+        loop => {
+          if (reconnect) {
+            metrics.reconnects(host, port).inc
+            simpleConsumer.close()
+            simpleConsumer = createSimpleConsumer()
+          }
+
+          while (!Thread.currentThread.isInterrupted) {
+            if (nextOffsets.size == 0) {
+              debug("No TopicPartitions to fetch. Sleeping.")
+              Thread.sleep(sleepMSWhileNoTopicPartitions)
+            } else {
+              fetchMessages
+
+              // If we got here, fetchMessages didn't throw an exception, i.e. 
it was successful.
+              // In that case, reset the loop delay, so that the next time an 
error occurs,
+              // we start with a short retry delay.
+              loop.reset
             }
           }
+        },
+
+        (exception, loop) => {
+          warn("Restarting consumer due to %s. Turn on debugging to get a full 
stack trace." format exception)
+          debug(exception)
+          reconnect = true
         }
-      }
+      )
+      if (Thread.currentThread.isInterrupted) info("Shutting down due to 
interrupt")
     }
   }, "BrokerProxy thread pointed at %s:%d for client %s" format (host, port, 
clientID))
 

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/f2fcb26a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
----------------------------------------------------------------------
diff --git 
a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
 
b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
index dafc980..2a23652 100644
--- 
a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
+++ 
b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
@@ -132,17 +132,9 @@ class KafkaSystemAdmin(
    * retry indefinitely until it gets a successful response from Kafka.
    */
   def getSystemStreamMetadata(streams: java.util.Set[String], retryBackoff: 
ExponentialSleepStrategy) = {
-    var partitions = Map[String, Set[Partition]]()
-    var oldestOffsets = Map[SystemStreamPartition, String]()
-    var newestOffsets = Map[SystemStreamPartition, String]()
-    var upcomingOffsets = Map[SystemStreamPartition, String]()
-    var done = false
-    var consumer: SimpleConsumer = null
-
     debug("Fetching system stream metadata for: %s" format streams)
-
-    while (!done) {
-      try {
+    retryBackoff.run(
+      loop => {
         val metadata = TopicMetadataCache.getTopicMetadata(
           streams.toSet,
           systemName,
@@ -151,52 +143,49 @@ class KafkaSystemAdmin(
         debug("Got metadata for streams: %s" format metadata)
 
         val brokersToTopicPartitions = getTopicsAndPartitionsByBroker(metadata)
+        var partitions = Map[String, Set[Partition]]()
+        var oldestOffsets = Map[SystemStreamPartition, String]()
+        var newestOffsets = Map[SystemStreamPartition, String]()
+        var upcomingOffsets = Map[SystemStreamPartition, String]()
 
         // Get oldest, newest, and upcoming offsets for each topic and 
partition.
         for ((broker, topicsAndPartitions) <- brokersToTopicPartitions) {
           debug("Fetching offsets for %s:%s: %s" format (broker.host, 
broker.port, topicsAndPartitions))
 
-          consumer = new SimpleConsumer(broker.host, broker.port, timeout, 
bufferSize, clientId)
-          oldestOffsets ++= getOffsets(consumer, topicsAndPartitions, 
OffsetRequest.EarliestTime)
-          upcomingOffsets ++= getOffsets(consumer, topicsAndPartitions, 
OffsetRequest.LatestTime)
-          // Kafka's "latest" offset is always last message in stream's offset 
+ 
-          // 1, so get newest message in stream by subtracting one. this is 
safe 
-          // even for key-deduplicated streams, since the last message will 
-          // never be deduplicated.
-          newestOffsets = upcomingOffsets.mapValues(offset => (offset.toLong - 
1).toString)
-          // Keep only oldest/newest offsets where there is a message. Should 
-          // return null offsets for empty streams.
-          upcomingOffsets.foreach {
-            case (topicAndPartition, offset) =>
-              if (offset.toLong <= 0) {
-                debug("Stripping oldest/newest offsets for %s because the 
topic appears empty." format topicAndPartition)
-                oldestOffsets -= topicAndPartition
-                newestOffsets -= topicAndPartition
-              }
+          val consumer = new SimpleConsumer(broker.host, broker.port, timeout, 
bufferSize, clientId)
+          try {
+            oldestOffsets ++= getOffsets(consumer, topicsAndPartitions, 
OffsetRequest.EarliestTime)
+            upcomingOffsets ++= getOffsets(consumer, topicsAndPartitions, 
OffsetRequest.LatestTime)
+            // Kafka's "latest" offset is always last message in stream's 
offset +
+            // 1, so get newest message in stream by subtracting one. this is 
safe
+            // even for key-deduplicated streams, since the last message will
+            // never be deduplicated.
+            newestOffsets = upcomingOffsets.mapValues(offset => (offset.toLong 
- 1).toString)
+            // Keep only oldest/newest offsets where there is a message. Should
+            // return null offsets for empty streams.
+            upcomingOffsets.foreach {
+              case (topicAndPartition, offset) =>
+                if (offset.toLong <= 0) {
+                  debug("Stripping oldest/newest offsets for %s because the 
topic appears empty." format topicAndPartition)
+                  oldestOffsets -= topicAndPartition
+                  newestOffsets -= topicAndPartition
+                }
+            }
+          } finally {
+            consumer.close
           }
-
-          debug("Shutting down consumer for %s:%s." format (broker.host, 
broker.port))
-
-          consumer.close
         }
 
-        done = true
-      } catch {
-        case e: InterruptedException =>
-          info("Interrupted while fetching last offsets, so forwarding.")
-          if (consumer != null) {
-            consumer.close
-          }
-          throw e
-        case e: Exception =>
-          // Retry.
-          warn("Unable to fetch last offsets for streams due to: %s, %s. 
Retrying. Turn on debugging to get a full stack trace." format (e.getMessage, 
streams))
-          debug(e)
-          retryBackoff.sleep
-      }
-    }
+        val result = assembleMetadata(oldestOffsets, newestOffsets, 
upcomingOffsets)
+        loop.done
+        result
+      },
 
-    assembleMetadata(oldestOffsets, newestOffsets, upcomingOffsets)
+      (exception, loop) => {
+        warn("Unable to fetch last offsets for streams %s due to %s. 
Retrying." format (streams, exception))
+        debug(exception)
+      }
+    ).getOrElse(throw new SamzaException("Failed to get system stream 
metadata"))
   }
 
   /**
@@ -270,4 +259,4 @@ class KafkaSystemAdmin(
 
     offsets
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/f2fcb26a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
----------------------------------------------------------------------
diff --git 
a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
 
b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
index afbd7cd..8ad97df 100644
--- 
a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
+++ 
b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
@@ -33,6 +33,7 @@ import org.apache.samza.util.BlockingEnvelopeMap
 import org.apache.samza.system.SystemStreamPartition
 import org.apache.samza.system.IncomingMessageEnvelope
 import kafka.consumer.ConsumerConfig
+import org.apache.samza.util.ExponentialSleepStrategy
 
 object KafkaSystemConsumer {
   def toTopicAndPartition(systemStreamPartition: SystemStreamPartition) = {
@@ -56,11 +57,11 @@ private[kafka] class KafkaSystemConsumer(
   fetchSize:Int = ConsumerConfig.MaxFetchSize,
   consumerMinSize:Int = ConsumerConfig.MinFetchBytes,
   consumerMaxWait:Int = ConsumerConfig.MaxFetchWaitMs,
-  brokerMetadataFailureRefreshMs: Long = 10000,
   fetchThreshold: Int = 0,
   offsetGetter: GetOffset = new GetOffset("fail"),
   deserializer: Decoder[Object] = new 
DefaultDecoder().asInstanceOf[Decoder[Object]],
   keyDeserializer: Decoder[Object] = new 
DefaultDecoder().asInstanceOf[Decoder[Object]],
+  retryBackoff: ExponentialSleepStrategy = new ExponentialSleepStrategy,
   clock: () => Long = { System.currentTimeMillis }) extends 
BlockingEnvelopeMap(metrics.registry, new Clock {
   def currentTimeMillis = clock()
 }, classOf[KafkaSystemConsumerMetrics].getName) with Toss with Logging {
@@ -95,8 +96,8 @@ private[kafka] class KafkaSystemConsumer(
 
   def refreshBrokers(topicPartitionsAndOffsets: Map[TopicAndPartition, 
String]) {
     var tpToRefresh = topicPartitionsAndOffsets.keySet.toList
-    while (!tpToRefresh.isEmpty) {
-      try {
+    retryBackoff.run(
+      loop => {
         val getTopicMetadata = (topics: Set[String]) => {
           new ClientUtilTopicMetadataStore(brokerListString, 
clientId).getTopicInfo(topics)
         }
@@ -128,25 +129,17 @@ private[kafka] class KafkaSystemConsumer(
           rest
         }
 
-
-        while(!tpToRefresh.isEmpty) {
+        while (!tpToRefresh.isEmpty) {
           tpToRefresh = refresh(tpToRefresh)
         }
-      } catch {
-        case e: Throwable =>
-          warn("An exception was thrown while refreshing brokers for %s. 
Waiting a bit and retrying, since we can't continue without broker metadata." 
format tpToRefresh.head)
-          debug("Exception while refreshing brokers", e)
-
-          try {
-            Thread.sleep(brokerMetadataFailureRefreshMs)
-          } catch {
-            case e: InterruptedException =>
-              info("Interrupted while waiting to retry metadata refresh, so 
shutting down.")
-
-              stop
-          }
+        loop.done
+      },
+
+      (exception, loop) => { // failure handler: the exception comes first, then the retry loop
+        warn("While refreshing brokers for %s: %s. Retrying." format 
(tpToRefresh.head, exception))
+        debug(exception)
       }
-    }
+    )
   }
 
   val sink = new MessageSink {

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/f2fcb26a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
----------------------------------------------------------------------
diff --git 
a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
 
b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
index b09ade2..feecc58 100644
--- 
a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
+++ 
b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
@@ -26,6 +26,7 @@ import org.apache.samza.config.KafkaConfig.Config2Kafka
 import org.apache.samza.SamzaException
 import kafka.producer.Producer
 import org.apache.samza.system.SystemFactory
+import org.apache.samza.util.ExponentialSleepStrategy
 
 class KafkaSystemFactory extends SystemFactory {
   def getConsumer(systemName: String, config: Config, registry: 
MetricsRegistry) = {
@@ -69,7 +70,7 @@ class KafkaSystemFactory extends SystemFactory {
     val batchSize = Option(producerConfig.batchNumMessages)
       .getOrElse(1000)
     val reconnectIntervalMs = Option(producerConfig.retryBackoffMs)
-      .getOrElse(10000)
+      .getOrElse(1000)
     val getProducer = () => { new Producer[Object, Object](producerConfig) }
     val metrics = new KafkaSystemProducerMetrics(systemName, registry)
 
@@ -80,7 +81,7 @@ class KafkaSystemFactory extends SystemFactory {
     new KafkaSystemProducer(
       systemName,
       batchSize,
-      reconnectIntervalMs,
+      new ExponentialSleepStrategy(initialDelayMs = reconnectIntervalMs),
       getProducer,
       metrics)
   }

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/f2fcb26a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
----------------------------------------------------------------------
diff --git 
a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
 
b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
index a419783..2de8cea 100644
--- 
a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
+++ 
b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
@@ -30,11 +30,12 @@ import org.apache.samza.config.Config
 import org.apache.samza.util.KafkaUtil
 import org.apache.samza.system.SystemProducer
 import org.apache.samza.system.OutgoingMessageEnvelope
+import org.apache.samza.util.ExponentialSleepStrategy
 
 class KafkaSystemProducer(
   systemName: String,
   batchSize: Int,
-  reconnectIntervalMs: Long,
+  retryBackoff: ExponentialSleepStrategy = new ExponentialSleepStrategy,
   getProducer: () => Producer[Object, Object],
   metrics: KafkaSystemProducerMetrics) extends SystemProducer with Logging {
 
@@ -74,14 +75,11 @@ class KafkaSystemProducer(
 
   def flush(source: String) {
     val buffer = sourceBuffers(source)
-    var done = false
-
     debug("Flushing buffer with size: %s." format buffer.size)
-
     metrics.flushes.inc
 
-    while (!done) {
-      try {
+    retryBackoff.run(
+      loop => {
         if (producer == null) {
           info("Creating a new producer for system %s." format systemName)
           producer = getProducer()
@@ -89,27 +87,21 @@ class KafkaSystemProducer(
         }
 
         producer.send(buffer: _*)
-        done = true
+        loop.done
         metrics.flushSizes.inc(buffer.size)
-      } catch {
-        case e: Throwable =>
-          warn("Triggering a reconnect for %s because connection failed: %s" 
format (systemName, e.getMessage))
-          debug("Exception while producing to %s." format systemName, e)
-
-          metrics.reconnects.inc
-
-          if (producer != null) {
-            producer.close
-            producer = null
-          }
-
-          try {
-            Thread.sleep(reconnectIntervalMs)
-          } catch {
-            case e: InterruptedException => None
-          }
+      },
+
+      (exception, loop) => {
+        warn("Triggering a reconnect for %s because connection failed: %s" 
format (systemName, exception))
+        debug(exception)
+        metrics.reconnects.inc
+
+        if (producer != null) {
+          producer.close
+          producer = null
+        }
       }
-    }
+    )
 
     buffer.clear
     debug("Flushed buffer.")

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/f2fcb26a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
----------------------------------------------------------------------
diff --git 
a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
 
b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
index cd9d926..e43970c 100644
--- 
a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
+++ 
b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
@@ -299,27 +299,16 @@ class TestKafkaSystemAdmin {
     }
   }
 
-  class CallLimitReached extends Exception
-
-  class MockSleepStrategy(maxCalls: Int) extends ExponentialSleepStrategy {
-    var countCalls = 0
-
-    override def sleep() = {
-      if (countCalls >= maxCalls) throw new CallLimitReached
-      countCalls += 1
-    }
-  }
-
   @Test
   def testShouldRetryOnTopicMetadataError {
     val systemAdmin = new KafkaSystemAdminWithTopicMetadataError
-    val retryBackoff = new MockSleepStrategy(maxCalls = 3)
+    val retryBackoff = new ExponentialSleepStrategy.Mock(maxCalls = 3)
     try {
       systemAdmin.getSystemStreamMetadata(Set("quux"), retryBackoff)
+      fail("expected CallLimitReached to be thrown")
     } catch {
-      case e: CallLimitReached => () // this would be less ugly if we were 
using scalatest
+      case e: ExponentialSleepStrategy.CallLimitReached => ()
       case e: Throwable => throw e
     }
-    assertEquals(retryBackoff.countCalls, 3)
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/f2fcb26a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemProducer.scala
----------------------------------------------------------------------
diff --git 
a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemProducer.scala
 
b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemProducer.scala
index cd0942a..3684db5 100644
--- 
a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemProducer.scala
+++ 
b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemProducer.scala
@@ -51,13 +51,13 @@ class TestKafkaSystemProducer {
     val props = getProps
     @volatile var msgsSent = new CountDownLatch(1)
 
-    val producer = new KafkaSystemProducer("test", 1, 100, () => {
+    val producer = new KafkaSystemProducer(systemName = "test", batchSize = 1, 
getProducer = (() => {
       new Producer[Object, Object](new ProducerConfig(props)) {
         override def send(messages: KeyedMessage[Object, Object]*) {
           msgsSent.countDown
         }
       }
-    }, new KafkaSystemProducerMetrics)
+    }), metrics = new KafkaSystemProducerMetrics)
 
     producer.register("test")
     producer.start
@@ -72,13 +72,13 @@ class TestKafkaSystemProducer {
     val props = getProps
     @volatile var msgsSent = 0
 
-    val producer = new KafkaSystemProducer("test", 2, 100, () => {
+    val producer = new KafkaSystemProducer(systemName = "test", batchSize = 2, 
getProducer = (() => {
       new Producer[Object, Object](new ProducerConfig(props)) {
         override def send(messages: KeyedMessage[Object, Object]*) {
           msgsSent += 1
         }
       }
-    }, new KafkaSystemProducerMetrics)
+    }), metrics = new KafkaSystemProducerMetrics)
 
     // second message should trigger the count down
     producer.register("test")
@@ -97,13 +97,13 @@ class TestKafkaSystemProducer {
     val msg1 = new OutgoingMessageEnvelope(new SystemStream("test", "test"), 
"a")
     val msg2 = new OutgoingMessageEnvelope(new SystemStream("test", "test"), 
"b")
 
-    val producer = new KafkaSystemProducer("test", 3, 100, () => {
+    val producer = new KafkaSystemProducer(systemName = "test", batchSize = 3, 
getProducer = (() => {
       new Producer[Object, Object](new ProducerConfig(props)) {
         override def send(messages: KeyedMessage[Object, Object]*) {
           msgs ++= messages.map(_.message.asInstanceOf[String])
         }
       }
-    }, new KafkaSystemProducerMetrics)
+    }), metrics = new KafkaSystemProducerMetrics)
 
     // flush should trigger the count down
     producer.register("test")
@@ -128,7 +128,7 @@ class TestKafkaSystemProducer {
     props.put("producer.type", "sync")
 
     var failCount = 0
-    val producer = new KafkaSystemProducer("test", 1, 100, () => {
+    val producer = new KafkaSystemProducer(systemName = "test", batchSize = 1, 
getProducer = (() => {
       failCount += 1
       if (failCount <= 5) {
         throw new RuntimeException("Pretend to fail in factory")
@@ -144,7 +144,7 @@ class TestKafkaSystemProducer {
           }
         }
       }
-    }, new KafkaSystemProducerMetrics)
+    }), metrics = new KafkaSystemProducerMetrics)
 
     producer.register("test")
     producer.start

Reply via email to