[ 
https://issues.apache.org/jira/browse/FLINK-8086?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16259003#comment-16259003
 ] 

ASF GitHub Bot commented on FLINK-8086:
---------------------------------------

Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5030#discussion_r151938425
  
    --- Diff: 
flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
 ---
    @@ -720,11 +719,10 @@ protected void abort(KafkaTransactionState 
transaction) {
        protected void recoverAndAbort(KafkaTransactionState transaction) {
                switch (semantic) {
                        case EXACTLY_ONCE:
    -                           FlinkKafkaProducer<byte[], byte[]> producer =
    -                                   
initTransactionalProducer(transaction.transactionalId, false);
    -                           
producer.resumeTransaction(transaction.producerId, transaction.epoch);
    -                           producer.abortTransaction();
    -                           producer.close();
    +                           try (FlinkKafkaProducer<byte[], byte[]> 
producer =
    +                                           
initTransactionalProducer(transaction.transactionalId, false)) {
    +                                   producer.initTransactions();
    --- End diff --
    
    why is this changed from
    ```
    resumeTransaction()
    abortTransaction()
    ```
    to
    ```
    initTransaction()
    ```


> FlinkKafkaProducer011 can permanently fail in recovery through 
> ProducerFencedException
> --------------------------------------------------------------------------------------
>
>                 Key: FLINK-8086
>                 URL: https://issues.apache.org/jira/browse/FLINK-8086
>             Project: Flink
>          Issue Type: Bug
>          Components: Kafka Connector
>    Affects Versions: 1.4.0, 1.5.0
>            Reporter: Stefan Richter
>            Assignee: Piotr Nowojski
>            Priority: Blocker
>             Fix For: 1.4.0
>
>
> Chaos monkey test in a cluster environment can permanently bring down our 
> FlinkKafkaProducer011.
> Typically, after a small number of randomly killed TMs, the data generator 
> job is no longer able to recover from a checkpoint because of the following 
> problem:
> org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an 
> operation with an old epoch. Either there is a newer producer with the same 
> transactionalId, or the producer's transaction has been expired by the broker.
> The problem is reproduceable and happened for me in every run after the chaos 
> monkey killed a couple of TMs.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to