Repository: samza Updated Branches: refs/heads/master 492a3d79d -> 52ff04197
SAMZA-1069: Fix Deadlock between KafkaSystemProducer and KafkaProducer Moving producer.close() and the flushing of sources outside the lock so that they won't have a race condition with the Kafka network thread callbacks. Author: Xinyu Liu <[email protected]> Reviewers: Yi Pan <[email protected]> Closes #37 from xinyuiscool/SAMZA-1069 Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/52ff0419 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/52ff0419 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/52ff0419 Branch: refs/heads/master Commit: 52ff04197dacc5ab18137d7c9e0667e8212ab216 Parents: 492a3d7 Author: Xinyu Liu <[email protected]> Authored: Fri Dec 23 14:32:01 2016 -0800 Committer: Yi Pan (Data Infrastructure) <[email protected]> Committed: Fri Dec 23 14:32:01 2016 -0800 ---------------------------------------------------------------------- .../system/kafka/KafkaSystemProducer.scala | 28 +++++++++++--------- 1 file changed, 16 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/52ff0419/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala index aac53fc..6efd2dc 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala @@ -74,21 +74,25 @@ class KafkaSystemProducer(systemName: String, } def stop() { - producerLock.synchronized { - try { - if (producer != null) { - producer.close - producer = null - - sources.foreach {p => - if (p._2.exceptionInCallback.get() == null) { - flush(p._1) - } + try { 
val currentProducer = producer + if (currentProducer != null) { + producerLock.synchronized { + if (currentProducer == producer) { + // only nullify the member producer if it is still the same object, no point nullifying new producer + producer = null + } + } + currentProducer.close + + sources.foreach {p => + if (p._2.exceptionInCallback.get() == null) { + flush(p._1) } } - } catch { - case e: Exception => error(e.getMessage, e) } + } catch { + case e: Exception => error(e.getMessage, e) } }
