[ 
https://issues.apache.org/jira/browse/FLINK-8895?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16393973#comment-16393973
 ] 

godfrey johnson commented on FLINK-8895:
----------------------------------------

[~StephanEwen] After the partition leader is changed, the consumer can get all 
the recorders by connecting to the other brokers. So, it is possible to avoid 
the flink job failure by filtering the failed broker, right?

> Job failed when one kafka broker shutdown
> -----------------------------------------
>
>                 Key: FLINK-8895
>                 URL: https://issues.apache.org/jira/browse/FLINK-8895
>             Project: Flink
>          Issue Type: Bug
>          Components: Kafka Connector
>    Affects Versions: 1.2.1
>            Reporter: godfrey johnson
>            Priority: Major
>
> I used a  FlinkKafkaConsumer08 to get records from kafka,but job failed when 
> a broker shutdown.
>  
> I want to know it is possible to filter the failed broker and get the records 
> with the others brokers?which need to modify Flink's source code.
>  
> And I get the following error:
> {code:java}
> // code placeholder
> java.net.SocketTimeoutException at 
> sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:211) at 
> sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103) at 
> java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:385) at 
> kafka.utils.Utils$.read(Utils.scala:380) at 
> kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
>  at kafka.network.Receive$class.readCompletely(Transmission.scala:56) at 
> kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
>  at kafka.network.BlockingChannel.receive(BlockingChannel.scala:111) at 
> kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:79) at 
> kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:68)
>  at 
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:112)
>  at 
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
>  at 
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
>  at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) at 
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:111)
>  at 
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
>  at 
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
>  at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) at 
> kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:110) at 
> kafka.javaapi.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:47) at 
> org.apache.flink.streaming.connectors.kafka.internals.SimpleConsumerThread.run(SimpleConsumerThread.java:220)
> {code}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to