This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch release-1.12
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.12 by this push:
     new 87ce3ac  [FLINK-22367][streaming] Reset syncSavepointId only if it is equal to checkpoint id from event
87ce3ac is described below

commit 87ce3ac3c9a9c30eab0cb378575592e9a857665f
Author: Anton Kalashnikov <[email protected]>
AuthorDate: Mon Aug 30 16:50:32 2021 +0200

    [FLINK-22367][streaming] Reset syncSavepointId only if it is equal to checkpoint id from event
    
    This closes #17104
---
 .../flink/streaming/runtime/tasks/StreamTask.java  |  5 ++-
 .../streaming/runtime/tasks/StreamTaskTest.java    | 44 ++++++++++++++++++++++
 2 files changed, 48 insertions(+), 1 deletion(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 1c30618..56e9cf4 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -430,7 +430,10 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>> extends Ab
             activeSyncSavepointId = null;
             operatorChain.setIgnoreEndOfInput(false);
         }
-        syncSavepointId = null;
+
+        if (syncSavepointId != null && syncSavepointId <= id) {
+            syncSavepointId = null;
+        }
     }
 
     private void setSynchronousSavepointId(long checkpointId, boolean 
ignoreEndOfInput) {
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index 7d2ff81..8f5b97b 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -62,6 +62,7 @@ import 
org.apache.flink.runtime.shuffle.PartitionDescriptorBuilder;
 import org.apache.flink.runtime.shuffle.ShuffleEnvironment;
 import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
 import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
 import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend;
 import org.apache.flink.runtime.state.DoneFuture;
@@ -1251,6 +1252,49 @@ public class StreamTaskTest extends TestLogger {
     }
 
     @Test
+    public void testAbortPreviousCheckpointBeforeCompleteTerminateSavepoint() 
throws Throwable {
+        // given: Marker that the task is finished.
+        AtomicBoolean finishTask = new AtomicBoolean();
+        StreamTaskMailboxTestHarnessBuilder<Integer> builder =
+                new StreamTaskMailboxTestHarnessBuilder<>(
+                                (env) ->
+                                        new OneInputStreamTask<Integer, 
Integer>(env) {
+                                            @Override
+                                            protected void finishTask() throws 
Exception {
+                                                super.finishTask();
+                                                finishTask.set(true);
+                                            }
+                                        },
+                                BasicTypeInfo.INT_TYPE_INFO)
+                        .addInput(BasicTypeInfo.INT_TYPE_INFO);
+        StreamTaskMailboxTestHarness<Integer> harness =
+                builder.setupOutputForSingletonOperatorChain(
+                                new TestBoundedOneInputStreamOperator())
+                        .build();
+
+        // when: Receiving the abort notification of the previous checkpoint 
before the complete
+        // notification of the savepoint terminate.
+        MailboxExecutor executor =
+                
harness.streamTask.getMailboxExecutorFactory().createExecutor(MAX_PRIORITY);
+        executor.execute(
+                () ->
+                        harness.streamTask.triggerCheckpointOnBarrier(
+                                new CheckpointMetaData(2, 0),
+                                new CheckpointOptions(
+                                        CheckpointType.SAVEPOINT_TERMINATE,
+                                        
CheckpointStorageLocationReference.getDefault()),
+                                new CheckpointMetricsBuilder()),
+                "test");
+        harness.streamTask.notifyCheckpointAbortAsync(1);
+        harness.streamTask.notifyCheckpointCompleteAsync(2);
+
+        harness.processAll();
+
+        // then: The task should be finished.
+        assertEquals(true, finishTask.get());
+    }
+
+    @Test
     public void 
testExecuteMailboxActionsAfterLeavingInputProcessorMailboxLoop() throws 
Exception {
         OneShotLatch latch = new OneShotLatch();
         try (MockEnvironment mockEnvironment = new 
MockEnvironmentBuilder().build()) {

Reply via email to