Repository: samza Updated Branches: refs/heads/master 8b52e8aec -> 28e2dbb89
SAMZA-458; closing kafka system producers flushes all buffered messages Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/28e2dbb8 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/28e2dbb8 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/28e2dbb8 Branch: refs/heads/master Commit: 28e2dbb895d1a9bb4d5d2662966902c7d85546be Parents: 8b52e8a Author: Yan Fang <[email protected]> Authored: Mon Mar 23 12:41:47 2015 -0700 Committer: Chris Riccomini <[email protected]> Committed: Mon Mar 23 12:41:47 2015 -0700 ---------------------------------------------------------------------- .../system/kafka/KafkaSystemProducer.scala | 10 ++++-- .../system/kafka/TestKafkaSystemProducer.scala | 33 ++++++++++++++++++-- 2 files changed, 37 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/28e2dbb8/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala index 83668dd..19bc37d 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala @@ -33,6 +33,7 @@ import org.apache.kafka.common.errors.RetriableException import org.apache.kafka.common.PartitionInfo import java.util import java.util.concurrent.Future +import scala.collection.JavaConversions._ class KafkaSystemProducer(systemName: String, @@ -51,6 +52,7 @@ class KafkaSystemProducer(systemName: String, def stop() { if (producer != null) { + latestFuture.keys.foreach(flush(_)) producer.close } } @@ -136,11 +138,13 @@ class KafkaSystemProducer(systemName: String, } if (sendFailed.get()) { logger.error("Unable to send message from %s to system %s" format(source, systemName)) - //Close producer - stop() + //Close producer. + if (producer != null) { + producer.close + } producer = null metrics.flushFailed.inc - throw new SamzaException("Unable to send message from %s to system %s" format(source, systemName)) + throw new SamzaException("Unable to send message from %s to system %s." format(source, systemName), exceptionThrown.get) } else { trace("Flushed %s." format (source)) } http://git-wip-us.apache.org/repos/asf/samza/blob/28e2dbb8/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemProducer.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemProducer.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemProducer.scala index ca10ea5..ef5a55b 100644 --- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemProducer.scala +++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemProducer.scala @@ -133,8 +133,8 @@ class TestKafkaSystemProducer { assertEquals(1, mockProducer.getMsgsSent) mockProducer.startDelayedSendThread(2000) val thrown = intercept[SamzaException] { - systemProducer.flush("test") - } + systemProducer.flush("test") + } assertTrue(thrown.isInstanceOf[SamzaException]) assertEquals(2, mockProducer.getMsgsSent) systemProducer.stop() @@ -162,9 +162,36 @@ class TestKafkaSystemProducer { producer.send("test", msg4) } assertTrue(thrown.isInstanceOf[SamzaException]) - assertTrue(thrown.getMessage.contains("RecordTooLargeException")) + assertTrue(thrown.getCause.isInstanceOf[RecordTooLargeException]) assertEquals(true, producer.sendFailed.get()) assertEquals(3, mockProducer.getMsgsSent) producer.stop() } + + @Test + def testKafkaProducerFlushMsgsWhenStop { + val msg1 = new OutgoingMessageEnvelope(new SystemStream("test", "test"), "a".getBytes) + val msg2 = new OutgoingMessageEnvelope(new SystemStream("test", "test"), "b".getBytes) + val msg3 = new OutgoingMessageEnvelope(new SystemStream("test", "test"), "c".getBytes) + val msg4 = new OutgoingMessageEnvelope(new SystemStream("test2", "test"), "d".getBytes) + + val mockProducer = new MockKafkaProducer(1, "test", 1) + val systemProducer = new KafkaSystemProducer(systemName = "test", + getProducer = () => mockProducer, + metrics = new KafkaSystemProducerMetrics) + systemProducer.register("test") + systemProducer.register("test2") + systemProducer.start() + systemProducer.send("test", msg1) + + mockProducer.setShouldBuffer(true) + systemProducer.send("test", msg2) + systemProducer.send("test", msg3) + systemProducer.send("test2", msg4) + assertEquals(1, mockProducer.getMsgsSent) + mockProducer.startDelayedSendThread(2000) + assertEquals(1, mockProducer.getMsgsSent) + systemProducer.stop() + assertEquals(4, mockProducer.getMsgsSent) + } }
