This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 5a0dcfb85c3d0be22e6154d63a24d35d7d69855e
Author: Dawid Wysakowicz <[email protected]>
AuthorDate: Thu Dec 2 12:57:01 2021 +0100

    [hotfix] Remove duplicated handling of stop-with-savepoint trigger failure
    
    After FLINK-23741 the failure is handled in Task#triggerCheckpointBarrier,
so we no longer need to handle it inside of StreamTask. Moreover, if a
failure had occurred in assertTriggeringCheckpointExceptions, the failed
checkpoint would not have been declined.
---
 .../runtime/tasks/MultipleInputStreamTask.java     |  8 ++------
 .../runtime/tasks/SourceOperatorStreamTask.java    |  8 ++------
 .../streaming/runtime/tasks/SourceStreamTask.java  | 14 ++++++-------
 .../flink/streaming/runtime/tasks/StreamTask.java  | 24 ----------------------
 4 files changed, 10 insertions(+), 44 deletions(-)

diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTask.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTask.java
index 89a5434..4f79660 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTask.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTask.java
@@ -251,12 +251,8 @@ public class MultipleInputStreamTask<OUT>
                 },
                 "stop chained Flip-27 source for stop-with-savepoint --drain");
 
-        return assertTriggeringCheckpointExceptions(
-                sourcesStopped.thenCompose(
-                        ignore ->
-                                triggerSourcesCheckpointAsync(
-                                        checkpointMetaData, 
checkpointOptions)),
-                checkpointMetaData.getCheckpointId());
+        return sourcesStopped.thenCompose(
+                ignore -> triggerSourcesCheckpointAsync(checkpointMetaData, 
checkpointOptions));
     }
 
     private void checkPendingCheckpointCompletedFuturesSize() {
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java
index 8d0bf97..6ce7b26 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java
@@ -137,12 +137,8 @@ public class SourceOperatorStreamTask<T> extends 
StreamTask<T, SourceOperator<T,
                 },
                 "stop Flip-27 source for stop-with-savepoint --drain");
 
-        return assertTriggeringCheckpointExceptions(
-                operatorFinished.thenCompose(
-                        (ignore) ->
-                                super.triggerCheckpointAsync(
-                                        checkpointMetaData, 
checkpointOptions)),
-                checkpointMetaData.getCheckpointId());
+        return operatorFinished.thenCompose(
+                (ignore) -> super.triggerCheckpointAsync(checkpointMetaData, 
checkpointOptions));
     }
 
     @Override
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
index d51080e..0206ef8 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
@@ -286,14 +286,12 @@ public class SourceStreamTask<
                         stopOperatorForStopWithSavepointWithDrain(
                                 checkpointMetaData.getCheckpointId()),
                 "stop legacy source for stop-with-savepoint --drain");
-        return assertTriggeringCheckpointExceptions(
-                sourceThread
-                        .getCompletionFuture()
-                        .thenCompose(
-                                ignore ->
-                                        super.triggerCheckpointAsync(
-                                                checkpointMetaData, 
checkpointOptions)),
-                checkpointMetaData.getCheckpointId());
+        return sourceThread
+                .getCompletionFuture()
+                .thenCompose(
+                        ignore ->
+                                super.triggerCheckpointAsync(
+                                        checkpointMetaData, 
checkpointOptions));
     }
 
     private void stopOperatorForStopWithSavepointWithDrain(long checkpointId) {
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 06c6874..42a1c2b 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -102,7 +102,6 @@ import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.SerializedValue;
 import org.apache.flink.util.TernaryBoolean;
-import org.apache.flink.util.WrappingRuntimeException;
 import org.apache.flink.util.concurrent.ExecutorThreadFactory;
 import org.apache.flink.util.concurrent.FutureUtils;
 import org.apache.flink.util.function.RunnableWithException;
@@ -1211,29 +1210,6 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>>
         return true;
     }
 
-    protected final CompletableFuture<Boolean> 
assertTriggeringCheckpointExceptions(
-            CompletableFuture<Boolean> triggerFuture, long checkpointId) {
-        CompletableFuture<Boolean> checkpointTriggered =
-                triggerFuture.exceptionally(
-                        error -> {
-                            if (ExceptionUtils.findThrowable(
-                                            error, 
RejectedExecutionException.class)
-                                    .isPresent()) {
-                                // This may happen if the mailbox is closed. 
It means that
-                                // the task is shutting down, so we just 
ignore it.
-                                LOG.debug(
-                                        "Triggering checkpoint {} for {} was 
rejected by the mailbox",
-                                        checkpointId,
-                                        getTaskNameWithSubtaskAndId());
-                                return false;
-                            } else {
-                                throw new WrappingRuntimeException(error);
-                            }
-                        });
-        FutureUtils.assertNoException(checkpointTriggered);
-        return checkpointTriggered;
-    }
-
     /**
      * Acquires the optional {@link CheckpointBarrierHandler} associated with 
this stream task. The
      * {@code CheckpointBarrierHandler} should exist if the task has data 
inputs and requires to

Reply via email to