[FLINK-3541] [Kafka Connector] Clean up workaround in FlinkKafkaConsumer09. This closes #1846
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/693d5ab0 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/693d5ab0 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/693d5ab0 Branch: refs/heads/master Commit: 693d5ab09efef8732b857437bf1089f841b5e864 Parents: d20eda1 Author: Tianji Li <[email protected]> Authored: Fri Apr 1 00:35:39 2016 -0400 Committer: Stephan Ewen <[email protected]> Committed: Wed Apr 13 01:10:54 2016 +0200 ---------------------------------------------------------------------- .../connectors/kafka/FlinkKafkaConsumer09.java | 30 ++------------------ 1 file changed, 2 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/693d5ab0/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java index d34cd2f..bc2904c 100644 --- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java +++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java @@ -171,43 +171,17 @@ public class FlinkKafkaConsumer09<T> extends FlinkKafkaConsumerBase<T> { // read the partitions that belong to the listed topics final List<KafkaTopicPartition> partitions = new ArrayList<>(); - KafkaConsumer<byte[], byte[]> consumer = null; - try { - consumer = new KafkaConsumer<>(this.properties); + try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(this.properties)) { for (final String topic: topics) { // get partitions for each topic - List<PartitionInfo> partitionsForTopic = null; - for(int tri = 0; tri < 10; tri++) { - LOG.info("Trying to get partitions for topic {}", topic); - try { - partitionsForTopic = consumer.partitionsFor(topic); - if(partitionsForTopic != null && partitionsForTopic.size() > 0) { - break; // it worked - } - } catch (NullPointerException npe) { - // workaround for KAFKA-2880: Fetcher.getTopicMetadata NullPointerException when broker cannot be reached - // we ignore the NPE. - } - // create a new consumer - consumer.close(); - try { - Thread.sleep(1000); - } catch (InterruptedException ignored) {} - - consumer = new KafkaConsumer<>(properties); - } + List<PartitionInfo> partitionsForTopic = consumer.partitionsFor(topic); // for non existing topics, the list might be null. if (partitionsForTopic != null) { partitions.addAll(convertToFlinkKafkaTopicPartition(partitionsForTopic)); } } } - finally { - if(consumer != null) { - consumer.close(); - } - } if (partitions.isEmpty()) { throw new RuntimeException("Unable to retrieve any partitions for the requested topics " + topics);
