Repository: crunch
Updated Branches:
  refs/heads/master 5d237b366 -> e8d2a69b6
CRUNCH-617: Support defensively handling null when partition leader cannot be found.

Signed-off-by: Micah Whitacre <[email protected]>

Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/e8d2a69b
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/e8d2a69b
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/e8d2a69b

Branch: refs/heads/master
Commit: e8d2a69b6df297f02dfe45053d0a72f6f32cd524
Parents: 5d237b3
Author: Micah Whitacre <[email protected]>
Authored: Tue Sep 6 15:55:56 2016 -0500
Committer: Micah Whitacre <[email protected]>
Committed: Thu Sep 8 11:08:07 2016 -0500

----------------------------------------------------------------------
 .../src/main/java/org/apache/crunch/kafka/KafkaUtils.java | 6 ++++++
 1 file changed, 6 insertions(+)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/crunch/blob/e8d2a69b/crunch-kafka/src/main/java/org/apache/crunch/kafka/KafkaUtils.java
----------------------------------------------------------------------
diff --git a/crunch-kafka/src/main/java/org/apache/crunch/kafka/KafkaUtils.java b/crunch-kafka/src/main/java/org/apache/crunch/kafka/KafkaUtils.java
index aeea6fb..9065bee 100644
--- a/crunch-kafka/src/main/java/org/apache/crunch/kafka/KafkaUtils.java
+++ b/crunch-kafka/src/main/java/org/apache/crunch/kafka/KafkaUtils.java
@@ -30,12 +30,14 @@ import kafka.javaapi.TopicMetadataRequest;
 import kafka.javaapi.TopicMetadataResponse;
 import kafka.javaapi.consumer.SimpleConsumer;
 import org.apache.commons.lang.StringUtils;
+import org.apache.crunch.CrunchRuntimeException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.protocol.SecurityProtocol;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -190,6 +192,10 @@ public class KafkaUtils {
 new HashMap<>();
 
 BrokerEndPoint brokerEndPoint = partition.leader();
+ if(brokerEndPoint == null){
+ throw new CrunchRuntimeException("Unable to find leader for topic:"+metadata.topic()
+ +" partition:"+partition.partitionId());
+ }
 Broker leader = new Broker(0, brokerEndPoint.host(), brokerEndPoint.port(), SecurityProtocol.PLAINTEXT);
 
 if (brokerRequests.containsKey(leader))
