This is an automated email from the ASF dual-hosted git repository.
yuanmei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new b00d158 [FLINK-23473][runtime] Do not create transaction in TwoPhaseCommitSinkFunction after finish (#16768)
b00d158 is described below
commit b00d158e80ee9b5cb2287ed5b1b4d1baab3dc740
Author: Yuan Mei <[email protected]>
AuthorDate: Fri Aug 13 23:57:28 2021 +0800
[FLINK-23473][runtime] Do not create transaction in TwoPhaseCommitSinkFunction after finish (#16768)
---
.../functions/sink/TwoPhaseCommitSinkFunction.java | 49 ++++++++++++++++++----
.../sink/TwoPhaseCommitSinkFunctionTest.java | 33 +++++++++++++++
2 files changed, 74 insertions(+), 8 deletions(-)
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
index 1c06b0d..32f3904 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
@@ -90,6 +90,12 @@ public abstract class TwoPhaseCommitSinkFunction<IN, TXN,
CONTEXT> extends RichS
private final ListStateDescriptor<State<TXN, CONTEXT>> stateDescriptor;
+ /**
+ * Current Transaction Holder, including three states: 1. Normal Transaction: created when a new
+ * snapshot is taken during normal task running 2. Empty Transaction: created when a new
+ * snapshot is taken after the task is finished. At this point, there is no need to initiate
+ * real transactions due to no more input data. 3. null: After task/function is closed.
+ */
private TransactionHolder<TXN> currentTransactionHolder;
/** Specifies the maximum time a transaction should remain open. */
@@ -107,6 +113,9 @@ public abstract class TwoPhaseCommitSinkFunction<IN, TXN,
CONTEXT> extends RichS
*/
private double transactionTimeoutWarningRatio = -1;
+ /** Whether this sink function as well as its task is finished. */
+ private boolean finished = false;
+
/**
* Use default {@link ListStateDescriptor} for internal state serialization. Helpful utilities
* for using this constructor are {@link TypeInformation#of(Class)}, {@link
@@ -230,11 +239,16 @@ public abstract class TwoPhaseCommitSinkFunction<IN, TXN,
CONTEXT> extends RichS
@Override
public final void invoke(IN value, Context context) throws Exception {
- invoke(currentTransactionHolder.handle, value, context);
+ TXN currentTransaction = currentTransaction();
+ checkNotNull(
+ currentTransaction,
+ "Two phase commit sink function with null transaction should
not be invoked! ");
+ invoke(currentTransaction, value, context);
}
@Override
public final void finish() throws Exception {
+ finished = true;
finishProcessing(currentTransaction());
}
@@ -332,11 +346,18 @@ public abstract class TwoPhaseCommitSinkFunction<IN, TXN,
CONTEXT> extends RichS
context.getCheckpointId(),
currentTransactionHolder);
- preCommit(currentTransactionHolder.handle);
- pendingCommitTransactions.put(checkpointId, currentTransactionHolder);
- LOG.debug("{} - stored pending transactions {}", name(),
pendingCommitTransactions);
+ if (!currentTransactionHolder.equals(TransactionHolder.empty())) {
+ preCommit(currentTransactionHolder.handle);
+ pendingCommitTransactions.put(checkpointId,
currentTransactionHolder);
+ LOG.debug("{} - stored pending transactions {}", name(),
pendingCommitTransactions);
+ }
- currentTransactionHolder = beginTransactionInternal();
+ // no need to start new transactions after sink function is closed (no more input data)
+ if (!finished) {
+ currentTransactionHolder = beginTransactionInternal();
+ } else {
+ currentTransactionHolder = TransactionHolder.empty();
+ }
LOG.debug("{} - started new transaction '{}'", name(),
currentTransactionHolder);
state.clear();
@@ -383,6 +404,9 @@ public abstract class TwoPhaseCommitSinkFunction<IN, TXN,
CONTEXT> extends RichS
{
TXN transaction =
operatorState.getPendingTransaction().handle;
+
+ checkNotNull(transaction, "Pending transaction is not
expected to be null");
+
recoverAndAbort(transaction);
handledTransactions.add(transaction);
LOG.info(
@@ -460,10 +484,12 @@ public abstract class TwoPhaseCommitSinkFunction<IN, TXN,
CONTEXT> extends RichS
public void close() throws Exception {
super.close();
- if (currentTransactionHolder != null) {
- abort(currentTransactionHolder.handle);
- currentTransactionHolder = null;
+ TXN currentTransaction = currentTransaction();
+ if (currentTransaction != null) {
+ abort(currentTransaction);
}
+
+ currentTransactionHolder = null;
}
/**
@@ -607,6 +633,8 @@ public abstract class TwoPhaseCommitSinkFunction<IN, TXN,
CONTEXT> extends RichS
private final TXN handle;
+ private static final TransactionHolder<?> EMPTY = new
TransactionHolder<>(null, -1);
+
/**
* The system time when {@link #handle} was created. Used to determine if the current
* transaction has exceeded its timeout specified by {@link #transactionTimeout}.
@@ -623,6 +651,11 @@ public abstract class TwoPhaseCommitSinkFunction<IN, TXN,
CONTEXT> extends RichS
return clock.millis() - transactionStartTime;
}
+ @SuppressWarnings("unchecked")
+ public static <TXN> TransactionHolder<TXN> empty() {
+ return (TransactionHolder<TXN>) EMPTY;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java
index 898c08b..4dd6cb6 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java
@@ -117,6 +117,39 @@ public class TwoPhaseCommitSinkFunctionTest {
}
@Test
+ public void testNoTransactionAfterSinkFunctionFinish() throws Exception {
+ harness.open();
+ harness.processElement("42", 0);
+ harness.snapshot(0, 1);
+ harness.processElement("43", 2);
+ harness.snapshot(1, 3);
+ harness.processElement("44", 4);
+
+ // do not expect new input after finish()
+ sinkFunction.finish();
+
+ harness.snapshot(2, 5);
+ harness.notifyOfCompletedCheckpoint(1);
+
+ // make sure the previous empty transaction will not be pre-committed
+ harness.snapshot(3, 6);
+
+ try {
+ harness.processElement("45", 7);
+ fail(
+ "TwoPhaseCommitSinkFunctionTest should not process any
more input data after finish!");
+ } catch (NullPointerException e) {
+ // expected and do nothing here
+ }
+
+ // Checkpoint2 has not complete
+ assertExactlyOnce(Arrays.asList("42", "43"));
+
+ // transaction for checkpoint2
+ assertEquals(1, tmpDirectory.listFiles().size());
+ }
+
+ @Test
public void testNotifyOfCompletedCheckpoint() throws Exception {
harness.open();
harness.processElement("42", 0);