[ 
https://issues.apache.org/jira/browse/FLINK-4439?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15429852#comment-15429852
 ] 

Gheorghe Gheorghe commented on FLINK-4439:
------------------------------------------

Hi Robert, 

thanks for taking a look. 
I agree that there is logging and that it looks good, but it doesn't point to 
the fact that DNS cannot resolve the host name that I configured in 
'bootstrap.servers'.

My suggestion is to improve that message so that it tells the user explicitly 
that the bootstrap server list is misconfigured. As it is right now, you retry 
and retry but you still get 'ClosedChannelException. Message: null' without 
any clue why.

The proposed exception would not fail fast if a broker is merely unreachable, 
for example because the Kafka broker service is stopped or temporarily 
unavailable. It would only be thrown if none of the configured hosts can be 
resolved by DNS, which I think is an important situation that should be raised 
to the user.

The user would get immediate feedback that none of the configured 
'bootstrap.servers' entries points to a resolvable host. (If at least one is 
resolvable, nothing will be thrown.)
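
To make the idea concrete, here is a minimal sketch of such a check in Java. It is not the connector's actual code: the class and method names (BootstrapServersCheck, isResolvable, failIfNoneResolvable) are made up for illustration, and it assumes plain 'host:port' entries separated by commas (no bracketed IPv6 literals).

```java
import java.net.InetSocketAddress;

// Hypothetical helper, not part of Flink or Kafka.
final class BootstrapServersCheck {

    /** Returns true if the entry is a well-formed "host:port" whose host resolves. */
    static boolean isResolvable(String entry) {
        String trimmed = entry.trim();
        int colon = trimmed.lastIndexOf(':');
        if (colon <= 0) {
            return false; // missing host or missing port
        }
        String host = trimmed.substring(0, colon);
        int port;
        try {
            port = Integer.parseInt(trimmed.substring(colon + 1));
        } catch (NumberFormatException e) {
            return false; // port is not a number
        }
        if (port < 0 || port > 65535) {
            return false;
        }
        // The constructor attempts to resolve the host name;
        // isUnresolved() is true when that attempt failed.
        return !new InetSocketAddress(host, port).isUnresolved();
    }

    /** Throws only if no entry at all can be resolved. */
    static void failIfNoneResolvable(String bootstrapServers) {
        for (String entry : bootstrapServers.split(",")) {
            if (isResolvable(entry)) {
                return; // one resolvable host is enough
            }
        }
        throw new IllegalArgumentException(
                "None of the hosts in 'bootstrap.servers' could be resolved: "
                        + bootstrapServers);
    }
}
```

Calling failIfNoneResolvable with the value of 'bootstrap.servers' before the first partition lookup would give the user the explicit message, while a stopped or temporarily unavailable broker with a resolvable host name would still go through the existing retry path.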

What do you think?

> Error message KafkaConsumer08 when all 'bootstrap.servers' are invalid
> ----------------------------------------------------------------------
>
>                 Key: FLINK-4439
>                 URL: https://issues.apache.org/jira/browse/FLINK-4439
>             Project: Flink
>          Issue Type: Improvement
>          Components: Streaming Connectors
>    Affects Versions: 1.0.3
>            Reporter: Gheorghe Gheorghe
>            Priority: Minor
>
> The "flink-connector-kafka-0.8_2" connector logs the following error when all 
> 'bootstrap.servers' passed to the FlinkKafkaConsumer08 are invalid. 
> See stacktrace: 
> {code:title=stacktrace|borderStyle=solid}
> 2016-08-21 15:22:30 WARN  FlinkKafkaConsumerBase:290 - Error communicating with broker inexistentKafkHost:9092 to find partitions for [testTopic].class java.nio.channels.ClosedChannelException. Message: null
> 2016-08-21 15:22:30 DEBUG FlinkKafkaConsumerBase:292 - Detailed trace
> java.nio.channels.ClosedChannelException
>       at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
>       at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:78)
>       at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:68)
>       at kafka.consumer.SimpleConsumer.send(SimpleConsumer.scala:91)
>       at kafka.javaapi.consumer.SimpleConsumer.send(SimpleConsumer.scala:68)
>       at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.getPartitionsForTopic(FlinkKafkaConsumer08.java:264)
>       at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.<init>(FlinkKafkaConsumer08.java:193)
>       at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.<init>(FlinkKafkaConsumer08.java:164)
>       at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.<init>(FlinkKafkaConsumer08.java:131)
>       at MetricsFromKafka$.main(MetricsFromKafka.scala:38)
>       at MetricsFromKafka.main(MetricsFromKafka.scala)
>       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>       at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>       at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:497)
>       at sbt.Run.invokeMain(Run.scala:67)
>       at sbt.Run.run0(Run.scala:61)
>       at sbt.Run.sbt$Run$$execute$1(Run.scala:51)
>       at sbt.Run$$anonfun$run$1.apply$mcV$sp(Run.scala:55)
>       at sbt.Run$$anonfun$run$1.apply(Run.scala:55)
>       at sbt.Run$$anonfun$run$1.apply(Run.scala:55)
>       at sbt.Logger$$anon$4.apply(Logger.scala:84)
>       at sbt.TrapExit$App.run(TrapExit.scala:248)
>       at java.lang.Thread.run(Thread.java:745)
> {code}
> In the above stack trace it is hard to figure out that the servers 
> provided in the config cannot be resolved to a valid IP address. Moreover, the 
> Flink Kafka consumer will try all of those servers one by one and fail to 
> get partition information.
> The suggested improvement is to fail fast and tell the user that the 
> servers provided in the 'bootstrap.servers' config are invalid. If at least 
> one server is valid, the exception should not be thrown. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)