Repository: flink
Updated Branches:
  refs/heads/master abc1657ba -> 1836e08f0


[FLINK-4439] Validate 'bootstrap.servers' config in flink kafka consumer 0.8

This closes #2397


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1836e08f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1836e08f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1836e08f

Branch: refs/heads/master
Commit: 1836e08f0f8bb17ed50e59b42d02999436b36f6c
Parents: abc1657
Author: George <[email protected]>
Authored: Wed Oct 5 11:48:02 2016 +0200
Committer: Robert Metzger <[email protected]>
Committed: Mon Oct 10 17:05:11 2016 +0200

----------------------------------------------------------------------
 .../connectors/kafka/FlinkKafkaConsumer08.java  | 37 +++++++++++-
 .../connectors/kafka/KafkaConsumer08Test.java   | 59 ++++++++++++++++++--
 2 files changed, 89 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/1836e08f/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
index d7a6364..0aacccd 100644
--- 
a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
+++ 
b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
@@ -42,7 +42,10 @@ import org.apache.flink.util.SerializedValue;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.Node;
 
+import java.net.InetAddress;
 import java.net.URL;
+import java.net.UnknownHostException;
+import java.nio.channels.ClosedChannelException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
@@ -237,7 +240,7 @@ public class FlinkKafkaConsumer08<T> extends 
FlinkKafkaConsumerBase<T> {
        public static List<KafkaTopicPartitionLeader> 
getPartitionsForTopic(List<String> topics, Properties properties) {
                String seedBrokersConfString = 
properties.getProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG);
                final int numRetries = getInt(properties, 
GET_PARTITIONS_RETRIES_KEY, DEFAULT_GET_PARTITIONS_RETRIES);
-
+               
                checkNotNull(seedBrokersConfString, "Configuration property %s 
not set", ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG);
                String[] seedBrokers = seedBrokersConfString.split(",");
                List<KafkaTopicPartitionLeader> partitions = new ArrayList<>();
@@ -290,6 +293,8 @@ public class FlinkKafkaConsumer08<T> extends 
FlinkKafkaConsumerBase<T> {
                                        }
                                        break retryLoop; // leave the loop 
through the brokers
                                } catch (Exception e) {
+                                       // validate seed brokers only when the caught exception is a ClosedChannelException
+                                       validateSeedBrokers(seedBrokers, e);
                                        LOG.warn("Error communicating with 
broker " + seedBroker + " to find partitions for " + topics.toString() + "." +
                                                        "" + e.getClass() + ". 
Message: " + e.getMessage());
                                        LOG.debug("Detailed trace", e);
@@ -348,6 +353,36 @@ public class FlinkKafkaConsumer08<T> extends 
FlinkKafkaConsumerBase<T> {
                }
        }
 
+       /**
+        * Validate that at least one seed broker is valid in case of a
+        * ClosedChannelException.
+        * 
+        * @param seedBrokers
+        *            array containing the seed brokers e.g. ["host1:port1",
+        *            "host2:port2"]
+        * @param exception
+        *            instance
+        */
+       private static void validateSeedBrokers(String[] seedBrokers, Exception 
exception) {
+               if (!(exception instanceof ClosedChannelException)) {
+                       return;
+               }
+               int unknownHosts = 0;
+               for (String broker : seedBrokers) {
+                       URL brokerUrl = 
NetUtils.getCorrectHostnamePort(broker.trim());
+                       try {
+                               InetAddress.getByName(brokerUrl.getHost());
+                       } catch (UnknownHostException e) {
+                               unknownHosts++;
+                       }
+               }
+               // throw meaningful exception if all the provided hosts are 
invalid
+               if (unknownHosts == seedBrokers.length) {
+                       throw new IllegalArgumentException("All the servers 
provided in: '"
+                                       + 
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG + "' config are invalid. (unknown 
hosts)");
+               }
+       }
+
        private static long getInvalidOffsetBehavior(Properties config) {
                final String val = 
config.getProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "largest");
                if (val.equals("none")) {

http://git-wip-us.apache.org/repos/asf/flink/blob/1836e08f/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumer08Test.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumer08Test.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumer08Test.java
index f0b58cf..9520f55 100644
--- 
a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumer08Test.java
+++ 
b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumer08Test.java
@@ -18,17 +18,17 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
 
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-
-import org.junit.Test;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 import java.util.Collections;
 import java.util.Properties;
 
-import static org.junit.Assert.*;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.junit.Test;
 
 public class KafkaConsumer08Test {
 
@@ -89,4 +89,51 @@ public class KafkaConsumer08Test {
                        assertTrue(e.getMessage().contains("Unable to retrieve 
any partitions"));
                }
        }
+
+       @Test
+       public void testAllBoostrapServerHostsAreInvalid() {
+               try {
+                       String zookeeperConnect = "localhost:56794";
+                       String bootstrapServers = "indexistentHost:11111";
+                       String groupId = "non-existent-group";
+                       Properties props = createKafkaProps(zookeeperConnect, 
bootstrapServers, groupId);
+                       FlinkKafkaConsumer08<String> consumer = new 
FlinkKafkaConsumer08<>(Collections.singletonList("no op topic"),
+                                       new SimpleStringSchema(), props);
+                       consumer.open(new Configuration());
+                       fail();
+               } catch (Exception e) {
+                       assertTrue("Exception should be thrown containing 'all 
bootstrap servers invalid' message!",
+                                       e.getMessage().contains("All the 
servers provided in: '" + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG
+                                                       + "' config are 
invalid"));
+               }
+       }
+
+       @Test
+       public void testAtLeastOneBootstrapServerHostIsValid() {
+               try {
+                       String zookeeperConnect = "localhost:56794";
+                       // we declare one valid boostrap server, namely the one 
with
+                       // 'localhost'
+                       String bootstrapServers = "indexistentHost:11111, 
localhost:22222";
+                       String groupId = "non-existent-group";
+                       Properties props = createKafkaProps(zookeeperConnect, 
bootstrapServers, groupId);
+                       FlinkKafkaConsumer08<String> consumer = new 
FlinkKafkaConsumer08<>(Collections.singletonList("no op topic"),
+                                       new SimpleStringSchema(), props);
+                       consumer.open(new Configuration());
+                       fail();
+               } catch (Exception e) {
+                       // test is not failing because we have one valid 
boostrap server
+                       assertTrue("The cause of the exception should not be 
'all boostrap server are invalid'!",
+                                       !e.getMessage().contains("All the hosts 
provided in: " + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG
+                                                       + " config are 
invalid"));
+               }
+       }
+       
+       private Properties createKafkaProps(String zookeeperConnect, String 
bootstrapServers, String groupId) {
+               Properties props = new Properties();
+               props.setProperty("zookeeper.connect", zookeeperConnect);
+               props.setProperty("bootstrap.servers", bootstrapServers);
+               props.setProperty("group.id", groupId);
+               return props;
+       }
 }

Reply via email to