This is an automated email from the ASF dual-hosted git repository.
jqin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new ad0d5c8 [FLINK-13535][connector/Kafka] do not abort transactions
twice during KafkaProducer startup
ad0d5c8 is described below
commit ad0d5c8e256e6db5f6a51e6374cdc262283c912d
Author: Nico Kruber <[email protected]>
AuthorDate: Thu Aug 1 16:22:32 2019 +0200
[FLINK-13535][connector/Kafka] do not abort transactions twice during
KafkaProducer startup
During startup of a transactional Kafka producer from previous state,
recovery happens in two steps:
1) In TwoPhaseCommitSinkFunction, we commit the pending-commit
transactions, abort the pending transaction, and then call
finishRecoveringContext().
2) In FlinkKafkaProducer#finishRecoveringContext(), we iterate over all
recovered transactional IDs and abort them.
This may lead to some transactions being handled twice. To avoid the
overhead of this duplicate work (creating a KafkaProducer for each of
these transactions is expensive), we pass finishRecoveringContext() a
collection of all transactions that TwoPhaseCommitSinkFunction has already
covered and that do not need to be aborted again. The FlinkKafkaProducer
instances then skip the transactional IDs in that collection.
---
.../connectors/kafka/FlinkKafkaProducer011.java | 14 ++++++++++----
.../connectors/kafka/FlinkKafkaProducer.java | 14 ++++++++++----
.../functions/sink/TwoPhaseCommitSinkFunction.java | 21 +++++++++++++++++----
3 files changed, 37 insertions(+), 12 deletions(-)
diff --git
a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
index 462262b..7bc847f 100644
---
a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
+++
b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
@@ -882,20 +882,26 @@ public class FlinkKafkaProducer011<IN>
}
@Override
- protected void finishRecoveringContext() {
- cleanUpUserContext();
+ protected void
finishRecoveringContext(Collection<FlinkKafkaProducer011.KafkaTransactionState>
handledTransactions) {
+ cleanUpUserContext(handledTransactions);
resetAvailableTransactionalIdsPool(getUserContext().get().transactionalIds);
LOG.info("Recovered transactionalIds {}",
getUserContext().get().transactionalIds);
}
/**
* After initialization make sure that all previous transactions from
the current user context have been completed.
+ *
+ * @param handledTransactions
+ * transactions which were already committed or aborted
and do not need further handling
*/
- private void cleanUpUserContext() {
+ private void cleanUpUserContext(Collection<KafkaTransactionState>
handledTransactions) {
if (!getUserContext().isPresent()) {
return;
}
- abortTransactions(getUserContext().get().transactionalIds);
+ HashSet<String> abortTransactions = new
HashSet<>(getUserContext().get().transactionalIds);
+ handledTransactions.forEach(
+ kafkaTransactionState ->
abortTransactions.remove(kafkaTransactionState.transactionalId));
+ abortTransactions(abortTransactions);
}
private void resetAvailableTransactionalIdsPool(Collection<String>
transactionalIds) {
diff --git
a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
index 6314b58..605ee3f 100644
---
a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
+++
b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
@@ -1057,8 +1057,8 @@ public class FlinkKafkaProducer<IN>
}
@Override
- protected void finishRecoveringContext() {
- cleanUpUserContext();
+ protected void
finishRecoveringContext(Collection<FlinkKafkaProducer.KafkaTransactionState>
handledTransactions) {
+ cleanUpUserContext(handledTransactions);
resetAvailableTransactionalIdsPool(getUserContext().get().transactionalIds);
LOG.info("Recovered transactionalIds {}",
getUserContext().get().transactionalIds);
}
@@ -1069,12 +1069,18 @@ public class FlinkKafkaProducer<IN>
/**
* After initialization make sure that all previous transactions from
the current user context have been completed.
+ *
+ * @param handledTransactions
+ * transactions which were already committed or aborted
and do not need further handling
*/
- private void cleanUpUserContext() {
+ private void
cleanUpUserContext(Collection<FlinkKafkaProducer.KafkaTransactionState>
handledTransactions) {
if (!getUserContext().isPresent()) {
return;
}
- abortTransactions(getUserContext().get().transactionalIds);
+ HashSet<String> abortTransactions = new
HashSet<>(getUserContext().get().transactionalIds);
+ handledTransactions.forEach(
+ kafkaTransactionState ->
abortTransactions.remove(kafkaTransactionState.transactionalId));
+ abortTransactions(abortTransactions);
}
private void resetAvailableTransactionalIdsPool(Collection<String>
transactionalIds) {
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
index 588592e..025f867 100644
---
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
+++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
@@ -49,6 +49,7 @@ import java.io.IOException;
import java.time.Clock;
import java.util.AbstractMap;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
@@ -210,7 +211,13 @@ public abstract class TwoPhaseCommitSinkFunction<IN, TXN,
CONTEXT>
abort(transaction);
}
- protected void finishRecoveringContext() {
+ /**
+ * Callback for subclasses which is called after restoring (each) user
context.
+ *
+ * @param handledTransactions
+ * transactions which were already committed or aborted
and do not need further handling
+ */
+ protected void finishRecoveringContext(Collection<TXN>
handledTransactions) {
}
// ------ entry points for above methods implementing
{@CheckPointedFunction} and {@CheckpointListener} ------
@@ -346,17 +353,23 @@ public abstract class TwoPhaseCommitSinkFunction<IN, TXN,
CONTEXT>
for (State<TXN, CONTEXT> operatorState : state.get()) {
userContext = operatorState.getContext();
List<TransactionHolder<TXN>>
recoveredTransactions = operatorState.getPendingCommitTransactions();
+ List<TXN> handledTransactions = new
ArrayList<>(recoveredTransactions.size() + 1);
for (TransactionHolder<TXN>
recoveredTransaction : recoveredTransactions) {
// If this fails to succeed eventually,
there is actually data loss
recoverAndCommitInternal(recoveredTransaction);
+
handledTransactions.add(recoveredTransaction.handle);
LOG.info("{} committed recovered
transaction {}", name(), recoveredTransaction);
}
-
recoverAndAbort(operatorState.getPendingTransaction().handle);
- LOG.info("{} aborted recovered transaction {}",
name(), operatorState.getPendingTransaction());
+ {
+ TXN transaction =
operatorState.getPendingTransaction().handle;
+ recoverAndAbort(transaction);
+ handledTransactions.add(transaction);
+ LOG.info("{} aborted recovered
transaction {}", name(), operatorState.getPendingTransaction());
+ }
if (userContext.isPresent()) {
- finishRecoveringContext();
+
finishRecoveringContext(handledTransactions);
recoveredUserContext = true;
}
}