[
https://issues.apache.org/jira/browse/FLINK-4439?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15429735#comment-15429735
]
Robert Metzger commented on FLINK-4439:
---------------------------------------
In my opinion, the logging is pretty good.
There's a log message at WARN level:
{code}
2016-08-21 15:22:30 WARN FlinkKafkaConsumerBase:290 - Error communicating with
broker inexistentKafkHost:9092 to find partitions for [testTopic].class
java.nio.channels.ClosedChannelException. Message: null
{code}
and the stack trace is at debug level.
I'm not sure if failing fast is a good solution: Maybe its just a temporary
issue with the broker, or the client can not contact the broker.
> Error message KafkaConsumer08 when all 'bootstrap.servers' are invalid
> ----------------------------------------------------------------------
>
> Key: FLINK-4439
> URL: https://issues.apache.org/jira/browse/FLINK-4439
> Project: Flink
> Issue Type: Improvement
> Components: Streaming Connectors
> Affects Versions: 1.0.3
> Reporter: Gheorghe Gheorghe
> Priority: Minor
>
> The "flink-connector-kafka-0.8_2" is logging the following error when all
> 'bootstrap.servers' are invalid when passed to the FlinkKafkaConsumer08.
> See stacktrace:
> {code:title=stacktrace|borderStyle=solid}
> 2016-08-21 15:22:30 WARN FlinkKafkaConsumerBase:290 - Error communicating
> with broker inexistentKafkHost:9092 to find partitions for [testTopic].class
> java.nio.channels.ClosedChannelException. Message: null
> 2016-08-21 15:22:30 DEBUG FlinkKafkaConsumerBase:292 - Detailed trace
> java.nio.channels.ClosedChannelException
> at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
> at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:78)
> at
> kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:68)
> at kafka.consumer.SimpleConsumer.send(SimpleConsumer.scala:91)
> at kafka.javaapi.consumer.SimpleConsumer.send(SimpleConsumer.scala:68)
> at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.getPartitionsForTopic(FlinkKafkaConsumer08.java:264)
> at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.<init>(FlinkKafkaConsumer08.java:193)
> at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.<init>(FlinkKafkaConsumer08.java:164)
> at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.<init>(FlinkKafkaConsumer08.java:131)
> at MetricsFromKafka$.main(MetricsFromKafka.scala:38)
> at MetricsFromKafka.main(MetricsFromKafka.scala)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:497)
> at sbt.Run.invokeMain(Run.scala:67)
> at sbt.Run.run0(Run.scala:61)
> at sbt.Run.sbt$Run$$execute$1(Run.scala:51)
> at sbt.Run$$anonfun$run$1.apply$mcV$sp(Run.scala:55)
> at sbt.Run$$anonfun$run$1.apply(Run.scala:55)
> at sbt.Run$$anonfun$run$1.apply(Run.scala:55)
> at sbt.Logger$$anon$4.apply(Logger.scala:84)
> at sbt.TrapExit$App.run(TrapExit.scala:248)
> at java.lang.Thread.run(Thread.java:745)
> {code}
> In the above stackrace it is hard to figure out that the actual servers
> provided as a config cannot be resolved to a valid ip address. Moreover the
> flink kafka consumer will try all of those servers one by one and failing to
> get partition information.
> The suggested improvement is to fail fast and announce the user that the
> servers provided in the 'boostrap.servers' config are invalid. If at least
> one server is valid then the exception should not be thrown.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)