[ 
https://issues.apache.org/jira/browse/FLINK-4439?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15429744#comment-15429744
 ] 

ASF GitHub Bot commented on FLINK-4439:
---------------------------------------

GitHub user gheo21 opened a pull request:

    https://github.com/apache/flink/pull/2397

    [FLINK-4439] Validate 'bootstrap.servers' config in flink kafka consu…

    Hello everybody, 
    
    I would like to contribute a small improvement to Flink. 
    Lately, I was using the FlinkKafkaConsumer08 to write a streaming topology 
in flink. Somehow I mistakenly configured the 'boostrap.servers' for the kafka 
config with invalid hosts. 
    
    The message that flink provided was not clearly stating what the problem 
was. Hence, my improvement consists of a validation of the servers provided in 
'boostrap.servers'. 
    
    If none of the configured servers are valid then we should fail-fast and a 
validation exception should be thrown. If at lease one server is valid then we 
don't throw any exception.
    
    See for more info: https://issues.apache.org/jira/browse/FLINK-4439

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/gheo21/flink 
flink-4439-kafka-consumer-conf-validation

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/2397.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2397
    
----
commit 81bfe72f0de2e21941143cd92c3d031962e6bdc7
Author: George <[email protected]>
Date:   2016-08-21T14:08:02Z

    [FLINK-4439] Validate 'bootstrap.servers' config in flink kafka consumer 0.8

----


> Error message KafkaConsumer08 when all 'bootstrap.servers' are invalid
> ----------------------------------------------------------------------
>
>                 Key: FLINK-4439
>                 URL: https://issues.apache.org/jira/browse/FLINK-4439
>             Project: Flink
>          Issue Type: Improvement
>          Components: Streaming Connectors
>    Affects Versions: 1.0.3
>            Reporter: Gheorghe Gheorghe
>            Priority: Minor
>
> The "flink-connector-kafka-0.8_2"  is logging the following error when all 
> 'bootstrap.servers' are invalid when passed to the FlinkKafkaConsumer08. 
> See stacktrace: 
> {code:title=stacktrace|borderStyle=solid}
> 2016-08-21 15:22:30 WARN  FlinkKafkaConsumerBase:290 - Error communicating 
> with broker inexistentKafkHost:9092 to find partitions for [testTopic].class 
> java.nio.channels.ClosedChannelException. Message: null
> 2016-08-21 15:22:30 DEBUG FlinkKafkaConsumerBase:292 - Detailed trace
> java.nio.channels.ClosedChannelException
>       at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
>       at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:78)
>       at 
> kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:68)
>       at kafka.consumer.SimpleConsumer.send(SimpleConsumer.scala:91)
>       at kafka.javaapi.consumer.SimpleConsumer.send(SimpleConsumer.scala:68)
>       at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.getPartitionsForTopic(FlinkKafkaConsumer08.java:264)
>       at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.<init>(FlinkKafkaConsumer08.java:193)
>       at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.<init>(FlinkKafkaConsumer08.java:164)
>       at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.<init>(FlinkKafkaConsumer08.java:131)
>       at MetricsFromKafka$.main(MetricsFromKafka.scala:38)
>       at MetricsFromKafka.main(MetricsFromKafka.scala)
>       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>       at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>       at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:497)
>       at sbt.Run.invokeMain(Run.scala:67)
>       at sbt.Run.run0(Run.scala:61)
>       at sbt.Run.sbt$Run$$execute$1(Run.scala:51)
>       at sbt.Run$$anonfun$run$1.apply$mcV$sp(Run.scala:55)
>       at sbt.Run$$anonfun$run$1.apply(Run.scala:55)
>       at sbt.Run$$anonfun$run$1.apply(Run.scala:55)
>       at sbt.Logger$$anon$4.apply(Logger.scala:84)
>       at sbt.TrapExit$App.run(TrapExit.scala:248)
>       at java.lang.Thread.run(Thread.java:745)
> {code}
> In the above stackrace it is hard to figure out that the actual servers 
> provided as a config cannot be resolved to a valid ip address. Moreover the 
> flink kafka consumer will try all of those servers one by one and failing to 
> get partition information.
> The suggested improvement is to fail fast and announce the user that the 
> servers provided in the 'boostrap.servers' config are invalid. If at least 
> one server is valid then the exception should not be thrown. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to