yunfengzhou-hub commented on code in PR #20275:
URL: https://github.com/apache/flink/pull/20275#discussion_r926212101


##########
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java:
##########
@@ -277,29 +300,33 @@ private void checkpointCoordinatorInternal(
             final long checkpointId, final CompletableFuture<byte[]> result) {
         mainThreadExecutor.assertRunningInMainThread();
 
-        final CompletableFuture<byte[]> coordinatorCheckpoint = new 
CompletableFuture<>();
-
-        FutureUtils.assertNoException(
-                coordinatorCheckpoint.handleAsync(
-                        (success, failure) -> {
-                            if (failure != null) {
-                                result.completeExceptionally(failure);
-                            } else if (eventValve.tryShutValve(checkpointId)) {
-                                
completeCheckpointOnceEventsAreDone(checkpointId, result, success);
-                            } else {
-                                // if we cannot shut the valve, this means the 
checkpoint
-                                // has been aborted before, so the future is 
already
-                                // completed exceptionally. but we try to 
complete it here
-                                // again, just in case, as a safety net.
-                                result.completeExceptionally(
-                                        new FlinkException("Cannot shut event 
valve"));
-                            }
-                            return null;
-                        },
-                        mainThreadExecutor));
-
         try {
-            eventValve.markForCheckpoint(checkpointId);
+            subtaskGatewayMap.values().forEach(x -> 
x.markForCheckpoint(checkpointId));
+
+            final CompletableFuture<byte[]> coordinatorCheckpoint = new 
CompletableFuture<>();
+
+            coordinatorCheckpoint.whenComplete(
+                    (success, failure) -> {
+                        if (failure != null) {
+                            result.completeExceptionally(failure);
+                        } else {
+                            closeGateways(checkpointId, result);
+                        }
+                    });
+
+            FutureUtils.assertNoException(
+                    coordinatorCheckpoint.handleAsync(
+                            (success, failure) -> {
+                                if (failure != null) {
+                                    result.completeExceptionally(failure);

Review Comment:
   As we are removing the `whenComplete()` above and merging that logic to this 
`handleAsync()`, this comment is not applicable now.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to