This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 89ceb2a59c5b8bb1e5f421e4c2568b7603fe7fb5 Author: Piotr Nowojski <piotr.nowoj...@gmail.com> AuthorDate: Wed Mar 4 15:47:55 2020 +0100 [FLINK-16316][task] Remove StreamTask dependency from AbstractStreamOperator#snapshotState --- .../api/operators/AbstractStreamOperator.java | 3 -- .../runtime/tasks/CheckpointingOperation.java | 33 ++++++++++++++++++---- .../flink/streaming/runtime/tasks/StreamTask.java | 3 +- 3 files changed, 30 insertions(+), 9 deletions(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java index 532c57f..2f8d0a1 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java @@ -431,9 +431,6 @@ public abstract class AbstractStreamOperator<OUT> String snapshotFailMessage = "Could not complete snapshot " + checkpointId + " for operator " + getOperatorName() + "."; - if (!getContainingTask().isCanceled()) { - LOG.info(snapshotFailMessage, snapshotException); - } try { snapshotContext.closeExceptionally(); } catch (IOException e) { diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/CheckpointingOperation.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/CheckpointingOperation.java index cb88ea0..81a6fd8 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/CheckpointingOperation.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/CheckpointingOperation.java @@ -33,6 +33,7 @@ import org.slf4j.LoggerFactory; import java.util.HashMap; import java.util.concurrent.ExecutorService; +import java.util.function.Supplier; final class CheckpointingOperation { @@ -48,7 +49,8 @@ final class 
CheckpointingOperation { CloseableRegistry closeableRegistry, ExecutorService threadPool, Environment environment, - AsyncExceptionHandler asyncExceptionHandler) throws Exception { + AsyncExceptionHandler asyncExceptionHandler, + Supplier<Boolean> isCanceled) throws Exception { Preconditions.checkNotNull(checkpointMetaData); Preconditions.checkNotNull(checkpointOptions); @@ -66,11 +68,12 @@ final class CheckpointingOperation { try { for (StreamOperatorWrapper<?, ?> operatorWrapper : operatorChain.getAllOperators(true)) { StreamOperator<?> op = operatorWrapper.getStreamOperator(); - OperatorSnapshotFutures snapshotInProgress = op.snapshotState( - checkpointMetaData.getCheckpointId(), - checkpointMetaData.getTimestamp(), + OperatorSnapshotFutures snapshotInProgress = checkpointStreamOperator( + op, + checkpointMetaData, checkpointOptions, - storageLocation); + storageLocation, + isCanceled); operatorSnapshotsInProgress.put(op.getOperatorID(), snapshotInProgress); } @@ -133,4 +136,24 @@ final class CheckpointingOperation { } } + private static OperatorSnapshotFutures checkpointStreamOperator( + StreamOperator<?> op, + CheckpointMetaData checkpointMetaData, + CheckpointOptions checkpointOptions, + CheckpointStreamFactory storageLocation, + Supplier<Boolean> isCanceled) throws Exception { + try { + return op.snapshotState( + checkpointMetaData.getCheckpointId(), + checkpointMetaData.getTimestamp(), + checkpointOptions, + storageLocation); + } + catch (Exception ex) { + if (!isCanceled.get()) { + LOG.info(ex.getMessage(), ex); + } + throw ex; + } + } } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java index dd7cd4d..f2972cb 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java 
@@ -907,7 +907,8 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> getCancelables(), getAsyncOperationsThreadPool(), getEnvironment(), - this); + this, + this::isCanceled); } // ------------------------------------------------------------------------