Repository: samza
Updated Branches:
  refs/heads/master 8b52e8aec -> 28e2dbb89


SAMZA-458; closing kafka system producers flushes all buffered messages


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/28e2dbb8
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/28e2dbb8
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/28e2dbb8

Branch: refs/heads/master
Commit: 28e2dbb895d1a9bb4d5d2662966902c7d85546be
Parents: 8b52e8a
Author: Yan Fang <[email protected]>
Authored: Mon Mar 23 12:41:47 2015 -0700
Committer: Chris Riccomini <[email protected]>
Committed: Mon Mar 23 12:41:47 2015 -0700

----------------------------------------------------------------------
 .../system/kafka/KafkaSystemProducer.scala      | 10 ++++--
 .../system/kafka/TestKafkaSystemProducer.scala  | 33 ++++++++++++++++++--
 2 files changed, 37 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/28e2dbb8/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
----------------------------------------------------------------------
diff --git 
a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
 
b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
index 83668dd..19bc37d 100644
--- 
a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
+++ 
b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
@@ -33,6 +33,7 @@ import org.apache.kafka.common.errors.RetriableException
 import org.apache.kafka.common.PartitionInfo
 import java.util
 import java.util.concurrent.Future
+import scala.collection.JavaConversions._
 
 
 class KafkaSystemProducer(systemName: String,
@@ -51,6 +52,7 @@ class KafkaSystemProducer(systemName: String,
 
   def stop() {
     if (producer != null) {
+      latestFuture.keys.foreach(flush(_))
       producer.close
     }
   }
@@ -136,11 +138,13 @@ class KafkaSystemProducer(systemName: String,
         }
         if (sendFailed.get()) {
           logger.error("Unable to send message from %s to system %s" 
format(source, systemName))
-          //Close producer
-          stop()
+          //Close producer.
+          if (producer != null) {
+            producer.close
+          }
           producer = null
           metrics.flushFailed.inc
-          throw new SamzaException("Unable to send message from %s to system 
%s" format(source, systemName))
+          throw new SamzaException("Unable to send message from %s to system 
%s." format(source, systemName), exceptionThrown.get)
         } else {
           trace("Flushed %s." format (source))
         }

http://git-wip-us.apache.org/repos/asf/samza/blob/28e2dbb8/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemProducer.scala
----------------------------------------------------------------------
diff --git 
a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemProducer.scala
 
b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemProducer.scala
index ca10ea5..ef5a55b 100644
--- 
a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemProducer.scala
+++ 
b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemProducer.scala
@@ -133,8 +133,8 @@ class TestKafkaSystemProducer {
     assertEquals(1, mockProducer.getMsgsSent)
     mockProducer.startDelayedSendThread(2000)
     val thrown = intercept[SamzaException] {
-                                             systemProducer.flush("test")
-                                           }
+      systemProducer.flush("test")
+    }
     assertTrue(thrown.isInstanceOf[SamzaException])
     assertEquals(2, mockProducer.getMsgsSent)
     systemProducer.stop()
@@ -162,9 +162,36 @@ class TestKafkaSystemProducer {
        producer.send("test", msg4)
     }
     assertTrue(thrown.isInstanceOf[SamzaException])
-    assertTrue(thrown.getMessage.contains("RecordTooLargeException"))
+    assertTrue(thrown.getCause.isInstanceOf[RecordTooLargeException])
     assertEquals(true, producer.sendFailed.get())
     assertEquals(3, mockProducer.getMsgsSent)
     producer.stop()
   }
+
+  @Test
+  def testKafkaProducerFlushMsgsWhenStop {
+    val msg1 = new OutgoingMessageEnvelope(new SystemStream("test", "test"), 
"a".getBytes)
+    val msg2 = new OutgoingMessageEnvelope(new SystemStream("test", "test"), 
"b".getBytes)
+    val msg3 = new OutgoingMessageEnvelope(new SystemStream("test", "test"), 
"c".getBytes)
+    val msg4 = new OutgoingMessageEnvelope(new SystemStream("test2", "test"), 
"d".getBytes)
+
+    val mockProducer = new MockKafkaProducer(1, "test", 1)
+    val systemProducer = new KafkaSystemProducer(systemName = "test",
+                                                 getProducer = () => 
mockProducer,
+                                                 metrics = new 
KafkaSystemProducerMetrics)
+    systemProducer.register("test")
+    systemProducer.register("test2")
+    systemProducer.start()
+    systemProducer.send("test", msg1)
+
+    mockProducer.setShouldBuffer(true)
+    systemProducer.send("test", msg2)
+    systemProducer.send("test", msg3)
+    systemProducer.send("test2", msg4)
+    assertEquals(1, mockProducer.getMsgsSent)
+    mockProducer.startDelayedSendThread(2000)
+    assertEquals(1, mockProducer.getMsgsSent)
+    systemProducer.stop()
+    assertEquals(4, mockProducer.getMsgsSent)
+  }
 }

Reply via email to