Repository: samza
Updated Branches:
  refs/heads/master 492a3d79d -> 52ff04197


SAMZA-1069: Fix Deadlock between KafkaSystemProducer and KafkaProducer

Move the producer.close() and sources.flush() calls outside the lock so that
they cannot race with the Kafka network thread callbacks.

Author: Xinyu Liu <[email protected]>

Reviewers: Yi Pan <[email protected]>

Closes #37 from xinyuiscool/SAMZA-1069


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/52ff0419
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/52ff0419
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/52ff0419

Branch: refs/heads/master
Commit: 52ff04197dacc5ab18137d7c9e0667e8212ab216
Parents: 492a3d7
Author: Xinyu Liu <[email protected]>
Authored: Fri Dec 23 14:32:01 2016 -0800
Committer: Yi Pan (Data Infrastructure) <[email protected]>
Committed: Fri Dec 23 14:32:01 2016 -0800

----------------------------------------------------------------------
 .../system/kafka/KafkaSystemProducer.scala      | 28 +++++++++++---------
 1 file changed, 16 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/52ff0419/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
index aac53fc..6efd2dc 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
@@ -74,21 +74,25 @@ class KafkaSystemProducer(systemName: String,
   }
 
   def stop() {
-    producerLock.synchronized {
-      try {
-        if (producer != null) {
-          producer.close
-          producer = null
-
-          sources.foreach {p =>
-            if (p._2.exceptionInCallback.get() == null) {
-              flush(p._1)
-            }
+    try {
+      val currentProducer = producer
+      if (currentProducer != null) {
+        producerLock.synchronized {
+          if (currentProducer == producer) {
+            // only nullify the member producer if it is still the same object, no point nullifying new producer
+            producer = null
+          }
+        }
+        currentProducer.close
+
+        sources.foreach {p =>
+          if (p._2.exceptionInCallback.get() == null) {
+            flush(p._1)
           }
         }
-      } catch {
-        case e: Exception => error(e.getMessage, e)
       }
+    } catch {
+      case e: Exception => error(e.getMessage, e)
     }
   }
 

Reply via email to