lindong28 commented on code in PR #20275:
URL: https://github.com/apache/flink/pull/20275#discussion_r923952394


##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorEventDispatcherImpl.java:
##########
@@ -82,6 +83,10 @@ public void registerEventHandler(OperatorID operator, OperatorEventHandler handl
         }
     }
 
+    Set<OperatorID> getRegisteredOperators() {

Review Comment:
   Could we explicitly mark methods as `private`/`protected`/`public` for readability and consistency?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java:
##########
@@ -245,7 +262,13 @@ public void notifyCheckpointAborted(long checkpointId) {
         // checkpoint coordinator time thread.
         // we can remove the delegation once the checkpoint coordinator runs fully in the
         // scheduler's main thread executor
-        mainThreadExecutor.execute(() -> coordinator.notifyCheckpointAborted(checkpointId));
+        mainThreadExecutor.execute(
+                () -> {
+                    subtaskGatewayMap
+                            .values()
+                            .forEach(x -> x.openGatewayAndUnmarkCheckpoint(checkpointId));
+                    coordinator.notifyCheckpointAborted(checkpointId);

Review Comment:
   Prior to this PR, this method did not call `eventValve.openValveAndUnmarkCheckpoint()`. Could you explain why it is now necessary to call `openGatewayAndUnmarkCheckpoint()`?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java:
##########
@@ -277,29 +300,33 @@ private void checkpointCoordinatorInternal(
             final long checkpointId, final CompletableFuture<byte[]> result) {
         mainThreadExecutor.assertRunningInMainThread();
 
-        final CompletableFuture<byte[]> coordinatorCheckpoint = new CompletableFuture<>();
-
-        FutureUtils.assertNoException(
-                coordinatorCheckpoint.handleAsync(
-                        (success, failure) -> {
-                            if (failure != null) {
-                                result.completeExceptionally(failure);
-                            } else if (eventValve.tryShutValve(checkpointId)) {
-                                completeCheckpointOnceEventsAreDone(checkpointId, result, success);
-                            } else {
-                                // if we cannot shut the valve, this means the checkpoint
-                                // has been aborted before, so the future is already
-                                // completed exceptionally. but we try to complete it here
-                                // again, just in case, as a safety net.
-                                result.completeExceptionally(
-                                        new FlinkException("Cannot shut event valve"));
-                            }
-                            return null;
-                        },
-                        mainThreadExecutor));
-
         try {
-            eventValve.markForCheckpoint(checkpointId);
+            subtaskGatewayMap.values().forEach(x -> x.markForCheckpoint(checkpointId));
+
+            final CompletableFuture<byte[]> coordinatorCheckpoint = new CompletableFuture<>();
+
+            coordinatorCheckpoint.whenComplete(
+                    (success, failure) -> {
+                        if (failure != null) {
+                            result.completeExceptionally(failure);
+                        } else {
+                            closeGateways(checkpointId, result);
+                        }
+                    });
+
+            FutureUtils.assertNoException(
+                    coordinatorCheckpoint.handleAsync(
+                            (success, failure) -> {
+                                if (failure != null) {
+                                    result.completeExceptionally(failure);

Review Comment:
   This line seems to duplicate the logic at line 311 above.
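
   To illustrate the concern, here is a minimal standalone sketch (not the PR's code) of what happens when both a `whenComplete` and a `handleAsync` callback are attached to the same future and each forwards the failure to `result`. The class name `DuplicateHandlerSketch` and the inline executor `Runnable::run` are assumptions made for illustration; the PR uses `mainThreadExecutor` instead.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical demo class; it only mirrors the shape of the two callbacks in the hunk. */
final class DuplicateHandlerSketch {

    /** Returns how many callbacks observed the failure of coordinatorCheckpoint. */
    static int countFailureCallbacks() {
        CompletableFuture<byte[]> coordinatorCheckpoint = new CompletableFuture<>();
        CompletableFuture<byte[]> result = new CompletableFuture<>();
        AtomicInteger failureCallbacks = new AtomicInteger();

        // First callback: forwards the failure to result.
        coordinatorCheckpoint.whenComplete(
                (success, failure) -> {
                    if (failure != null) {
                        failureCallbacks.incrementAndGet();
                        result.completeExceptionally(failure);
                    }
                });

        // Second callback: the same failure branch again. Once result is already
        // completed, completeExceptionally returns false and has no effect.
        CompletableFuture<Void> second =
                coordinatorCheckpoint.handleAsync(
                        (success, failure) -> {
                            if (failure != null) {
                                failureCallbacks.incrementAndGet();
                                result.completeExceptionally(failure);
                            }
                            return null;
                        },
                        Runnable::run); // assumption: inline executor instead of mainThreadExecutor

        coordinatorCheckpoint.completeExceptionally(new RuntimeException("checkpoint failed"));
        second.join(); // the handler returns null, so this completes normally
        return failureCallbacks.get();
    }
}
```

   Both callbacks run on failure, so the second `completeExceptionally` call is redundant rather than harmful. Whether one of the two branches can simply be dropped in the PR depends on what else each callback is meant to do.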



##########
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java:
##########
@@ -308,6 +335,26 @@ private void checkpointCoordinatorInternal(
         }
     }
 
+    private void closeGateways(final long checkpointId, final CompletableFuture<byte[]> result) {
+        Set<CloseableSubtaskGateway> closedGateways = new HashSet<>();
+        for (CloseableSubtaskGateway gateway : subtaskGatewayMap.values()) {
+            if (!gateway.tryCloseGateway(checkpointId)) {
+                closedGateways.forEach(CloseableSubtaskGateway::openGatewayAndUnmarkCheckpoint);

Review Comment:
   Is there any case where `closedGateways` is not empty at this point? If not, could it be better to throw an `IllegalStateException` when `closedGateways` is not empty?
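
   For reference while discussing this, below is a minimal sketch of the close-all-or-roll-back pattern that the hunk appears to implement. `SketchGateway` and `CloseGatewaysSketch` are hypothetical stand-ins, not Flink classes, and the boolean return value replaces the `result` future purely for illustration.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for CloseableSubtaskGateway; only the calls used below are modeled. */
final class SketchGateway {
    private final boolean closable;
    private boolean closed;

    SketchGateway(boolean closable) {
        this.closable = closable;
    }

    /** Closes the gateway if permitted and reports whether it is now closed. */
    boolean tryCloseGateway(long checkpointId) {
        if (closable) {
            closed = true;
        }
        return closable;
    }

    void openGatewayAndUnmarkCheckpoint(long checkpointId) {
        closed = false;
    }

    boolean isClosed() {
        return closed;
    }
}

final class CloseGatewaysSketch {

    /** Closes every gateway, or reopens the ones already closed and returns false. */
    static boolean closeAllOrRollBack(List<SketchGateway> gateways, long checkpointId) {
        List<SketchGateway> closedGateways = new ArrayList<>();
        for (SketchGateway gateway : gateways) {
            if (!gateway.tryCloseGateway(checkpointId)) {
                // Roll back: reopen whatever was closed before this gateway refused.
                closedGateways.forEach(g -> g.openGatewayAndUnmarkCheckpoint(checkpointId));
                return false;
            }
            closedGateways.add(gateway);
        }
        return true;
    }
}
```

   In this sketch, `closedGateways` is non-empty at the failure point only if an earlier gateway closed successfully before a later one refused. If all gateways in the PR are always marked and unmarked together, that situation may never arise, which is the case the `IllegalStateException` suggestion would guard.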



