Repository: apex-malhar
Updated Branches:
  refs/heads/master 3f30b81a6 -> da3b4317f
APEXMALHAR-2134 Fix the NullPointerException, if the kafka partition has no leader broker

Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/af425a5f
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/af425a5f
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/af425a5f

Branch: refs/heads/master
Commit: af425a5fd062741790eb5f7243c65d47e49a3721
Parents: 0a1adff
Author: chaitanya <[email protected]>
Authored: Fri Aug 26 00:51:43 2016 +0530
Committer: chaitanya <[email protected]>
Committed: Fri Aug 26 21:29:16 2016 +0530

----------------------------------------------------------------------
 .../java/com/datatorrent/contrib/kafka/SimpleKafkaConsumer.java | 4 ++++
 1 file changed, 4 insertions(+)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/af425a5f/contrib/src/main/java/com/datatorrent/contrib/kafka/SimpleKafkaConsumer.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/kafka/SimpleKafkaConsumer.java b/contrib/src/main/java/com/datatorrent/contrib/kafka/SimpleKafkaConsumer.java
index e10502b..fb89389 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/kafka/SimpleKafkaConsumer.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/kafka/SimpleKafkaConsumer.java
@@ -508,6 +508,10 @@ public class SimpleKafkaConsumer extends KafkaConsumer
             continue;
           }
           Broker b = pm.leader();
+          if (b == null) {
+            logger.info("No Leader broker for Kafka Partition {}. Skipping it for time until new leader is elected", kp.getPartitionId());
+            continue;
+          }
           Broker oldB = partitionToBroker.put(kp, b);
           if (b.equals(oldB)) {
             continue;
