This is an automated email from the ASF dual-hosted git repository. dwysakowicz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 0119ac060bca224b2ae9ad9934ab38b489517b1d Author: Yun Gao <[email protected]> AuthorDate: Tue Mar 2 14:57:39 2021 +0800 [FLINK-21085] Allows taking snapshot with closed operators if enabled checkpoints after tasks finished Since the StreamTask would waiting till the downstream tasks have finished processing all the pending records, during this period the checkpoint has to be taken with closed operators. In the future PR we would deal with this case by also reporting the status of operator finished. In this PR we would first not decline checkpoint in this scenario so that we could add tests. --- .../runtime/tasks/SubtaskCheckpointCoordinator.java | 6 ++++++ .../runtime/tasks/SubtaskCheckpointCoordinatorImpl.java | 13 ++++++++++++- .../runtime/tasks/TestSubtaskCheckpointCoordinator.java | 3 +++ 3 files changed, 21 insertions(+), 1 deletion(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinator.java index 99f2132..6374bcc 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinator.java @@ -43,6 +43,12 @@ import java.util.function.Supplier; @Internal public interface SubtaskCheckpointCoordinator extends Closeable { + /** + * TODO Whether enables checkpoints after tasks finished. This is a temporary flag and will be + * removed in the last PR. + */ + void setEnableCheckpointAfterTasksFinished(boolean enableCheckpointAfterTasksFinished); + /** Initialize new checkpoint. */ void initInputsCheckpoint(long id, CheckpointOptions checkpointOptions) throws CheckpointException; diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java index cf047ae..3bd810c 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java @@ -77,6 +77,12 @@ class SubtaskCheckpointCoordinatorImpl implements SubtaskCheckpointCoordinator { private static final int CHECKPOINT_EXECUTION_DELAY_LOG_THRESHOLD_MS = 30_000; + /** + * TODO Whether enables checkpoints after tasks finished. This is a temporary flag and will be + * removed in the last PR. + */ + private boolean enableCheckpointAfterTasksFinished; + private final CachingCheckpointStorageWorkerView checkpointStorage; private final String taskName; private final ExecutorService asyncOperationsThreadPool; @@ -200,6 +206,11 @@ class SubtaskCheckpointCoordinatorImpl implements SubtaskCheckpointCoordinator { } @Override + public void setEnableCheckpointAfterTasksFinished(boolean enableCheckpointAfterTasksFinished) { + this.enableCheckpointAfterTasksFinished = enableCheckpointAfterTasksFinished; + } + + @Override public void abortCheckpointOnBarrier( long checkpointId, CheckpointException cause, OperatorChain<?, ?> operatorChain) throws IOException { @@ -559,7 +570,7 @@ class SubtaskCheckpointCoordinatorImpl implements SubtaskCheckpointCoordinator { for (final StreamOperatorWrapper<?, ?> operatorWrapper : operatorChain.getAllOperators(true)) { - if (operatorWrapper.isClosed()) { + if (!enableCheckpointAfterTasksFinished && operatorWrapper.isClosed()) { env.declineCheckpoint( checkpointMetaData.getCheckpointId(), new CheckpointException( diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TestSubtaskCheckpointCoordinator.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TestSubtaskCheckpointCoordinator.java index 4bb2443..d67020e 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TestSubtaskCheckpointCoordinator.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TestSubtaskCheckpointCoordinator.java @@ -53,6 +53,9 @@ public class TestSubtaskCheckpointCoordinator implements SubtaskCheckpointCoordi } @Override + public void setEnableCheckpointAfterTasksFinished(boolean enableCheckpointAfterTasksFinished) {} + + @Override public void initInputsCheckpoint(long id, CheckpointOptions checkpointOptions) { channelStateWriter.start(id, checkpointOptions); }
