kezhuw commented on a change in pull request #15605: URL: https://github.com/apache/flink/pull/15605#discussion_r613755154
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/SubtaskGatewayImpl.java ##########
@@ -64,6 +69,17 @@
 subtaskAccess.createEventSendAction(serializedEvent);
 final CompletableFuture<Acknowledge> result = new CompletableFuture<>();
+ result.whenCompleteAsync(

Review comment:
I think I can see why this is correct after modeling it in my head:

* `SubtaskGateway` is eagerly bound to an `Execution`, so events cannot be mis-targeted.
* Both checkpoint completion and event sending are dispatched, in order, to the main thread for execution. The rendezvous with checkpointing on the user side is therefore preserved on the holder side. This guarantees that "the events that are sent later cannot overtake that action".
* `createEventSendAction` constructs a lazy sending action that executes in the main thread. This sending action inspects the execution state (`Execution.state` or `IncompleteFuturesTracker.failureCause`) so that it **fails immediately instead of becoming pending**.

This may be nice on the design side (e.g. for evaluating correctness), but I would say the implementation is far from straightforward, at least for me 😅. Here is what I found hard to follow:

* The `Execution.state` inspection in the sending action is quite important here, because it is what makes the action fail immediately. Normally it should not be that important; that is, a delayed failure should not affect correctness. I could not even tell whether `taskExecution.getState()` is an intentional duplicate of the similar inspection in `sendOperatorEvent`.
* Given the above (i.e. delayed failure), `taskExecution.getTerminalStateFuture().thenAccept()` becomes crucial. It is no longer a safeguard to "speed up things" but a safeguard for correctness. Without it, a delayed failure would carry over into the next checkpoint.

I think the key point is that `OperatorEventValve` is neutral to (i.e. has no knowledge of) `Execution`, so some work has to be done outside it to keep outdated sending actions from contributing to the incomplete futures that are inspected in the next checkpoint.

I have no strong objection to going with the current approach, but I would suggest adjusting the docs a bit in `ExecutionSubtaskAccess` and/or other places to emphasize that this immediate failure is crucial to correctness.

--
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
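
To make the "fail immediately instead of becoming pending" point concrete, here is a minimal, single-threaded Java sketch of the idea discussed in the comment. It is not Flink's code: `EventSendSketch`, `Attempt`, `Tracker`, `wire`, and `send` are hypothetical stand-ins for the execution attempt, the incomplete-futures tracker, and the sending action, and everything is assumed to run on one "main thread". It only illustrates two things: a send issued after the task has failed never enters the pending set, and completing the terminal-state future fails whatever was still pending.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

/** Hypothetical, simplified model of the discussion; these are not Flink classes. */
final class EventSendSketch {

    enum State { RUNNING, FAILED }

    /** Stand-in for a tracker of not-yet-acknowledged event futures. */
    static final class Tracker {
        private final Set<CompletableFuture<?>> pending = new HashSet<>();
        private Throwable failureCause;

        void track(CompletableFuture<?> future) {
            if (failureCause != null) {
                // Already failed: complete right away so the future never becomes pending.
                future.completeExceptionally(failureCause);
                return;
            }
            pending.add(future);
            // Drop the future from the pending set once it completes either way.
            future.whenComplete((ok, err) -> pending.remove(future));
        }

        void failAll(Throwable cause) {
            failureCause = cause;
            // Copy first: completing a future triggers the removal callback above.
            List<CompletableFuture<?>> snapshot = new ArrayList<>(pending);
            pending.clear();
            for (CompletableFuture<?> future : snapshot) {
                future.completeExceptionally(cause);
            }
        }

        int pendingCount() {
            return pending.size();
        }
    }

    /** Stand-in for one execution attempt of a subtask. */
    static final class Attempt {
        State state = State.RUNNING;
        final CompletableFuture<State> terminalState = new CompletableFuture<>();

        void fail() {
            state = State.FAILED;
            terminalState.complete(State.FAILED);
        }
    }

    /** Fails all pending futures once the attempt reaches a terminal state. */
    static void wire(Attempt attempt, Tracker tracker) {
        attempt.terminalState.thenAccept(
                s -> tracker.failAll(new IllegalStateException("task terminated: " + s)));
    }

    /** Sending action: inspects the attempt state before the future may become pending. */
    static CompletableFuture<Void> send(Attempt attempt, Tracker tracker) {
        CompletableFuture<Void> result = new CompletableFuture<>();
        if (attempt.state != State.RUNNING) {
            result.completeExceptionally(new IllegalStateException("task is not running"));
            return result;
        }
        // A real implementation would hand the event to the RPC layer here,
        // and the acknowledgement would complete the future later.
        tracker.track(result);
        return result;
    }

    public static void main(String[] args) {
        Attempt attempt = new Attempt();
        Tracker tracker = new Tracker();
        wire(attempt, tracker);

        CompletableFuture<Void> early = send(attempt, tracker);
        System.out.println("pending before failure: " + tracker.pendingCount());

        attempt.fail();
        System.out.println("early send failed: " + early.isCompletedExceptionally());

        CompletableFuture<Void> late = send(attempt, tracker);
        System.out.println("late send failed at once: " + late.isCompletedExceptionally());
        System.out.println("pending after failure: " + tracker.pendingCount());
    }
}
```

In this model, removing the `wire` call leaves the early future in the pending set after the failure, which is the "delayed failure contributes to the next checkpoint" situation the comment describes.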