[ 
https://issues.apache.org/jira/browse/FLINK-4439?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15457997#comment-15457997
 ] 

ASF GitHub Bot commented on FLINK-4439:
---------------------------------------

Github user gheo21 commented on the issue:

    https://github.com/apache/flink/pull/2397
  
    Sure, did it. Let's see if it gets green! Thanks.


> Error message KafkaConsumer08 when all 'bootstrap.servers' are invalid
> ----------------------------------------------------------------------
>
>                 Key: FLINK-4439
>                 URL: https://issues.apache.org/jira/browse/FLINK-4439
>             Project: Flink
>          Issue Type: Improvement
>          Components: Streaming Connectors
>    Affects Versions: 1.0.3
>            Reporter: Gheorghe Gheorghe
>            Priority: Minor
>
> The "flink-connector-kafka-0.8_2" connector logs the following error when all 
> 'bootstrap.servers' passed to the FlinkKafkaConsumer08 are invalid. 
> See the stack trace: 
> {code:title=stacktrace|borderStyle=solid}
> 2016-08-21 15:22:30 WARN  FlinkKafkaConsumerBase:290 - Error communicating with broker inexistentKafkHost:9092 to find partitions for [testTopic].class java.nio.channels.ClosedChannelException. Message: null
> 2016-08-21 15:22:30 DEBUG FlinkKafkaConsumerBase:292 - Detailed trace
> java.nio.channels.ClosedChannelException
>       at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
>       at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:78)
>       at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:68)
>       at kafka.consumer.SimpleConsumer.send(SimpleConsumer.scala:91)
>       at kafka.javaapi.consumer.SimpleConsumer.send(SimpleConsumer.scala:68)
>       at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.getPartitionsForTopic(FlinkKafkaConsumer08.java:264)
>       at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.<init>(FlinkKafkaConsumer08.java:193)
>       at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.<init>(FlinkKafkaConsumer08.java:164)
>       at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.<init>(FlinkKafkaConsumer08.java:131)
>       at MetricsFromKafka$.main(MetricsFromKafka.scala:38)
>       at MetricsFromKafka.main(MetricsFromKafka.scala)
>       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>       at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>       at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:497)
>       at sbt.Run.invokeMain(Run.scala:67)
>       at sbt.Run.run0(Run.scala:61)
>       at sbt.Run.sbt$Run$$execute$1(Run.scala:51)
>       at sbt.Run$$anonfun$run$1.apply$mcV$sp(Run.scala:55)
>       at sbt.Run$$anonfun$run$1.apply(Run.scala:55)
>       at sbt.Run$$anonfun$run$1.apply(Run.scala:55)
>       at sbt.Logger$$anon$4.apply(Logger.scala:84)
>       at sbt.TrapExit$App.run(TrapExit.scala:248)
>       at java.lang.Thread.run(Thread.java:745)
> {code}
> From the above stack trace it is hard to figure out that the servers 
> provided in the config cannot be resolved to a valid IP address. Moreover, the 
> Flink Kafka consumer tries all of those servers one by one and fails to 
> get partition information from any of them.
> The suggested improvement is to fail fast and tell the user that the 
> servers provided in the 'bootstrap.servers' config are invalid. If at least 
> one server is valid, the exception should not be thrown. 
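
The fail-fast behaviour suggested in the issue could look roughly like the sketch below. It is only an illustration under assumptions, not the code from the pull request: the class and method names (BootstrapServersCheck, validate, hostOf, resolves) are hypothetical, entries are assumed to have the form "host:port", and the resolver is passed in as a predicate so the check can be exercised without DNS.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical helper, not part of the Flink connector.
public class BootstrapServersCheck {

    // Returns the host part of a "host:port" entry (assumed format).
    public static String hostOf(String server) {
        String trimmed = server.trim();
        int colon = trimmed.lastIndexOf(':');
        return colon < 0 ? trimmed : trimmed.substring(0, colon);
    }

    // Default resolver: true if the host name resolves to an IP address.
    public static boolean resolves(String host) {
        try {
            InetAddress.getByName(host);
            return true;
        } catch (UnknownHostException e) {
            return false;
        }
    }

    // Throws if none of the comma-separated servers can be resolved.
    // Returns normally as soon as one server is resolvable.
    public static void validate(String bootstrapServers, Predicate<String> resolvable) {
        List<String> unresolved = new ArrayList<>();
        for (String server : bootstrapServers.split(",")) {
            String host = hostOf(server);
            if (host.isEmpty()) {
                continue; // skip empty entries such as a trailing comma
            }
            if (resolvable.test(host)) {
                return; // at least one valid server: do not fail
            }
            unresolved.add(server.trim());
        }
        throw new IllegalArgumentException(
                "All the servers provided in 'bootstrap.servers' are invalid: " + unresolved);
    }
}
```

A caller might run BootstrapServersCheck.validate(props.getProperty("bootstrap.servers"), BootstrapServersCheck::resolves) before constructing the consumer, so that an unresolvable host list surfaces as one clear exception instead of a ClosedChannelException per broker.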



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
