[ 
https://issues.apache.org/jira/browse/FLINK-4015?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15314042#comment-15314042
 ] 

Sebastian Klemke commented on FLINK-4015:
-----------------------------------------

We use the default setting, retries: 0. We also can't set retries to a higher 
value, because the Kafka documentation says "Allowing retries will potentially 
change the ordering of records because if two records are sent to a single 
partition, and the first fails and is retried but the second succeeds, then the 
second record may appear first." For our application, this must not happen. 
Retrying whole batches in order would be okay.
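
For illustration only, here is a minimal sketch of the kind of producer 
configuration the ordering concern is about. It assumes the Kafka producer 
client in use honors the max.in.flight.requests.per.connection setting, which 
the Kafka producer documentation describes as the way to keep retries from 
reordering records; whether that holds for the client version behind 
FlinkKafkaProducer08 has not been verified here. The class name, helper name, 
broker address, and retry count are hypothetical.

```java
import java.util.Properties;

public class OrderedRetryConfig {

    // Hypothetical helper (not part of Flink or Kafka): builds producer
    // properties that enable retries while limiting the client to one
    // in-flight request per broker connection.
    static Properties orderedRetryProperties(String bootstrapServers, int retries) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        // Retry failed sends instead of failing immediately (default is 0).
        props.setProperty("retries", Integer.toString(retries));
        // Assumption: with a single in-flight request, a retried batch
        // cannot be overtaken by a later batch for the same partition.
        props.setProperty("max.in.flight.requests.per.connection", "1");
        return props;
    }

    public static void main(String[] args) {
        // Placeholder broker address and retry count, for illustration only.
        Properties props = orderedRetryProperties("localhost:9092", 3);
        props.list(System.out);
    }
}
```

Limiting the connection to one in-flight request would likely reduce 
throughput, so this is a trade-off rather than a free fix.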

> FlinkKafkaProducer08 fails when partition leader changes
> --------------------------------------------------------
>
>                 Key: FLINK-4015
>                 URL: https://issues.apache.org/jira/browse/FLINK-4015
>             Project: Flink
>          Issue Type: Bug
>          Components: Kafka Connector
>    Affects Versions: 1.0.2
>            Reporter: Sebastian Klemke
>
> When leader for a partition changes, producer fails with the following 
> exception:
> {code}
> 06:34:50,813 INFO  org.apache.flink.yarn.YarnJobManager                       
>    - Status of job b323f5de3d32504651e861d5ecb27e7c (JOB_NAME) changed to 
> FAILING.
> java.lang.RuntimeException: Could not forward element to next operator
>       at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:354)
>       at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:337)
>       at 
> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>       at OPERATOR.flatMap2(OPERATOR.java:82)
>       at OPERATOR.flatMap2(OPERATOR.java:16)
>       at 
> org.apache.flink.streaming.api.operators.co.CoStreamFlatMap.processElement2(CoStreamFlatMap.java:63)
>       at 
> org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:207)
>       at 
> org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.run(TwoInputStreamTask.java:89)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>       at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.RuntimeException: Could not forward element to next 
> operator
>       at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:354)
>       at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:337)
>       at 
> org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:39)
>       at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:351)
>       ... 10 more
> Caused by: java.lang.RuntimeException: Could not forward element to next 
> operator
>       at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:354)
>       at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:337)
>       at 
> org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:39)
>       at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:351)
>       ... 13 more
> Caused by: java.lang.Exception: Failed to send data to Kafka: This server is 
> not the leader for that topic-partition.
>       at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.checkErroneous(FlinkKafkaProducerBase.java:282)
>       at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.invoke(FlinkKafkaProducerBase.java:249)
>       at 
> org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:39)
>       at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:351)
>       ... 16 more
> Caused by: org.apache.kafka.common.errors.NotLeaderForPartitionException: 
> This server is not the leader for that topic-partition.
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
