Repository: incubator-samza Updated Branches: refs/heads/master f2fcb26a3 -> 085a8b4b6
SAMZA-193; closing a disconnected KafkaCheckpointManager triggers an NPE Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/085a8b4b Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/085a8b4b Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/085a8b4b Branch: refs/heads/master Commit: 085a8b4b6b425a5cbdfbb115e3b39d19a90d9825 Parents: f2fcb26 Author: Neha Narkhede <[email protected]> Authored: Thu Mar 20 20:59:19 2014 -0700 Committer: Chris Riccomini <[email protected]> Committed: Thu Mar 20 20:59:19 2014 -0700 ---------------------------------------------------------------------- .../apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/085a8b4b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala index a1d2ffe..64e882b 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala @@ -205,7 +205,11 @@ class KafkaCheckpointManager( partitions += partition } - def stop = producer.close + def stop = { + if(producer != null) { + producer.close + } + } private def createTopic { info("Attempting to create state topic %s with %s partitions." format (stateTopic, totalPartitions))
