Repository: flink Updated Branches: refs/heads/master abc1657ba -> 1836e08f0
[FLINK-4439] Validate 'bootstrap.servers' config in flink kafka consumer 0.8 This closes #2397 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1836e08f Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1836e08f Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1836e08f Branch: refs/heads/master Commit: 1836e08f0f8bb17ed50e59b42d02999436b36f6c Parents: abc1657 Author: George <[email protected]> Authored: Wed Oct 5 11:48:02 2016 +0200 Committer: Robert Metzger <[email protected]> Committed: Mon Oct 10 17:05:11 2016 +0200 ---------------------------------------------------------------------- .../connectors/kafka/FlinkKafkaConsumer08.java | 37 +++++++++++- .../connectors/kafka/KafkaConsumer08Test.java | 59 ++++++++++++++++++-- 2 files changed, 89 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/1836e08f/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java index d7a6364..0aacccd 100644 --- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java +++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java @@ -42,7 +42,10 @@ import org.apache.flink.util.SerializedValue; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.Node; +import java.net.InetAddress; import java.net.URL; +import java.net.UnknownHostException; +import java.nio.channels.ClosedChannelException; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -237,7 +240,7 @@ public class FlinkKafkaConsumer08<T> extends FlinkKafkaConsumerBase<T> { public static List<KafkaTopicPartitionLeader> getPartitionsForTopic(List<String> topics, Properties properties) { String seedBrokersConfString = properties.getProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG); final int numRetries = getInt(properties, GET_PARTITIONS_RETRIES_KEY, DEFAULT_GET_PARTITIONS_RETRIES); - + checkNotNull(seedBrokersConfString, "Configuration property %s not set", ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG); String[] seedBrokers = seedBrokersConfString.split(","); List<KafkaTopicPartitionLeader> partitions = new ArrayList<>(); @@ -290,6 +293,8 @@ public class FlinkKafkaConsumer08<T> extends FlinkKafkaConsumerBase<T> { } break retryLoop; // leave the loop through the brokers } catch (Exception e) { + //validates seed brokers in case of a ClosedChannelException + validateSeedBrokers(seedBrokers, e); LOG.warn("Error communicating with broker " + seedBroker + " to find partitions for " + topics.toString() + "." + "" + e.getClass() + ". Message: " + e.getMessage()); LOG.debug("Detailed trace", e); @@ -348,6 +353,36 @@ public class FlinkKafkaConsumer08<T> extends FlinkKafkaConsumerBase<T> { } } + /** + * Validate that at least one seed broker is valid in case of a + * ClosedChannelException. + * + * @param seedBrokers + * array containing the seed brokers e.g. ["host1:port1", + * "host2:port2"] + * @param exception + * instance + */ + private static void validateSeedBrokers(String[] seedBrokers, Exception exception) { + if (!(exception instanceof ClosedChannelException)) { + return; + } + int unknownHosts = 0; + for (String broker : seedBrokers) { + URL brokerUrl = NetUtils.getCorrectHostnamePort(broker.trim()); + try { + InetAddress.getByName(brokerUrl.getHost()); + } catch (UnknownHostException e) { + unknownHosts++; + } + } + // throw meaningful exception if all the provided hosts are invalid + if (unknownHosts == seedBrokers.length) { + throw new IllegalArgumentException("All the servers provided in: '" + + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG + "' config are invalid. (unknown hosts)"); + } + } + private static long getInvalidOffsetBehavior(Properties config) { final String val = config.getProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "largest"); if (val.equals("none")) { http://git-wip-us.apache.org/repos/asf/flink/blob/1836e08f/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumer08Test.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumer08Test.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumer08Test.java index f0b58cf..9520f55 100644 --- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumer08Test.java +++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumer08Test.java @@ -18,17 +18,17 @@ package org.apache.flink.streaming.connectors.kafka; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.streaming.util.serialization.SimpleStringSchema; -import org.apache.kafka.clients.consumer.ConsumerConfig; - -import org.junit.Test; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import java.util.Collections; import java.util.Properties; -import static org.junit.Assert.*; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.util.serialization.SimpleStringSchema; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.junit.Test; public class KafkaConsumer08Test { @@ -89,4 +89,51 @@ public class KafkaConsumer08Test { assertTrue(e.getMessage().contains("Unable to retrieve any partitions")); } } + + @Test + public void testAllBoostrapServerHostsAreInvalid() { + try { + String zookeeperConnect = "localhost:56794"; + String bootstrapServers = "indexistentHost:11111"; + String groupId = "non-existent-group"; + Properties props = createKafkaProps(zookeeperConnect, bootstrapServers, groupId); + FlinkKafkaConsumer08<String> consumer = new FlinkKafkaConsumer08<>(Collections.singletonList("no op topic"), + new SimpleStringSchema(), props); + consumer.open(new Configuration()); + fail(); + } catch (Exception e) { + assertTrue("Exception should be thrown containing 'all bootstrap servers invalid' message!", + e.getMessage().contains("All the servers provided in: '" + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG + + "' config are invalid")); + } + } + + @Test + public void testAtLeastOneBootstrapServerHostIsValid() { + try { + String zookeeperConnect = "localhost:56794"; + // we declare one valid boostrap server, namely the one with + // 'localhost' + String bootstrapServers = "indexistentHost:11111, localhost:22222"; + String groupId = "non-existent-group"; + Properties props = createKafkaProps(zookeeperConnect, bootstrapServers, groupId); + FlinkKafkaConsumer08<String> consumer = new FlinkKafkaConsumer08<>(Collections.singletonList("no op topic"), + new SimpleStringSchema(), props); + consumer.open(new Configuration()); + fail(); + } catch (Exception e) { + // test is not failing because we have one valid boostrap server + assertTrue("The cause of the exception should not be 'all boostrap server are invalid'!", + !e.getMessage().contains("All the hosts provided in: " + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG + + " config are invalid")); + } + } + + private Properties createKafkaProps(String zookeeperConnect, String bootstrapServers, String groupId) { + Properties props = new Properties(); + props.setProperty("zookeeper.connect", zookeeperConnect); + props.setProperty("bootstrap.servers", bootstrapServers); + props.setProperty("group.id", groupId); + return props; + } }
