[FLINK-3541] [Kafka Connector] Clean up workaround in FlinkKafkaConsumer09

This closes #1846


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/693d5ab0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/693d5ab0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/693d5ab0

Branch: refs/heads/master
Commit: 693d5ab09efef8732b857437bf1089f841b5e864
Parents: d20eda1
Author: Tianji Li <[email protected]>
Authored: Fri Apr 1 00:35:39 2016 -0400
Committer: Stephan Ewen <[email protected]>
Committed: Wed Apr 13 01:10:54 2016 +0200

----------------------------------------------------------------------
 .../connectors/kafka/FlinkKafkaConsumer09.java  | 30 ++------------------
 1 file changed, 2 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/693d5ab0/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java
index d34cd2f..bc2904c 100644
--- 
a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java
+++ 
b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java
@@ -171,43 +171,17 @@ public class FlinkKafkaConsumer09<T> extends 
FlinkKafkaConsumerBase<T> {
 
                // read the partitions that belong to the listed topics
                final List<KafkaTopicPartition> partitions = new ArrayList<>();
-               KafkaConsumer<byte[], byte[]> consumer = null;
 
-               try {
-                       consumer = new KafkaConsumer<>(this.properties);
+               try (KafkaConsumer<byte[], byte[]> consumer = new 
KafkaConsumer<>(this.properties)) {
                        for (final String topic: topics) {
                                // get partitions for each topic
-                               List<PartitionInfo> partitionsForTopic = null;
-                               for(int tri = 0; tri < 10; tri++) {
-                                       LOG.info("Trying to get partitions for 
topic {}", topic);
-                                       try {
-                                               partitionsForTopic = 
consumer.partitionsFor(topic);
-                                               if(partitionsForTopic != null 
&& partitionsForTopic.size() > 0) {
-                                                       break; // it worked
-                                               }
-                                       } catch (NullPointerException npe) {
-                                               // workaround for KAFKA-2880: 
Fetcher.getTopicMetadata NullPointerException when broker cannot be reached
-                                               // we ignore the NPE.
-                                       }
-                                       // create a new consumer
-                                       consumer.close();
-                                       try {
-                                               Thread.sleep(1000);
-                                       } catch (InterruptedException ignored) 
{}
-                                       
-                                       consumer = new 
KafkaConsumer<>(properties);
-                               }
+                               List<PartitionInfo> partitionsForTopic = 
consumer.partitionsFor(topic);
                                // for non existing topics, the list might be 
null.
                                if (partitionsForTopic != null) {
                                        
partitions.addAll(convertToFlinkKafkaTopicPartition(partitionsForTopic));
                                }
                        }
                }
-               finally {
-                       if(consumer != null) {
-                               consumer.close();
-                       }
-               }
 
                if (partitions.isEmpty()) {
                        throw new RuntimeException("Unable to retrieve any 
partitions for the requested topics " + topics);

Reply via email to