Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5030#discussion_r151940978
  
    --- Diff: 
flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
 ---
    @@ -720,11 +719,10 @@ protected void abort(KafkaTransactionState 
transaction) {
        protected void recoverAndAbort(KafkaTransactionState transaction) {
                switch (semantic) {
                        case EXACTLY_ONCE:
    -                           FlinkKafkaProducer<byte[], byte[]> producer =
    -                                   
initTransactionalProducer(transaction.transactionalId, false);
    -                           
producer.resumeTransaction(transaction.producerId, transaction.epoch);
    -                           producer.abortTransaction();
    -                           producer.close();
    +                           try (FlinkKafkaProducer<byte[], byte[]> 
producer =
    +                                           
initTransactionalProducer(transaction.transactionalId, false)) {
    +                                   producer.initTransactions();
    --- End diff --
    
    `initTransaction()` does the same thing as resuming and aborting previous 
transactions, but in a safer way, since `resumeTransaction()` is part of our 
`KafkaProducer`s extension.


---

Reply via email to