yunfengzhou-hub commented on code in PR #20454:
URL: https://github.com/apache/flink/pull/20454#discussion_r940807420


##########
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java:
##########
@@ -204,18 +228,52 @@ public void close() throws Exception {
     public void handleEventFromOperator(int subtask, int attemptNumber, OperatorEvent event)
             throws Exception {
         mainThreadExecutor.assertRunningInMainThread();
+
+        if (event instanceof AcknowledgeCloseGatewayEvent) {
+            Preconditions.checkArgument(
+                    subtask == ((AcknowledgeCloseGatewayEvent) event).getSubtaskIndex());
+            completeAcknowledgeCloseGatewayFuture(
+                    subtask, ((AcknowledgeCloseGatewayEvent) event).getCheckpointID());
+            return;
+        } else if (event instanceof AcknowledgeCheckpointEvent) {
+            Preconditions.checkArgument(
+                    subtask == ((AcknowledgeCheckpointEvent) event).getSubtaskIndex());
+            subtaskGatewayMap
+                    .get(subtask)
+                    .openGatewayAndUnmarkCheckpoint(
+                            ((AcknowledgeCheckpointEvent) event).getCheckpointID());
+            return;
+        }
+
         coordinator.handleEventFromOperator(subtask, attemptNumber, event);
     }
 
     public void executionAttemptFailed(int subtask, int attemptNumber, @Nullable Throwable reason) {
         mainThreadExecutor.assertRunningInMainThread();
+
+        if (!context.isConcurrentExecutionAttemptsSupported()
+                && acknowledgeCloseGatewayFutureMap.containsKey(subtask)) {
+            Exception exception =
+                    new FlinkException(String.format("Subtask %d has failed.", subtask), reason);
+            acknowledgeCloseGatewayFutureMap.get(subtask).completeExceptionally(exception);

Review Comment:
   Nothing guarantees that a subtask will not fail or be reset while a
checkpoint is in progress. If that happens, the coordinator may never receive
the ACK events from that subtask, and the checkpoint could block indefinitely.
We therefore need to update the map here so that the checkpoint is aborted
correctly in that situation. The same applies to `subtaskReset()`.
   
   As we discussed offline, I'll add a method that wraps this logic and call
it from both `executionAttemptFailed()` and `subtaskReset()`.
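   
   To make the proposed refactoring concrete, here is a minimal, self-contained
sketch of one shared helper used by both callbacks. It is not the code in this
PR: the class `CloseGatewayAckTracker`, the methods `expectAck`, `acknowledge`
and `abortPendingAck`, and the use of `IllegalStateException` in place of
`FlinkException` are all assumptions made so the example runs without Flink on
the classpath. It also leaves out the
`isConcurrentExecutionAttemptsSupported()` check, and it assumes that
"updating the map" means removing the pending entry.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

/** Hypothetical stand-in for the holder's bookkeeping; not the actual Flink class. */
final class CloseGatewayAckTracker {

    // Pending "close gateway" acknowledgements, keyed by subtask index.
    private final Map<Integer, CompletableFuture<Long>> pendingAcks = new HashMap<>();

    /** Registers a pending acknowledgement for the given subtask. */
    public CompletableFuture<Long> expectAck(int subtask) {
        CompletableFuture<Long> future = new CompletableFuture<>();
        pendingAcks.put(subtask, future);
        return future;
    }

    /** Normal path: the subtask acknowledged, so complete and forget the future. */
    public void acknowledge(int subtask, long checkpointId) {
        CompletableFuture<Long> future = pendingAcks.remove(subtask);
        if (future != null) {
            future.complete(checkpointId);
        }
    }

    public void executionAttemptFailed(int subtask, Throwable reason) {
        abortPendingAck(subtask, "has failed", reason);
    }

    public void subtaskReset(int subtask) {
        abortPendingAck(subtask, "has been reset", null);
    }

    /**
     * Shared helper (assumed name): fails the pending future, if any, so that
     * whoever waits on it can abort the checkpoint instead of blocking.
     */
    private void abortPendingAck(int subtask, String what, Throwable cause) {
        CompletableFuture<Long> future = pendingAcks.remove(subtask);
        if (future != null) {
            future.completeExceptionally(
                    new IllegalStateException(
                            String.format("Subtask %d %s.", subtask, what), cause));
        }
    }
}
```

   Removing the entry before completing it keeps a later, stale ACK from the
failed attempt from touching a future that has already been resolved; whether
the real implementation should remove or merely complete the entry is a design
choice for the PR.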



