[ 
https://issues.apache.org/jira/browse/FLINK-4015?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15372606#comment-15372606
 ] 

Robert Metzger commented on FLINK-4015:
---------------------------------------

What's the throughput of the Kafka producer? Do you think the synchronous 
producer would be fast enough?
I'm asking because the synchronous variant is probably easier to implement and 
also to operate. With the buffering Kafka producer, you need to maintain all 
the unconfirmed records in memory, so you may run into garbage collection 
issues.

> FlinkKafkaProducer08 fails when partition leader changes
> --------------------------------------------------------
>
>                 Key: FLINK-4015
>                 URL: https://issues.apache.org/jira/browse/FLINK-4015
>             Project: Flink
>          Issue Type: Bug
>          Components: Kafka Connector
>    Affects Versions: 1.0.2
>            Reporter: Sebastian Klemke
>
> When leader for a partition changes, producer fails with the following 
> exception:
> {code}
> 06:34:50,813 INFO  org.apache.flink.yarn.YarnJobManager                       
>    - Status of job b323f5de3d32504651e861d5ecb27e7c (JOB_NAME) changed to 
> FAILING.
> java.lang.RuntimeException: Could not forward element to next operator
>       at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:354)
>       at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:337)
>       at 
> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>       at OPERATOR.flatMap2(OPERATOR.java:82)
>       at OPERATOR.flatMap2(OPERATOR.java:16)
>       at 
> org.apache.flink.streaming.api.operators.co.CoStreamFlatMap.processElement2(CoStreamFlatMap.java:63)
>       at 
> org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:207)
>       at 
> org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.run(TwoInputStreamTask.java:89)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>       at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.RuntimeException: Could not forward element to next 
> operator
>       at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:354)
>       at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:337)
>       at 
> org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:39)
>       at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:351)
>       ... 10 more
> Caused by: java.lang.RuntimeException: Could not forward element to next 
> operator
>       at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:354)
>       at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:337)
>       at 
> org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:39)
>       at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:351)
>       ... 13 more
> Caused by: java.lang.Exception: Failed to send data to Kafka: This server is 
> not the leader for that topic-partition.
>       at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.checkErroneous(FlinkKafkaProducerBase.java:282)
>       at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.invoke(FlinkKafkaProducerBase.java:249)
>       at 
> org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:39)
>       at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:351)
>       ... 16 more
> Caused by: org.apache.kafka.common.errors.NotLeaderForPartitionException: 
> This server is not the leader for that topic-partition.
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to