Repository: samza
Updated Branches:
  refs/heads/master 7f15302b0 -> 7cd1d397b


SAMZA-911: set a max retry on KafkaSystemProducer to avoid infinite loop


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/7cd1d397
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/7cd1d397
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/7cd1d397

Branch: refs/heads/master
Commit: 7cd1d397b3845268188a0cda82e0808c85275e18
Parents: 7f15302
Author: Jagadish Venkatraman <[email protected]>
Authored: Wed Apr 27 15:04:20 2016 -0700
Committer: Yi Pan (Data Infrastructure) <[email protected]>
Committed: Wed Apr 27 15:04:20 2016 -0700

----------------------------------------------------------------------
 .../system/kafka/KafkaSystemProducer.scala      | 18 ++++---
 .../system/kafka/TestKafkaSystemProducer.scala  | 51 ++++++++++++++++++--
 2 files changed, 58 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/7cd1d397/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
index 9a44d46..d1a7a9f 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
@@ -26,7 +26,7 @@ import org.apache.samza.system.OutgoingMessageEnvelope
 import org.apache.samza.util.ExponentialSleepStrategy
 import org.apache.samza.util.TimerUtils
 import org.apache.samza.util.KafkaUtil
-import java.util.concurrent.atomic.{AtomicReference, AtomicBoolean}
+import java.util.concurrent.atomic.{AtomicInteger, AtomicReference, AtomicBoolean}
 import java.util.{Map => javaMap}
 import org.apache.samza.SamzaException
 import org.apache.kafka.common.errors.RetriableException
