AHeise commented on a change in pull request #14828:
URL: https://github.com/apache/flink/pull/14828#discussion_r567812836



##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncCheckpointRunnable.java
##########
@@ -263,11 +264,21 @@ private void handleExecutionException(Exception e) {
                     // Otherwise this followup exception could race the 
original exception in
                     // failing the task.
                     try {
+                        Optional<CheckpointException> 
underlyingCheckpointException =
+                                ExceptionUtils.findThrowable(
+                                        checkpointException, 
CheckpointException.class);
+                        // If this failure is already a CheckpointException, 
do not overwrite the
+                        // original CheckpointFailureReason
+                        CheckpointFailureReason reportedFailureReason =

Review comment:
       `underlyingCheckpointException.map(...).orElse?`

##########
File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/AsyncCheckpointRunnableTest.java
##########
@@ -89,6 +78,47 @@ private void testAsyncCheckpointException(Supplier<Boolean> 
isTaskRunning) {
         }
     }
 
+    @Test
+    public void testDeclineAsyncCheckpoint() {
+        CheckpointFailureReason originalReason =
+                
CheckpointFailureReason.CHECKPOINT_DECLINED_INPUT_END_OF_STREAM;
+
+        final Map<OperatorID, OperatorSnapshotFutures> snapshotsInProgress = 
new HashMap<>();
+        snapshotsInProgress.put(
+                new OperatorID(),
+                new OperatorSnapshotFutures(
+                        DoneFuture.of(SnapshotResult.empty()),
+                        DoneFuture.of(SnapshotResult.empty()),
+                        DoneFuture.of(SnapshotResult.empty()),
+                        DoneFuture.of(SnapshotResult.empty()),
+                        ExceptionallyDoneFuture.of(new 
CheckpointException(originalReason)),
+                        DoneFuture.of(SnapshotResult.empty())));
+
+        final TestEnvironment environment = new TestEnvironment();
+        final AsyncCheckpointRunnable runnable =
+                createAsyncRunnable(snapshotsInProgress, environment, true);
+        runnable.run();
+
+        
Assert.assertSame((environment.getCause()).getCheckpointFailureReason(), 
originalReason);

Review comment:
       nit: Remove extra braces? (also above from where you copied)

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/SingleCheckpointBarrierHandler.java
##########
@@ -257,8 +257,8 @@ private void abortInternal(long cancelledId, 
CheckpointException exception) thro
                 Math.max(lastCancelledOrCompletedCheckpointId, cancelledId);
         numBarriersReceived = 0;
         controller.abortPendingCheckpoint(cancelledId, exception);
-        allBarriersReceivedFuture.completeExceptionally(exception);
         notifyAbort(cancelledId, exception);
+        allBarriersReceivedFuture.completeExceptionally(exception);

Review comment:
       I'd think that we should backport all fixes in this commit, so I'd be 
okay with leaving it as is.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to