This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit af15045e87ed2068156e1e408262212b73621b22
Author: Till Rohrmann <[email protected]>
AuthorDate: Thu Jun 11 16:22:06 2020 +0200

    [hotfix] Make sure that no exceptions are swallowed in 
CheckpointCoordinator.startTriggeringCheckpoint
    
    In order to avoid that CompletableFutures don't swallow exception they need 
to terminate with an exception handler.
    FutureUtils.assertNoException(CompletableFuture) asserts that the given 
future does not complete exceptionally. If
    it does, then the system will fail and the exception will be reported.
---
 .../runtime/checkpoint/CheckpointCoordinator.java  | 83 +++++++++++-----------
 1 file changed, 43 insertions(+), 40 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 6c033d9..307ad90 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -538,48 +538,51 @@ public class CheckpointCoordinator {
                                                                        
coordinatorsToCheckpoint, pendingCheckpoint, timer),
                                                        timer);
 
-                       CompletableFuture.allOf(masterStatesComplete, 
coordinatorCheckpointsComplete)
-                               .whenCompleteAsync(
-                                       (ignored, throwable) -> {
-                                               final PendingCheckpoint 
checkpoint =
-                                                       
FutureUtils.getWithoutException(pendingCheckpointCompletableFuture);
-
-                                               Preconditions.checkState(
-                                                       checkpoint != null || 
throwable != null,
-                                                       "Either the pending 
checkpoint needs to be created or an error must have been occurred.");
-
-                                               if (throwable != null) {
-                                                       // the initialization 
might not be finished yet
-                                                       if (checkpoint == null) 
{
-                                                               
onTriggerFailure(request, throwable);
-                                                       } else {
-                                                               
onTriggerFailure(checkpoint, throwable);
-                                                       }
-                                               } else {
-                                                       if 
(checkpoint.isDiscarded()) {
-                                                               
onTriggerFailure(
-                                                                       
checkpoint,
-                                                                       new 
CheckpointException(
-                                                                               
CheckpointFailureReason.TRIGGER_CHECKPOINT_FAILURE,
-                                                                               
checkpoint.getFailureCause()));
+                       FutureUtils.assertNoException(
+                               CompletableFuture.allOf(masterStatesComplete, 
coordinatorCheckpointsComplete)
+                                       .handleAsync(
+                                               (ignored, throwable) -> {
+                                                       final PendingCheckpoint 
checkpoint =
+                                                               
FutureUtils.getWithoutException(pendingCheckpointCompletableFuture);
+
+                                                       
Preconditions.checkState(
+                                                               checkpoint != 
null || throwable != null,
+                                                               "Either the 
pending checkpoint needs to be created or an error must have been occurred.");
+
+                                                       if (throwable != null) {
+                                                               // the 
initialization might not be finished yet
+                                                               if (checkpoint 
== null) {
+                                                                       
onTriggerFailure(request, throwable);
+                                                               } else {
+                                                                       
onTriggerFailure(checkpoint, throwable);
+                                                               }
                                                        } else {
-                                                               // no 
exception, no discarding, everything is OK
-                                                               final long 
checkpointId = checkpoint.getCheckpointId();
-                                                               
snapshotTaskState(
-                                                                       
timestamp,
-                                                                       
checkpointId,
-                                                                       
checkpoint.getCheckpointStorageLocation(),
-                                                                       
request.props,
-                                                                       
executions,
-                                                                       
request.advanceToEndOfTime);
-
-                                                               
coordinatorsToCheckpoint.forEach((ctx) -> 
ctx.afterSourceBarrierInjection(checkpointId));
-
-                                                               
onTriggerSuccess();
+                                                               if 
(checkpoint.isDiscarded()) {
+                                                                       
onTriggerFailure(
+                                                                               
checkpoint,
+                                                                               
new CheckpointException(
+                                                                               
        CheckpointFailureReason.TRIGGER_CHECKPOINT_FAILURE,
+                                                                               
        checkpoint.getFailureCause()));
+                                                               } else {
+                                                                       // no 
exception, no discarding, everything is OK
+                                                                       final 
long checkpointId = checkpoint.getCheckpointId();
+                                                                       
snapshotTaskState(
+                                                                               
timestamp,
+                                                                               
checkpointId,
+                                                                               
checkpoint.getCheckpointStorageLocation(),
+                                                                               
request.props,
+                                                                               
executions,
+                                                                               
request.advanceToEndOfTime);
+
+                                                                       
coordinatorsToCheckpoint.forEach((ctx) -> 
ctx.afterSourceBarrierInjection(checkpointId));
+
+                                                                       
onTriggerSuccess();
+                                                               }
                                                        }
-                                               }
-                                       },
-                                       timer);
+
+                                                       return null;
+                                               },
+                                               timer));
                } catch (Throwable throwable) {
                        onTriggerFailure(request, throwable);
                }

Reply via email to