@@ -40,7 +40,8 @@ class KafkaSystemProducer(systemName: String,
                           retryBackoff: ExponentialSleepStrategy = new ExponentialSleepStrategy,
                           getProducer: () => Producer[Array[Byte], Array[Byte]],
                           metrics: KafkaSystemProducerMetrics,
-                          val clock: () => Long = () => System.nanoTime) extends SystemProducer with Logging with TimerUtils
+                          val clock: () => Long = () => System.nanoTime,
+                          val maxRetries: Int = 30) extends SystemProducer with Logging with TimerUtils
 {
   var producer: Producer[Array[Byte], Array[Byte]] = null
   val latestFuture: javaMap[String, Future[RecordMetadata]] = new util.HashMap[String, Future[RecordMetadata]]()
@@ -67,6 +68,7 @@ class KafkaSystemProducer(systemName: String,
   }
 
   def send(source: String, envelope: OutgoingMessageEnvelope) {
+    var numRetries: AtomicInteger = new AtomicInteger(0)
     trace("Enqueueing message: %s, %s." format (source, envelope))
     if(producer == null) {
       info("Creating a new producer for system %s." format systemName)
@@ -114,17 +116,21 @@ class KafkaSystemProducer(systemName: String,
           loop.done
       },
       (exception, loop) => {
-        if(exception != null && !exception.isInstanceOf[RetriableException]) {   // Exception is thrown & not retriable
-          debug("Exception detail : ", exception)
+        if((exception != null && !exception.isInstanceOf[RetriableException]) || numRetries.get() >= maxRetries) {
+          // Irrecoverable exceptions.
+          error("Exception detail : ", exception)
           //Close producer
           stop()
           producer = null
           //Mark loop as done as we are not going to retry
           loop.done
           metrics.sendFailed.inc
-          throw new SamzaException("Failed to send message. Exception:\n %s".format(exception))
+          throw new SamzaException(("Failed to send message on Topic:%s Partition:%s NumRetries:%s Exception:\n %s,")
+            .format(topicName, partitionKey, numRetries, exception))
         } else {
-          warn("Retrying send messsage due to RetriableException - %s. Turn on debugging to get a full stack trace".format(exception))
+          numRetries.incrementAndGet()
+          warn(("Retrying send due to RetriableException - %s for Topic:%s Partition:%s. " +
+            "Turn on debugging to get a full stack trace").format(exception, topicName, partitionKey))
           debug("Exception detail:", exception)
           metrics.retries.inc
         }

http://git-wip-us.apache.org/repos/asf/samza/blob/7cd1d397/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemProducer.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemProducer.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemProducer.scala
index 39426d8..8e32bba 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemProducer.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemProducer.scala
@@ -26,7 +26,7 @@ import org.apache.kafka.clients.producer._
 import java.util
 import org.junit.Assert._
 import org.scalatest.Assertions.intercept
-import org.apache.kafka.common.errors.RecordTooLargeException
+import org.apache.kafka.common.errors.{TimeoutException, 
RecordTooLargeException}
 import org.apache.samza.SamzaException
 
 
@@ -67,9 +67,10 @@ class TestKafkaSystemProducer {
     val msg3 = new OutgoingMessageEnvelope(new SystemStream("test", "test"), 
"c".getBytes)
 
     val mockProducer = new MockKafkaProducer(1, "test", 1)
+    val producerMetrics = new KafkaSystemProducerMetrics
     val systemProducer = new KafkaSystemProducer(systemName = "test",
                                                  getProducer = () => 
mockProducer,
-                                                 metrics = new 
KafkaSystemProducerMetrics)
+                                                 metrics = producerMetrics)
     systemProducer.register("test")
     systemProducer.start()
     systemProducer.send("test", msg1)
@@ -83,6 +84,7 @@ class TestKafkaSystemProducer {
     sendThread.join()
 
     assertEquals(3, mockProducer.getMsgsSent)
+    assertEquals(0, producerMetrics.retries.getCount)
     systemProducer.stop()
   }
 
@@ -120,7 +122,7 @@ class TestKafkaSystemProducer {
     val mockProducer = new MockKafkaProducer(1, "test", 1)
     val systemProducer = new KafkaSystemProducer(systemName = "test",
                                                  getProducer = () => 
mockProducer,
-                                                 metrics = new 
KafkaSystemProducerMetrics)
+                                                 metrics = new 
KafkaSystemProducerMetrics())
     systemProducer.register("test")
     systemProducer.start()
     systemProducer.send("test", msg1)
@@ -132,6 +134,7 @@ class TestKafkaSystemProducer {
     systemProducer.send("test", msg4)
 
     assertEquals(1, mockProducer.getMsgsSent)
+
     mockProducer.startDelayedSendThread(2000)
     val thrown = intercept[SamzaException] {
       systemProducer.flush("test")
@@ -142,16 +145,53 @@ class TestKafkaSystemProducer {
   }
 
   @Test
-  def testKafkaProducerExceptions {
+  def testKafkaProducerWithRetriableException {
+    val msg1 = new OutgoingMessageEnvelope(new SystemStream("test", "test"), 
"a".getBytes)
+    val msg2 = new OutgoingMessageEnvelope(new SystemStream("test", "test"), 
"b".getBytes)
+    val msg3 = new OutgoingMessageEnvelope(new SystemStream("test", "test"), 
"c".getBytes)
+    val msg4 = new OutgoingMessageEnvelope(new SystemStream("test", "test"), 
"d".getBytes)
+    val numMaxRetries = 3
+
+    val mockProducer = new MockKafkaProducer(1, "test", 1)
+    val producerMetrics = new KafkaSystemProducerMetrics()
+    val producer = new KafkaSystemProducer(systemName =  "test",
+      getProducer = () => mockProducer,
+      metrics = producerMetrics,
+      maxRetries = numMaxRetries)
+
+    producer.register("test")
+    producer.start()
+    producer.send("test", msg1)
+    producer.send("test", msg2)
+    producer.send("test", msg3)
+    producer.flush("test")
+
+    assertEquals(0, producerMetrics.retries.getCount)
+    mockProducer.setErrorNext(true, new TimeoutException())
+
+    val thrown = intercept[SamzaException] {
+      producer.send("test", msg4)
+    }
+    assertTrue(thrown.isInstanceOf[SamzaException])
+    assertTrue(thrown.getCause.isInstanceOf[TimeoutException])
+    assertEquals(true, producer.sendFailed.get())
+    assertEquals(3, mockProducer.getMsgsSent)
+    assertEquals(numMaxRetries, producerMetrics.retries.getCount)
+    producer.stop()
+  }
+
+  @Test
+  def testKafkaProducerWithNonRetriableExceptions {
     val msg1 = new OutgoingMessageEnvelope(new SystemStream("test", "test"), 
"a".getBytes)
     val msg2 = new OutgoingMessageEnvelope(new SystemStream("test", "test"), 
"b".getBytes)
     val msg3 = new OutgoingMessageEnvelope(new SystemStream("test", "test"), 
"c".getBytes)
     val msg4 = new OutgoingMessageEnvelope(new SystemStream("test", "test"), 
"d".getBytes)
+    val producerMetrics = new KafkaSystemProducerMetrics()
 
     val mockProducer = new MockKafkaProducer(1, "test", 1)
     val producer = new KafkaSystemProducer(systemName =  "test",
                                            getProducer = () => mockProducer,
-                                           metrics = new 
KafkaSystemProducerMetrics)
+                                           metrics = producerMetrics)
     producer.register("test")
     producer.start()
     producer.send("test", msg1)
@@ -166,6 +206,7 @@ class TestKafkaSystemProducer {
     assertTrue(thrown.getCause.isInstanceOf[RecordTooLargeException])
     assertEquals(true, producer.sendFailed.get())
     assertEquals(3, mockProducer.getMsgsSent)
+    assertEquals(0, producerMetrics.retries.getCount)
     producer.stop()
   }
 

Reply via email to