This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 0119ac060bca224b2ae9ad9934ab38b489517b1d
Author: Yun Gao <[email protected]>
AuthorDate: Tue Mar 2 14:57:39 2021 +0800

    [FLINK-21085] Allows taking snapshot with closed operators if enabled 
checkpoints after tasks finished
    
    Since the StreamTask would waiting till the downstream tasks have finished
    processing all the pending records, during this period the checkpoint
    has to be taken with closed operators. In the future PR we would
    deal with this case by also reporting the status of operator finished.
    In this PR we would first not decline checkpoint in this scenario
    so that we could add tests.
---
 .../runtime/tasks/SubtaskCheckpointCoordinator.java         |  6 ++++++
 .../runtime/tasks/SubtaskCheckpointCoordinatorImpl.java     | 13 ++++++++++++-
 .../runtime/tasks/TestSubtaskCheckpointCoordinator.java     |  3 +++
 3 files changed, 21 insertions(+), 1 deletion(-)

diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinator.java
index 99f2132..6374bcc 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinator.java
@@ -43,6 +43,12 @@ import java.util.function.Supplier;
 @Internal
 public interface SubtaskCheckpointCoordinator extends Closeable {
 
+    /**
+     * TODO Whether enables checkpoints after tasks finished. This is a 
temporary flag and will be
+     * removed in the last PR.
+     */
+    void setEnableCheckpointAfterTasksFinished(boolean 
enableCheckpointAfterTasksFinished);
+
     /** Initialize new checkpoint. */
     void initInputsCheckpoint(long id, CheckpointOptions checkpointOptions)
             throws CheckpointException;
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
index cf047ae..3bd810c 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
@@ -77,6 +77,12 @@ class SubtaskCheckpointCoordinatorImpl implements 
SubtaskCheckpointCoordinator {
 
     private static final int CHECKPOINT_EXECUTION_DELAY_LOG_THRESHOLD_MS = 
30_000;
 
+    /**
+     * TODO Whether enables checkpoints after tasks finished. This is a 
temporary flag and will be
+     * removed in the last PR.
+     */
+    private boolean enableCheckpointAfterTasksFinished;
+
     private final CachingCheckpointStorageWorkerView checkpointStorage;
     private final String taskName;
     private final ExecutorService asyncOperationsThreadPool;
@@ -200,6 +206,11 @@ class SubtaskCheckpointCoordinatorImpl implements 
SubtaskCheckpointCoordinator {
     }
 
     @Override
+    public void setEnableCheckpointAfterTasksFinished(boolean 
enableCheckpointAfterTasksFinished) {
+        this.enableCheckpointAfterTasksFinished = 
enableCheckpointAfterTasksFinished;
+    }
+
+    @Override
     public void abortCheckpointOnBarrier(
             long checkpointId, CheckpointException cause, OperatorChain<?, ?> 
operatorChain)
             throws IOException {
@@ -559,7 +570,7 @@ class SubtaskCheckpointCoordinatorImpl implements 
SubtaskCheckpointCoordinator {
 
         for (final StreamOperatorWrapper<?, ?> operatorWrapper :
                 operatorChain.getAllOperators(true)) {
-            if (operatorWrapper.isClosed()) {
+            if (!enableCheckpointAfterTasksFinished && 
operatorWrapper.isClosed()) {
                 env.declineCheckpoint(
                         checkpointMetaData.getCheckpointId(),
                         new CheckpointException(
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TestSubtaskCheckpointCoordinator.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TestSubtaskCheckpointCoordinator.java
index 4bb2443..d67020e 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TestSubtaskCheckpointCoordinator.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TestSubtaskCheckpointCoordinator.java
@@ -53,6 +53,9 @@ public class TestSubtaskCheckpointCoordinator implements 
SubtaskCheckpointCoordi
     }
 
     @Override
+    public void setEnableCheckpointAfterTasksFinished(boolean 
enableCheckpointAfterTasksFinished) {}
+
+    @Override
     public void initInputsCheckpoint(long id, CheckpointOptions 
checkpointOptions) {
         channelStateWriter.start(id, checkpointOptions);
     }

Reply via email to