This is an automated email from the ASF dual-hosted git repository. sewen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 9466e090bd66906daf1e571328aab70743f31223
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Fri Apr 16 18:10:25 2021 +0200

    [hotfix][coordination] Add safety guard against uncaught exceptions for Future dependent lambdas
---
 .../coordination/OperatorCoordinatorHolder.java | 33 ++++++++++++----------
 1 file changed, 18 insertions(+), 15 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java
index a38ca16..8625fe2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java
@@ -283,21 +283,24 @@ public class OperatorCoordinatorHolder
 
         final CompletableFuture<byte[]> coordinatorCheckpoint = new CompletableFuture<>();
 
-        coordinatorCheckpoint.whenCompleteAsync(
-                (success, failure) -> {
-                    if (failure != null) {
-                        result.completeExceptionally(failure);
-                    } else if (eventValve.tryShutValve(checkpointId)) {
-                        completeCheckpointOnceEventsAreDone(checkpointId, result, success);
-                    } else {
-                        // if we cannot shut the valve, this means the checkpoint
-                        // has been aborted before, so the future is already
-                        // completed exceptionally. but we try to complete it here
-                        // again, just in case, as a safety net.
-                        result.completeExceptionally(new FlinkException("Cannot shut event valve"));
-                    }
-                },
-                mainThreadExecutor);
+        FutureUtils.assertNoException(
+                coordinatorCheckpoint.handleAsync(
+                        (success, failure) -> {
+                            if (failure != null) {
+                                result.completeExceptionally(failure);
+                            } else if (eventValve.tryShutValve(checkpointId)) {
+                                completeCheckpointOnceEventsAreDone(checkpointId, result, success);
+                            } else {
+                                // if we cannot shut the valve, this means the checkpoint
+                                // has been aborted before, so the future is already
+                                // completed exceptionally. but we try to complete it here
+                                // again, just in case, as a safety net.
+                                result.completeExceptionally(
+                                        new FlinkException("Cannot shut event valve"));
+                            }
+                            return null;
+                        },
+                        mainThreadExecutor));
 
         try {
             eventValve.markForCheckpoint(checkpointId);