This is an automated email from the ASF dual-hosted git repository.

jqin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new ad0d5c8  [FLINK-13535][connector/Kafka] do not abort transactions twice during KafkaProducer startup
ad0d5c8 is described below

commit ad0d5c8e256e6db5f6a51e6374cdc262283c912d
Author: Nico Kruber <[email protected]>
AuthorDate: Thu Aug 1 16:22:32 2019 +0200

    [FLINK-13535][connector/Kafka] do not abort transactions twice during KafkaProducer startup
    
    During startup of a transactional Kafka producer from a previous state, we
    recover in two steps:
    1) in TwoPhaseCommitSinkFunction, we commit the pending commit-transactions,
       abort the pending transaction, and then call into finishRecoveringContext()
    2) in FlinkKafkaProducer#finishRecoveringContext(), we iterate over all
       recovered transactional IDs and abort them.
    
    This may lead to some transactions being handled twice. To avoid the
    overhead associated with this (creating a KafkaProducer for each of these
    transactions is expensive), we pass finishRecoveringContext() a collection
    of all transactions that TwoPhaseCommitSinkFunction has already committed or
    aborted and that therefore do not need to be aborted again. FlinkKafkaProducer
    and FlinkKafkaProducer011 then skip the transactional IDs in that set when
    aborting the remaining recovered transactions.
---
 .../connectors/kafka/FlinkKafkaProducer011.java     | 14 ++++++++++----
 .../connectors/kafka/FlinkKafkaProducer.java        | 14 ++++++++++----
 .../functions/sink/TwoPhaseCommitSinkFunction.java  | 21 +++++++++++++++++----
 3 files changed, 37 insertions(+), 12 deletions(-)

diff --git 
a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
 
b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
index 462262b..7bc847f 100644
--- 
a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
+++ 
b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
@@ -882,20 +882,26 @@ public class FlinkKafkaProducer011<IN>
        }
 
        @Override
-       protected void finishRecoveringContext() {
-               cleanUpUserContext();
+       protected void 
finishRecoveringContext(Collection<FlinkKafkaProducer011.KafkaTransactionState> 
handledTransactions) {
+               cleanUpUserContext(handledTransactions);
                
resetAvailableTransactionalIdsPool(getUserContext().get().transactionalIds);
                LOG.info("Recovered transactionalIds {}", 
getUserContext().get().transactionalIds);
        }
 
        /**
         * After initialization make sure that all previous transactions from 
the current user context have been completed.
+        *
+        * @param handledTransactions
+        *              transactions which were already committed or aborted 
and do not need further handling
         */
-       private void cleanUpUserContext() {
+       private void cleanUpUserContext(Collection<KafkaTransactionState> 
handledTransactions) {
                if (!getUserContext().isPresent()) {
                        return;
                }
-               abortTransactions(getUserContext().get().transactionalIds);
+               HashSet<String> abortTransactions = new 
HashSet<>(getUserContext().get().transactionalIds);
+               handledTransactions.forEach(
+                       kafkaTransactionState -> 
abortTransactions.remove(kafkaTransactionState.transactionalId));
+               abortTransactions(abortTransactions);
        }
 
        private void resetAvailableTransactionalIdsPool(Collection<String> 
transactionalIds) {
diff --git 
a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
 
b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
index 6314b58..605ee3f 100644
--- 
a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
+++ 
b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
@@ -1057,8 +1057,8 @@ public class FlinkKafkaProducer<IN>
        }
 
        @Override
-       protected void finishRecoveringContext() {
-               cleanUpUserContext();
+       protected void 
finishRecoveringContext(Collection<FlinkKafkaProducer.KafkaTransactionState> 
handledTransactions) {
+               cleanUpUserContext(handledTransactions);
                
resetAvailableTransactionalIdsPool(getUserContext().get().transactionalIds);
                LOG.info("Recovered transactionalIds {}", 
getUserContext().get().transactionalIds);
        }
@@ -1069,12 +1069,18 @@ public class FlinkKafkaProducer<IN>
 
        /**
         * After initialization make sure that all previous transactions from 
the current user context have been completed.
+        *
+        * @param handledTransactions
+        *              transactions which were already committed or aborted 
and do not need further handling
         */
-       private void cleanUpUserContext() {
+       private void 
cleanUpUserContext(Collection<FlinkKafkaProducer.KafkaTransactionState> 
handledTransactions) {
                if (!getUserContext().isPresent()) {
                        return;
                }
-               abortTransactions(getUserContext().get().transactionalIds);
+               HashSet<String> abortTransactions = new 
HashSet<>(getUserContext().get().transactionalIds);
+               handledTransactions.forEach(
+                       kafkaTransactionState -> 
abortTransactions.remove(kafkaTransactionState.transactionalId));
+               abortTransactions(abortTransactions);
        }
 
        private void resetAvailableTransactionalIdsPool(Collection<String> 
transactionalIds) {
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
index 588592e..025f867 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
@@ -49,6 +49,7 @@ import java.io.IOException;
 import java.time.Clock;
 import java.util.AbstractMap;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -210,7 +211,13 @@ public abstract class TwoPhaseCommitSinkFunction<IN, TXN, 
CONTEXT>
                abort(transaction);
        }
 
-       protected void finishRecoveringContext() {
+       /**
+        * Callback for subclasses which is called after restoring (each) user 
context.
+        *
+        * @param handledTransactions
+        *              transactions which were already committed or aborted 
and do not need further handling
+        */
+       protected void finishRecoveringContext(Collection<TXN> 
handledTransactions) {
        }
 
        // ------ entry points for above methods implementing 
{@CheckPointedFunction} and {@CheckpointListener} ------
@@ -346,17 +353,23 @@ public abstract class TwoPhaseCommitSinkFunction<IN, TXN, 
CONTEXT>
                        for (State<TXN, CONTEXT> operatorState : state.get()) {
                                userContext = operatorState.getContext();
                                List<TransactionHolder<TXN>> 
recoveredTransactions = operatorState.getPendingCommitTransactions();
+                               List<TXN> handledTransactions = new 
ArrayList<>(recoveredTransactions.size() + 1);
                                for (TransactionHolder<TXN> 
recoveredTransaction : recoveredTransactions) {
                                        // If this fails to succeed eventually, 
there is actually data loss
                                        
recoverAndCommitInternal(recoveredTransaction);
+                                       
handledTransactions.add(recoveredTransaction.handle);
                                        LOG.info("{} committed recovered 
transaction {}", name(), recoveredTransaction);
                                }
 
-                               
recoverAndAbort(operatorState.getPendingTransaction().handle);
-                               LOG.info("{} aborted recovered transaction {}", 
name(), operatorState.getPendingTransaction());
+                               {
+                                       TXN transaction = 
operatorState.getPendingTransaction().handle;
+                                       recoverAndAbort(transaction);
+                                       handledTransactions.add(transaction);
+                                       LOG.info("{} aborted recovered 
transaction {}", name(), operatorState.getPendingTransaction());
+                               }
 
                                if (userContext.isPresent()) {
-                                       finishRecoveringContext();
+                                       
finishRecoveringContext(handledTransactions);
                                        recoveredUserContext = true;
                                }
                        }

Reply via email to