This is an automated email from the ASF dual-hosted git repository.
dwysakowicz pushed a commit to branch release-1.12
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.12 by this push:
new 87ce3ac [FLINK-22367][streaming] Reset syncSavepointId only if it is
equal to checkpoint id from event
87ce3ac is described below
commit 87ce3ac3c9a9c30eab0cb378575592e9a857665f
Author: Anton Kalashnikov <[email protected]>
AuthorDate: Mon Aug 30 16:50:32 2021 +0200
[FLINK-22367][streaming] Reset syncSavepointId only if it is equal to
checkpoint id from event
This closes #17104
---
.../flink/streaming/runtime/tasks/StreamTask.java | 5 ++-
.../streaming/runtime/tasks/StreamTaskTest.java | 44 ++++++++++++++++++++++
2 files changed, 48 insertions(+), 1 deletion(-)
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 1c30618..56e9cf4 100644
---
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -430,7 +430,10 @@ public abstract class StreamTask<OUT, OP extends
StreamOperator<OUT>> extends Ab
activeSyncSavepointId = null;
operatorChain.setIgnoreEndOfInput(false);
}
- syncSavepointId = null;
+
+ if (syncSavepointId != null && syncSavepointId <= id) {
+ syncSavepointId = null;
+ }
}
private void setSynchronousSavepointId(long checkpointId, boolean
ignoreEndOfInput) {
diff --git
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index 7d2ff81..8f5b97b 100644
---
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -62,6 +62,7 @@ import
org.apache.flink.runtime.shuffle.PartitionDescriptorBuilder;
import org.apache.flink.runtime.shuffle.ShuffleEnvironment;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend;
import org.apache.flink.runtime.state.DoneFuture;
@@ -1251,6 +1252,49 @@ public class StreamTaskTest extends TestLogger {
}
@Test
+ public void testAbortPreviousCheckpointBeforeCompleteTerminateSavepoint()
throws Throwable {
+ // given: Marker that the task is finished.
+ AtomicBoolean finishTask = new AtomicBoolean();
+ StreamTaskMailboxTestHarnessBuilder<Integer> builder =
+ new StreamTaskMailboxTestHarnessBuilder<>(
+ (env) ->
+ new OneInputStreamTask<Integer,
Integer>(env) {
+ @Override
+ protected void finishTask() throws
Exception {
+ super.finishTask();
+ finishTask.set(true);
+ }
+ },
+ BasicTypeInfo.INT_TYPE_INFO)
+ .addInput(BasicTypeInfo.INT_TYPE_INFO);
+ StreamTaskMailboxTestHarness<Integer> harness =
+ builder.setupOutputForSingletonOperatorChain(
+ new TestBoundedOneInputStreamOperator())
+ .build();
+
+ // when: Receiving the abort notification of the previous checkpoint
before the completion
+ // notification of the SAVEPOINT_TERMINATE savepoint.
+ MailboxExecutor executor =
+
harness.streamTask.getMailboxExecutorFactory().createExecutor(MAX_PRIORITY);
+ executor.execute(
+ () ->
+ harness.streamTask.triggerCheckpointOnBarrier(
+ new CheckpointMetaData(2, 0),
+ new CheckpointOptions(
+ CheckpointType.SAVEPOINT_TERMINATE,
+
CheckpointStorageLocationReference.getDefault()),
+ new CheckpointMetricsBuilder()),
+ "test");
+ harness.streamTask.notifyCheckpointAbortAsync(1);
+ harness.streamTask.notifyCheckpointCompleteAsync(2);
+
+ harness.processAll();
+
+ // then: The task should be finished.
+ assertEquals(true, finishTask.get());
+ }
+
+ @Test
public void
testExecuteMailboxActionsAfterLeavingInputProcessorMailboxLoop() throws
Exception {
OneShotLatch latch = new OneShotLatch();
try (MockEnvironment mockEnvironment = new
MockEnvironmentBuilder().build()) {