Repository: samza Updated Branches: refs/heads/master 7f15302b0 -> 7cd1d397b
SAMZA-911: set a max retry on KafkaSystemProducer to avoid infinite loop Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/7cd1d397 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/7cd1d397 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/7cd1d397 Branch: refs/heads/master Commit: 7cd1d397b3845268188a0cda82e0808c85275e18 Parents: 7f15302 Author: Jagadish Venkatraman <[email protected]> Authored: Wed Apr 27 15:04:20 2016 -0700 Committer: Yi Pan (Data Infrastructure) <[email protected]> Committed: Wed Apr 27 15:04:20 2016 -0700 ---------------------------------------------------------------------- .../system/kafka/KafkaSystemProducer.scala | 18 ++++--- .../system/kafka/TestKafkaSystemProducer.scala | 51 ++++++++++++++++++-- 2 files changed, 58 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/7cd1d397/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala index 9a44d46..d1a7a9f 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala @@ -26,7 +26,7 @@ import org.apache.samza.system.OutgoingMessageEnvelope import org.apache.samza.util.ExponentialSleepStrategy import org.apache.samza.util.TimerUtils import org.apache.samza.util.KafkaUtil -import java.util.concurrent.atomic.{AtomicReference, AtomicBoolean} +import java.util.concurrent.atomic.{AtomicInteger, AtomicReference, AtomicBoolean} import java.util.{Map => javaMap} import org.apache.samza.SamzaException import org.apache.kafka.common.errors.RetriableException @@ -40,7 +40,8 @@ class KafkaSystemProducer(systemName: String, retryBackoff: ExponentialSleepStrategy = new ExponentialSleepStrategy, getProducer: () => Producer[Array[Byte], Array[Byte]], metrics: KafkaSystemProducerMetrics, - val clock: () => Long = () => System.nanoTime) extends SystemProducer with Logging with TimerUtils + val clock: () => Long = () => System.nanoTime, + val maxRetries: Int = 30) extends SystemProducer with Logging with TimerUtils { var producer: Producer[Array[Byte], Array[Byte]] = null val latestFuture: javaMap[String, Future[RecordMetadata]] = new util.HashMap[String, Future[RecordMetadata]]() @@ -67,6 +68,7 @@ class KafkaSystemProducer(systemName: String, } def send(source: String, envelope: OutgoingMessageEnvelope) { + var numRetries: AtomicInteger = new AtomicInteger(0) trace("Enqueueing message: %s, %s." format (source, envelope)) if(producer == null) { info("Creating a new producer for system %s." format systemName) @@ -114,17 +116,21 @@ class KafkaSystemProducer(systemName: String, loop.done }, (exception, loop) => { - if(exception != null && !exception.isInstanceOf[RetriableException]) { // Exception is thrown & not retriable - debug("Exception detail : ", exception) + if((exception != null && !exception.isInstanceOf[RetriableException]) || numRetries.get() >= maxRetries) { + // Irrecoverable exceptions. + error("Exception detail : ", exception) //Close producer stop() producer = null //Mark loop as done as we are not going to retry loop.done metrics.sendFailed.inc - throw new SamzaException("Failed to send message. Exception:\n %s".format(exception)) + throw new SamzaException(("Failed to send message on Topic:%s Partition:%s NumRetries:%s Exception:\n %s,") + .format(topicName, partitionKey, numRetries, exception)) } else { - warn("Retrying send messsage due to RetriableException - %s. Turn on debugging to get a full stack trace".format(exception)) + numRetries.incrementAndGet() + warn(("Retrying send due to RetriableException - %s for Topic:%s Partition:%s. " + + "Turn on debugging to get a full stack trace").format(exception, topicName, partitionKey)) debug("Exception detail:", exception) metrics.retries.inc } http://git-wip-us.apache.org/repos/asf/samza/blob/7cd1d397/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemProducer.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemProducer.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemProducer.scala index 39426d8..8e32bba 100644 --- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemProducer.scala +++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemProducer.scala @@ -26,7 +26,7 @@ import org.apache.kafka.clients.producer._ import java.util import org.junit.Assert._ import org.scalatest.Assertions.intercept -import org.apache.kafka.common.errors.RecordTooLargeException +import org.apache.kafka.common.errors.{TimeoutException, RecordTooLargeException} import org.apache.samza.SamzaException @@ -67,9 +67,10 @@ class TestKafkaSystemProducer { val msg3 = new OutgoingMessageEnvelope(new SystemStream("test", "test"), "c".getBytes) val mockProducer = new MockKafkaProducer(1, "test", 1) + val producerMetrics = new KafkaSystemProducerMetrics val systemProducer = new KafkaSystemProducer(systemName = "test", getProducer = () => mockProducer, - metrics = new KafkaSystemProducerMetrics) + metrics = producerMetrics) systemProducer.register("test") systemProducer.start() systemProducer.send("test", msg1) @@ -83,6 +84,7 @@ class TestKafkaSystemProducer { sendThread.join() assertEquals(3, mockProducer.getMsgsSent) + assertEquals(0, producerMetrics.retries.getCount) systemProducer.stop() } @@ -120,7 +122,7 @@ class TestKafkaSystemProducer { val mockProducer = new MockKafkaProducer(1, "test", 1) val systemProducer = new KafkaSystemProducer(systemName = "test", getProducer = () => mockProducer, - metrics = new KafkaSystemProducerMetrics) + metrics = new KafkaSystemProducerMetrics()) systemProducer.register("test") systemProducer.start() systemProducer.send("test", msg1) @@ -132,6 +134,7 @@ class TestKafkaSystemProducer { systemProducer.send("test", msg4) assertEquals(1, mockProducer.getMsgsSent) + mockProducer.startDelayedSendThread(2000) val thrown = intercept[SamzaException] { systemProducer.flush("test") @@ -142,16 +145,53 @@ class TestKafkaSystemProducer { } @Test - def testKafkaProducerExceptions { + def testKafkaProducerWithRetriableException { + val msg1 = new OutgoingMessageEnvelope(new SystemStream("test", "test"), "a".getBytes) + val msg2 = new OutgoingMessageEnvelope(new SystemStream("test", "test"), "b".getBytes) + val msg3 = new OutgoingMessageEnvelope(new SystemStream("test", "test"), "c".getBytes) + val msg4 = new OutgoingMessageEnvelope(new SystemStream("test", "test"), "d".getBytes) + val numMaxRetries = 3 + + val mockProducer = new MockKafkaProducer(1, "test", 1) + val producerMetrics = new KafkaSystemProducerMetrics() + val producer = new KafkaSystemProducer(systemName = "test", + getProducer = () => mockProducer, + metrics = producerMetrics, + maxRetries = numMaxRetries) + + producer.register("test") + producer.start() + producer.send("test", msg1) + producer.send("test", msg2) + producer.send("test", msg3) + producer.flush("test") + + assertEquals(0, producerMetrics.retries.getCount) + mockProducer.setErrorNext(true, new TimeoutException()) + + val thrown = intercept[SamzaException] { + producer.send("test", msg4) + } + assertTrue(thrown.isInstanceOf[SamzaException]) + assertTrue(thrown.getCause.isInstanceOf[TimeoutException]) + assertEquals(true, producer.sendFailed.get()) + assertEquals(3, mockProducer.getMsgsSent) + assertEquals(numMaxRetries, producerMetrics.retries.getCount) + producer.stop() + } + + @Test + def testKafkaProducerWithNonRetriableExceptions { val msg1 = new OutgoingMessageEnvelope(new SystemStream("test", "test"), "a".getBytes) val msg2 = new OutgoingMessageEnvelope(new SystemStream("test", "test"), "b".getBytes) val msg3 = new OutgoingMessageEnvelope(new SystemStream("test", "test"), "c".getBytes) val msg4 = new OutgoingMessageEnvelope(new SystemStream("test", "test"), "d".getBytes) + val producerMetrics = new KafkaSystemProducerMetrics() val mockProducer = new MockKafkaProducer(1, "test", 1) val producer = new KafkaSystemProducer(systemName = "test", getProducer = () => mockProducer, - metrics = new KafkaSystemProducerMetrics) + metrics = producerMetrics) producer.register("test") producer.start() producer.send("test", msg1) @@ -166,6 +206,7 @@ class TestKafkaSystemProducer { assertTrue(thrown.getCause.isInstanceOf[RecordTooLargeException]) assertEquals(true, producer.sendFailed.get()) assertEquals(3, mockProducer.getMsgsSent) + assertEquals(0, producerMetrics.retries.getCount) producer.stop() }
