yunfengzhou-hub commented on code in PR #20275:
URL: https://github.com/apache/flink/pull/20275#discussion_r929775009


##########
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java:
##########
@@ -308,6 +337,24 @@ private void checkpointCoordinatorInternal(
         }
     }
 
+    private boolean closeGateways(
+            final long checkpointId, final Set<Integer> subtasksToCheckpoint) {
+        boolean hasCloseableGateway = false;
+        for (int subtask : subtasksToCheckpoint) {
+            SubtaskGatewayImpl gateway = subtaskGatewayMap.get(subtask);
+            if (!gateway.tryCloseGateway(checkpointId)) {

Review Comment:
   I agree that would be simpler. I'll make the change.
   
   If none of the gateways can be closed, it usually means the checkpoint has already been aborted, so the future has already been completed exceptionally. That is an expected situation and should not throw an exception. If some gateways can be closed while others cannot, which should never happen and would most likely indicate a bug, we should throw an exception.
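   The all-or-nothing rule in this comment can be sketched as follows. This is a minimal, self-contained sketch, not the code that was merged into Flink. `FakeGateway` is a hypothetical stand-in for `SubtaskGatewayImpl`, and its `tryCloseGateway` simply succeeds when the gateway was marked for the same checkpoint ID. The method tries every requested gateway, returns `false` when none could be closed (the checkpoint was presumably aborted), and throws `IllegalStateException` when only some could be closed.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class CloseGatewaysSketch {

    /** Hypothetical stand-in for a subtask gateway; not Flink's SubtaskGatewayImpl. */
    static final class FakeGateway {
        private final long markedCheckpointId; // checkpoint this gateway was marked for
        private boolean closed;

        FakeGateway(long markedCheckpointId) {
            this.markedCheckpointId = markedCheckpointId;
        }

        /** Closes the gateway only if it was marked for the given checkpoint. */
        boolean tryCloseGateway(long checkpointId) {
            if (checkpointId == markedCheckpointId) {
                closed = true;
                return true;
            }
            return false;
        }

        boolean isClosed() {
            return closed;
        }
    }

    /**
     * Returns true if every gateway closed, false if none did (checkpoint presumably
     * aborted already), and throws if only some did (most likely a bug).
     */
    static boolean closeGateways(
            long checkpointId, Map<Integer, FakeGateway> gateways, Set<Integer> subtasks) {
        int closed = 0;
        for (int subtask : subtasks) {
            if (gateways.get(subtask).tryCloseGateway(checkpointId)) {
                closed++;
            }
        }
        if (closed != 0 && closed != subtasks.size()) {
            throw new IllegalStateException(
                    "Only " + closed + " of " + subtasks.size() + " gateways could be closed");
        }
        return closed != 0;
    }

    public static void main(String[] args) {
        Map<Integer, FakeGateway> gateways = new HashMap<>();
        gateways.put(0, new FakeGateway(7L));
        gateways.put(1, new FakeGateway(7L));
        gateways.put(2, new FakeGateway(6L)); // marked for a different checkpoint

        System.out.println(closeGateways(7L, gateways, Set.of(0, 1))); // all closed
        System.out.println(closeGateways(8L, gateways, Set.of(0, 1))); // none closed
        try {
            closeGateways(7L, gateways, Set.of(0, 1, 2)); // mixed outcome
        } catch (IllegalStateException e) {
            System.out.println("bug detected: " + e.getMessage());
        }
    }
}
```

   Trying every gateway before deciding means a partial close is always detected rather than stopping at the first gateway that refuses. Note that this sketch does not reopen the gateways that did close in the mixed case; it only reports the inconsistency.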



##########
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java:
##########
@@ -284,22 +311,24 @@ private void checkpointCoordinatorInternal(
                         (success, failure) -> {
                             if (failure != null) {
                                 result.completeExceptionally(failure);
-                            } else if (eventValve.tryShutValve(checkpointId)) {
+                            } else if (closeGateways(checkpointId, subtasksToCheckpoint)) {
                                 completeCheckpointOnceEventsAreDone(checkpointId, result, success);
                             } else {
-                                // if we cannot shut the valve, this means the checkpoint
+                                // if we cannot close the gateway, this means the checkpoint
                                 // has been aborted before, so the future is already
                                 // completed exceptionally. but we try to complete it here
                                 // again, just in case, as a safety net.
                                 result.completeExceptionally(
-                                        new FlinkException("Cannot shut event valve"));
+                                        new FlinkException("Cannot close gateway"));
                             }
                             return null;
                         },
                         mainThreadExecutor));
 
         try {
-            eventValve.markForCheckpoint(checkpointId);
+            for (int subtask : subtasksToCheckpoint) {
+                subtaskGatewayMap.get(subtask).markForCheckpoint(checkpointId);

Review Comment:
   Same as above.
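   The safety-net comment in the diff relies on a `CompletableFuture` accepting only its first completion: a later `completeExceptionally` call returns `false` and leaves the original cause in place. Below is a minimal sketch under simplifying assumptions, not Flink's code. `onCoordinatorCheckpointDone` is a hypothetical name for the callback body, the boolean `gatewaysClosed` stands in for `closeGateways(...)`, the result is completed directly instead of through `completeCheckpointOnceEventsAreDone`, and `IllegalStateException` replaces `FlinkException` so the example needs no Flink dependency.

```java
import java.util.concurrent.CompletableFuture;

public class SafetyNetSketch {

    /**
     * Hypothetical simplification of the completion callback in the diff. Returns true
     * if this call completed {@code result}, false if it was already completed.
     */
    static boolean onCoordinatorCheckpointDone(
            byte[] success,
            Throwable failure,
            boolean gatewaysClosed, // stand-in for closeGateways(checkpointId, subtasks)
            CompletableFuture<byte[]> result) {
        if (failure != null) {
            return result.completeExceptionally(failure);
        } else if (gatewaysClosed) {
            // The diff completes via completeCheckpointOnceEventsAreDone; simplified here.
            return result.complete(success);
        } else {
            // Safety net: if an abort already failed the future, this call is a no-op
            // and completeExceptionally returns false, keeping the original cause.
            return result.completeExceptionally(
                    new IllegalStateException("Cannot close gateway"));
        }
    }

    public static void main(String[] args) {
        CompletableFuture<byte[]> result = new CompletableFuture<>();
        // The checkpoint is aborted first, which fails the future with its own cause.
        result.completeExceptionally(new RuntimeException("checkpoint aborted"));
        // The coordinator's checkpoint then finishes, but the gateways cannot be closed.
        boolean changed = onCoordinatorCheckpointDone(new byte[0], null, false, result);
        System.out.println("completed by safety net: " + changed);
        result.whenComplete((value, cause) -> System.out.println("cause: " + cause.getMessage()));
    }
}
```

   Running `main` prints `completed by safety net: false` followed by `cause: checkpoint aborted`: the abort's original cause survives, which is why the extra `completeExceptionally` call is harmless when the checkpoint was already aborted.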



-- 
This is an automated message from the Apache Git Service.