Repository: incubator-samza
Updated Branches:
  refs/heads/master f2fcb26a3 -> 085a8b4b6


SAMZA-193; closing a disconnected KafkaCheckpointManager triggers an NPE


Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/085a8b4b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/085a8b4b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/085a8b4b

Branch: refs/heads/master
Commit: 085a8b4b6b425a5cbdfbb115e3b39d19a90d9825
Parents: f2fcb26
Author: Neha Narkhede <[email protected]>
Authored: Thu Mar 20 20:59:19 2014 -0700
Committer: Chris Riccomini <[email protected]>
Committed: Thu Mar 20 20:59:19 2014 -0700

----------------------------------------------------------------------
 .../apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/085a8b4b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
index a1d2ffe..64e882b 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
@@ -205,7 +205,11 @@ class KafkaCheckpointManager(
     partitions += partition
   }
 
-  def stop = producer.close
+  def stop = {
+    if(producer != null) {
+      producer.close
+    }
+  }
 
   private def createTopic {
     info("Attempting to create state topic %s with %s partitions." format (stateTopic, totalPartitions))
