yunfengzhou-hub commented on code in PR #20275:
URL: https://github.com/apache/flink/pull/20275#discussion_r926407667


##########
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java:
##########
@@ -245,7 +262,13 @@ public void notifyCheckpointAborted(long checkpointId) {
         // checkpoint coordinator time thread.
         // we can remove the delegation once the checkpoint coordinator runs fully in the
         // scheduler's main thread executor
-        mainThreadExecutor.execute(() -> coordinator.notifyCheckpointAborted(checkpointId));
+        mainThreadExecutor.execute(
+                () -> {
+                    subtaskGatewayMap
+                            .values()
+                            .forEach(x -> x.openGatewayAndUnmarkCheckpoint(checkpointId));
+                    coordinator.notifyCheckpointAborted(checkpointId);

Review Comment:
   Since we are removing `afterSourceBarrierInjection()` together with its call to `openGatewayAndUnmarkCheckpoint()`, that call has to move into the methods that can run afterwards, including `handleEventFromOperator` and `notifyCheckpointAborted`.
   
   During our offline discussion, we questioned whether invoking this method in `abortCurrentTriggering` alone would be enough. I found that `notifyCheckpointAborted` and `abortCurrentTriggering` can be invoked independently of each other, so the call in `notifyCheckpointAborted` is still necessary. I have added test cases in `CoordinatorEventsExactlyOnceTest` to cover this situation.
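
   To illustrate the point about the two abort paths, here is a minimal, self-contained sketch. It is not the Flink implementation: `SketchGateway`, `SketchHolder`, `closeGateways`, and `anyGatewayClosed` are hypothetical names, the single-threaded model ignores the main thread executor, and `abortCurrentTriggering` takes a checkpoint ID here only to keep the sketch short. It assumes that reopening is a no-op unless the gateway is marked for the given checkpoint, so calling it from both paths is harmless.

```java
import java.util.HashMap;
import java.util.Map;

public class GatewayReopenSketch {

    /** Hypothetical stand-in for a subtask gateway; not Flink's actual class. */
    static final class SketchGateway {
        private static final long NO_CHECKPOINT = -1L;

        // The checkpoint this gateway is closed for, or NO_CHECKPOINT if it is open.
        private long markedCheckpointId = NO_CHECKPOINT;

        void markForCheckpointAndClose(long checkpointId) {
            markedCheckpointId = checkpointId;
        }

        // Assumed to be idempotent: only reopens if the IDs match.
        void openGatewayAndUnmarkCheckpoint(long checkpointId) {
            if (markedCheckpointId == checkpointId) {
                markedCheckpointId = NO_CHECKPOINT;
            }
        }

        boolean isClosed() {
            return markedCheckpointId != NO_CHECKPOINT;
        }
    }

    /** Hypothetical stand-in for the coordinator holder. */
    static final class SketchHolder {
        private final Map<Integer, SketchGateway> subtaskGatewayMap = new HashMap<>();

        SketchHolder(int parallelism) {
            for (int subtask = 0; subtask < parallelism; subtask++) {
                subtaskGatewayMap.put(subtask, new SketchGateway());
            }
        }

        void closeGateways(long checkpointId) {
            subtaskGatewayMap.values().forEach(g -> g.markForCheckpointAndClose(checkpointId));
        }

        // Abort path 1: must reopen on its own, since path 2 may never run.
        void notifyCheckpointAborted(long checkpointId) {
            subtaskGatewayMap.values().forEach(g -> g.openGatewayAndUnmarkCheckpoint(checkpointId));
        }

        // Abort path 2: must reopen on its own, since path 1 may never run.
        void abortCurrentTriggering(long checkpointId) {
            subtaskGatewayMap.values().forEach(g -> g.openGatewayAndUnmarkCheckpoint(checkpointId));
        }

        boolean anyGatewayClosed() {
            return subtaskGatewayMap.values().stream().anyMatch(SketchGateway::isClosed);
        }
    }

    public static void main(String[] args) {
        SketchHolder holder = new SketchHolder(2);

        // Only notifyCheckpointAborted is invoked for checkpoint 1.
        holder.closeGateways(1L);
        holder.notifyCheckpointAborted(1L);
        System.out.println("closed after path 1 only: " + holder.anyGatewayClosed());

        // Only abortCurrentTriggering is invoked for checkpoint 2.
        holder.closeGateways(2L);
        holder.abortCurrentTriggering(2L);
        System.out.println("closed after path 2 only: " + holder.anyGatewayClosed());
    }
}
```

   If either method in the sketch skipped the reopen call, the scenario that exercises only that method would leave the gateways closed, which is the situation the new test cases are meant to catch.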


