This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git
commit af15045e87ed2068156e1e408262212b73621b22 Author: Till Rohrmann <[email protected]> AuthorDate: Thu Jun 11 16:22:06 2020 +0200 [hotfix] Make sure that no exceptions are swallowed in CheckpointCoordinator.startTriggeringCheckpoint In order to ensure that CompletableFutures don't swallow exceptions, they need to terminate with an exception handler. FutureUtils.assertNoException(CompletableFuture) asserts that the given future does not complete exceptionally. If it does, then the system will fail and the exception will be reported. --- .../runtime/checkpoint/CheckpointCoordinator.java | 83 +++++++++++----------- 1 file changed, 43 insertions(+), 40 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java index 6c033d9..307ad90 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java @@ -538,48 +538,51 @@ public class CheckpointCoordinator { coordinatorsToCheckpoint, pendingCheckpoint, timer), timer); - CompletableFuture.allOf(masterStatesComplete, coordinatorCheckpointsComplete) - .whenCompleteAsync( - (ignored, throwable) -> { - final PendingCheckpoint checkpoint = - FutureUtils.getWithoutException(pendingCheckpointCompletableFuture); - - Preconditions.checkState( - checkpoint != null || throwable != null, - "Either the pending checkpoint needs to be created or an error must have been occurred."); - - if (throwable != null) { - // the initialization might not be finished yet - if (checkpoint == null) { - onTriggerFailure(request, throwable); - } else { - onTriggerFailure(checkpoint, throwable); - } - } else { - if (checkpoint.isDiscarded()) { - onTriggerFailure( - checkpoint, - new CheckpointException( - CheckpointFailureReason.TRIGGER_CHECKPOINT_FAILURE, - 
checkpoint.getFailureCause())); + FutureUtils.assertNoException( + CompletableFuture.allOf(masterStatesComplete, coordinatorCheckpointsComplete) + .handleAsync( + (ignored, throwable) -> { + final PendingCheckpoint checkpoint = + FutureUtils.getWithoutException(pendingCheckpointCompletableFuture); + + Preconditions.checkState( + checkpoint != null || throwable != null, + "Either the pending checkpoint needs to be created or an error must have been occurred."); + + if (throwable != null) { + // the initialization might not be finished yet + if (checkpoint == null) { + onTriggerFailure(request, throwable); + } else { + onTriggerFailure(checkpoint, throwable); + } } else { - // no exception, no discarding, everything is OK - final long checkpointId = checkpoint.getCheckpointId(); - snapshotTaskState( - timestamp, - checkpointId, - checkpoint.getCheckpointStorageLocation(), - request.props, - executions, - request.advanceToEndOfTime); - - coordinatorsToCheckpoint.forEach((ctx) -> ctx.afterSourceBarrierInjection(checkpointId)); - - onTriggerSuccess(); + if (checkpoint.isDiscarded()) { + onTriggerFailure( + checkpoint, + new CheckpointException( + CheckpointFailureReason.TRIGGER_CHECKPOINT_FAILURE, + checkpoint.getFailureCause())); + } else { + // no exception, no discarding, everything is OK + final long checkpointId = checkpoint.getCheckpointId(); + snapshotTaskState( + timestamp, + checkpointId, + checkpoint.getCheckpointStorageLocation(), + request.props, + executions, + request.advanceToEndOfTime); + + coordinatorsToCheckpoint.forEach((ctx) -> ctx.afterSourceBarrierInjection(checkpointId)); + + onTriggerSuccess(); + } } - } - }, - timer); + + return null; + }, + timer)); } catch (Throwable throwable) { onTriggerFailure(request, throwable); }
