This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 9466e090bd66906daf1e571328aab70743f31223
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Fri Apr 16 18:10:25 2021 +0200

    [hotfix][coordination] Add safety guard against uncaught exceptions for Future dependent lambdas
---
 .../coordination/OperatorCoordinatorHolder.java    | 33 ++++++++++++----------
 1 file changed, 18 insertions(+), 15 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java
index a38ca16..8625fe2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java
@@ -283,21 +283,24 @@ public class OperatorCoordinatorHolder
 
         final CompletableFuture<byte[]> coordinatorCheckpoint = new CompletableFuture<>();
 
-        coordinatorCheckpoint.whenCompleteAsync(
-                (success, failure) -> {
-                    if (failure != null) {
-                        result.completeExceptionally(failure);
-                    } else if (eventValve.tryShutValve(checkpointId)) {
-                        completeCheckpointOnceEventsAreDone(checkpointId, result, success);
-                    } else {
-                        // if we cannot shut the valve, this means the checkpoint
-                        // has been aborted before, so the future is already
-                        // completed exceptionally. but we try to complete it here
-                        // again, just in case, as a safety net.
-                        result.completeExceptionally(new FlinkException("Cannot shut event valve"));
-                    }
-                },
-                mainThreadExecutor);
+        FutureUtils.assertNoException(
+                coordinatorCheckpoint.handleAsync(
+                        (success, failure) -> {
+                            if (failure != null) {
+                                result.completeExceptionally(failure);
+                            } else if (eventValve.tryShutValve(checkpointId)) {
+                                completeCheckpointOnceEventsAreDone(checkpointId, result, success);
+                            } else {
+                                // if we cannot shut the valve, this means the checkpoint
+                                // has been aborted before, so the future is already
+                                // completed exceptionally. but we try to complete it here
+                                // again, just in case, as a safety net.
+                                result.completeExceptionally(
+                                        new FlinkException("Cannot shut event valve"));
+                            }
+                            return null;
+                        },
+                        mainThreadExecutor));
 
         try {
             eventValve.markForCheckpoint(checkpointId);

Reply via email to