Gheorghe Gheorghe created FLINK-4439:
----------------------------------------
Summary: Error message KafkaConsumer08 when all
'bootstrap.servers' are invalid
Key: FLINK-4439
URL: https://issues.apache.org/jira/browse/FLINK-4439
Project: Flink
Issue Type: Improvement
Components: Streaming Connectors
Affects Versions: 1.0.3
Reporter: Gheorghe Gheorghe
Priority: Minor
The "flink-connector-kafka-0.8_2" is logging the following error when all
'bootstrap.servers' are invalid when passed to the FlinkKafkaConsumer08.
See stacktrace:
{code:title=stacktrace|borderStyle=solid}
2016-08-21 15:22:30 WARN FlinkKafkaConsumerBase:290 - Error communicating with
broker inexistentKafkHost:9092 to find partitions for [testTopic].class
java.nio.channels.ClosedChannelException. Message: null
2016-08-21 15:22:30 DEBUG FlinkKafkaConsumerBase:292 - Detailed trace
java.nio.channels.ClosedChannelException
at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:78)
at
kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:68)
at kafka.consumer.SimpleConsumer.send(SimpleConsumer.scala:91)
at kafka.javaapi.consumer.SimpleConsumer.send(SimpleConsumer.scala:68)
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.getPartitionsForTopic(FlinkKafkaConsumer08.java:264)
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.<init>(FlinkKafkaConsumer08.java:193)
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.<init>(FlinkKafkaConsumer08.java:164)
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.<init>(FlinkKafkaConsumer08.java:131)
at MetricsFromKafka$.main(MetricsFromKafka.scala:38)
at MetricsFromKafka.main(MetricsFromKafka.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at sbt.Run.invokeMain(Run.scala:67)
at sbt.Run.run0(Run.scala:61)
at sbt.Run.sbt$Run$$execute$1(Run.scala:51)
at sbt.Run$$anonfun$run$1.apply$mcV$sp(Run.scala:55)
at sbt.Run$$anonfun$run$1.apply(Run.scala:55)
at sbt.Run$$anonfun$run$1.apply(Run.scala:55)
at sbt.Logger$$anon$4.apply(Logger.scala:84)
at sbt.TrapExit$App.run(TrapExit.scala:248)
at java.lang.Thread.run(Thread.java:745)
{code}
In the above stackrace it is hard to figure out that the actual servers
provided as a config cannot be resolved to a valid ip address. Moreover the
flink kafka consumer will try all of those servers one by one and failing to
get partition information.
The suggested improvement is to fail fast and announce the user that the
servers provided in the 'boostrap.servers' config are invalid. If at least one
server is valid then the exception should not be thrown.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)