This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 89ceb2a59c5b8bb1e5f421e4c2568b7603fe7fb5
Author: Piotr Nowojski <piotr.nowoj...@gmail.com>
AuthorDate: Wed Mar 4 15:47:55 2020 +0100

    [FLINK-16316][task] Remove StreamTask dependency from 
AbstractStreamOperator#snapshotState
---
 .../api/operators/AbstractStreamOperator.java      |  3 --
 .../runtime/tasks/CheckpointingOperation.java      | 33 ++++++++++++++++++----
 .../flink/streaming/runtime/tasks/StreamTask.java  |  3 +-
 3 files changed, 30 insertions(+), 9 deletions(-)

diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index 532c57f..2f8d0a1 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -431,9 +431,6 @@ public abstract class AbstractStreamOperator<OUT>
                        String snapshotFailMessage = "Could not complete 
snapshot " + checkpointId + " for operator " +
                                getOperatorName() + ".";
 
-                       if (!getContainingTask().isCanceled()) {
-                               LOG.info(snapshotFailMessage, 
snapshotException);
-                       }
                        try {
                                snapshotContext.closeExceptionally();
                        } catch (IOException e) {
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/CheckpointingOperation.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/CheckpointingOperation.java
index cb88ea0..81a6fd8 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/CheckpointingOperation.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/CheckpointingOperation.java
@@ -33,6 +33,7 @@ import org.slf4j.LoggerFactory;
 
 import java.util.HashMap;
 import java.util.concurrent.ExecutorService;
+import java.util.function.Supplier;
 
 final class CheckpointingOperation {
 
@@ -48,7 +49,8 @@ final class CheckpointingOperation {
                        CloseableRegistry closeableRegistry,
                        ExecutorService threadPool,
                        Environment environment,
-                       AsyncExceptionHandler asyncExceptionHandler) throws 
Exception {
+                       AsyncExceptionHandler asyncExceptionHandler,
+                       Supplier<Boolean> isCanceled) throws Exception {
 
                Preconditions.checkNotNull(checkpointMetaData);
                Preconditions.checkNotNull(checkpointOptions);
@@ -66,11 +68,12 @@ final class CheckpointingOperation {
                try {
                        for (StreamOperatorWrapper<?, ?> operatorWrapper : 
operatorChain.getAllOperators(true)) {
                                StreamOperator<?> op = 
operatorWrapper.getStreamOperator();
-                               OperatorSnapshotFutures snapshotInProgress = 
op.snapshotState(
-                                       checkpointMetaData.getCheckpointId(),
-                                       checkpointMetaData.getTimestamp(),
+                               OperatorSnapshotFutures snapshotInProgress = 
checkpointStreamOperator(
+                                       op,
+                                       checkpointMetaData,
                                        checkpointOptions,
-                                       storageLocation);
+                                       storageLocation,
+                                       isCanceled);
                                
operatorSnapshotsInProgress.put(op.getOperatorID(), snapshotInProgress);
                        }
 
@@ -133,4 +136,24 @@ final class CheckpointingOperation {
                }
        }
 
+       private static OperatorSnapshotFutures checkpointStreamOperator(
+                       StreamOperator<?> op,
+                       CheckpointMetaData checkpointMetaData,
+                       CheckpointOptions checkpointOptions,
+                       CheckpointStreamFactory storageLocation,
+                       Supplier<Boolean> isCanceled) throws Exception {
+               try {
+                       return op.snapshotState(
+                               checkpointMetaData.getCheckpointId(),
+                               checkpointMetaData.getTimestamp(),
+                               checkpointOptions,
+                               storageLocation);
+               }
+               catch (Exception ex) {
+                       if (!isCanceled.get()) {
+                               LOG.info(ex.getMessage(), ex);
+                       }
+                       throw ex;
+               }
+       }
 }
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index dd7cd4d..f2972cb 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -907,7 +907,8 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>>
                        getCancelables(),
                        getAsyncOperationsThreadPool(),
                        getEnvironment(),
-                       this);
+                       this,
+                       this::isCanceled);
        }
 
        // 
------------------------------------------------------------------------

Reply via email to