This is an automated email from the ASF dual-hosted git repository. dwysakowicz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 5a0dcfb85c3d0be22e6154d63a24d35d7d69855e Author: Dawid Wysakowicz <[email protected]> AuthorDate: Thu Dec 2 12:57:01 2021 +0100 [hotfix] Remove duplicated handling of stop-with-savepoint trigger failure After FLINK-23741 the failure is handled in Task#triggerCheckpointBarrier, therefore we no longer need to handle it inside of StreamTask. Moreover, if a failure had occurred in assertTriggeringCheckpointExceptions the failed checkpoint would not have been declined. --- .../runtime/tasks/MultipleInputStreamTask.java | 8 ++------ .../runtime/tasks/SourceOperatorStreamTask.java | 8 ++------ .../streaming/runtime/tasks/SourceStreamTask.java | 14 ++++++------- .../flink/streaming/runtime/tasks/StreamTask.java | 24 ---------------------- 4 files changed, 10 insertions(+), 44 deletions(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTask.java index 89a5434..4f79660 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTask.java @@ -251,12 +251,8 @@ public class MultipleInputStreamTask<OUT> }, "stop chained Flip-27 source for stop-with-savepoint --drain"); - return assertTriggeringCheckpointExceptions( - sourcesStopped.thenCompose( - ignore -> - triggerSourcesCheckpointAsync( - checkpointMetaData, checkpointOptions)), - checkpointMetaData.getCheckpointId()); + return sourcesStopped.thenCompose( + ignore -> triggerSourcesCheckpointAsync(checkpointMetaData, checkpointOptions)); } private void checkPendingCheckpointCompletedFuturesSize() { diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java index 8d0bf97..6ce7b26 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java @@ -137,12 +137,8 @@ public class SourceOperatorStreamTask<T> extends StreamTask<T, SourceOperator<T, }, "stop Flip-27 source for stop-with-savepoint --drain"); - return assertTriggeringCheckpointExceptions( - operatorFinished.thenCompose( - (ignore) -> - super.triggerCheckpointAsync( - checkpointMetaData, checkpointOptions)), - checkpointMetaData.getCheckpointId()); + return operatorFinished.thenCompose( + (ignore) -> super.triggerCheckpointAsync(checkpointMetaData, checkpointOptions)); } @Override diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java index d51080e..0206ef8 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java @@ -286,14 +286,12 @@ public class SourceStreamTask< stopOperatorForStopWithSavepointWithDrain( checkpointMetaData.getCheckpointId()), "stop legacy source for stop-with-savepoint --drain"); - return assertTriggeringCheckpointExceptions( - sourceThread - .getCompletionFuture() - .thenCompose( - ignore -> - super.triggerCheckpointAsync( - checkpointMetaData, checkpointOptions)), - checkpointMetaData.getCheckpointId()); + return sourceThread + .getCompletionFuture() + .thenCompose( + ignore -> + super.triggerCheckpointAsync( + checkpointMetaData, checkpointOptions)); } private void stopOperatorForStopWithSavepointWithDrain(long checkpointId) { diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java index 06c6874..42a1c2b 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java @@ -102,7 +102,6 @@ import org.apache.flink.util.InstantiationUtil; import org.apache.flink.util.Preconditions; import org.apache.flink.util.SerializedValue; import org.apache.flink.util.TernaryBoolean; -import org.apache.flink.util.WrappingRuntimeException; import org.apache.flink.util.concurrent.ExecutorThreadFactory; import org.apache.flink.util.concurrent.FutureUtils; import org.apache.flink.util.function.RunnableWithException; @@ -1211,29 +1210,6 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> return true; } - protected final CompletableFuture<Boolean> assertTriggeringCheckpointExceptions( - CompletableFuture<Boolean> triggerFuture, long checkpointId) { - CompletableFuture<Boolean> checkpointTriggered = - triggerFuture.exceptionally( - error -> { - if (ExceptionUtils.findThrowable( - error, RejectedExecutionException.class) - .isPresent()) { - // This may happen if the mailbox is closed. It means that - // the task is shutting down, so we just ignore it. - LOG.debug( - "Triggering checkpoint {} for {} was rejected by the mailbox", - checkpointId, - getTaskNameWithSubtaskAndId()); - return false; - } else { - throw new WrappingRuntimeException(error); - } - }); - FutureUtils.assertNoException(checkpointTriggered); - return checkpointTriggered; - } - /** * Acquires the optional {@link CheckpointBarrierHandler} associated with this stream task. The * {@code CheckpointBarrierHandler} should exist if the task has data inputs and requires to
