kezhuw commented on a change in pull request #15605:
URL: https://github.com/apache/flink/pull/15605#discussion_r613755154



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/SubtaskGatewayImpl.java
##########
@@ -64,6 +69,17 @@
                 subtaskAccess.createEventSendAction(serializedEvent);
 
         final CompletableFuture<Acknowledge> result = new CompletableFuture<>();
+        result.whenCompleteAsync(

Review comment:
       I think I can see why this is correct once I model it in my head:
   * `SubtaskGateway` is eagerly bound to an `Execution`, so events cannot be 
mis-targeted.
   * Both checkpoint completion and event sending are submitted, in order, to 
the main thread for execution. So the checkpointing rendezvous on the user 
side is preserved on the holder side. This guarantees that "the events that 
are sent later cannot overtake that action".
   * `createEventSendAction` constructs a lazy sending action that executes in 
the main thread. This sending action inspects the execution state 
(`Execution.state` or `IncompleteFuturesTracker.failureCause`) so that it can 
**fail immediately instead of becoming pending**.
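
   To make the last point concrete, here is a minimal sketch of how I read it. 
All names (`LazySendSketch`, `State`, `createSendAction`) are hypothetical and 
this is not the actual Flink code; a plain `CompletableFuture` that nobody 
completes stands in for the RPC. The point is only that the state is inspected 
when the action runs, not when it is created:

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// Hypothetical names throughout; a simplified model, not Flink's classes.
class LazySendSketch {

    enum State { RUNNING, FAILED }

    /** Builds a send action that is evaluated only when the main thread runs it. */
    static Supplier<CompletableFuture<String>> createSendAction(
            Supplier<State> stateView, String event) {
        return () -> {
            // Inspect the state at execution time, not at creation time.
            if (stateView.get() != State.RUNNING) {
                CompletableFuture<String> failed = new CompletableFuture<>();
                failed.completeExceptionally(
                        new IllegalStateException("task not running, dropping " + event));
                return failed; // already complete, so it never counts as pending
            }
            // Stand-in for the RPC: a future that stays pending until acknowledged.
            return new CompletableFuture<>();
        };
    }

    public static void main(String[] args) {
        State[] state = {State.RUNNING};
        Supplier<CompletableFuture<String>> action =
                createSendAction(() -> state[0], "evt");

        state[0] = State.FAILED; // the task fails after the action was created
        CompletableFuture<String> result = action.get();
        System.out.println(result.isCompletedExceptionally()); // prints "true"
    }
}
```

   If the check were done when the action is created instead, the action in 
`main` would return a pending future for a task that has already failed.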
   
   This may be nice on the design side (e.g. for evaluating correctness), but 
I would say the implementation is not very straightforward, at least for me 😅. 
Here is what I found hard to follow:
   * The `Execution.state` inspection in the sending action is pretty important 
here, because it is what makes the action fail immediately. Normally it should 
not matter: a delayed failure should not affect correctness. I cannot even 
tell whether `taskExecution.getState()` is an intentional duplicate of the 
similar inspection in `sendOperatorEvent`.
   * Based on the above (i.e. delayed failure), 
`taskExecution.getTerminalStateFuture().thenAccept()` becomes crucial. It is no 
longer a safeguard to "speed up things", but a safeguard for correctness. 
Without it, a delayed failure will leak into the next checkpoint.
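
   A rough model of the second point, again with hypothetical names 
(`TerminalStateGuardSketch`, `PendingTracker`, `guardedSend`) rather than the 
real classes, and assuming the tracker simply holds every future that has not 
completed yet. Tying the result to the terminal-state future is what removes a 
stale send from the pending set:

```java
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical names; a simplified model, not Flink's actual classes.
class TerminalStateGuardSketch {

    /** Holds futures that are still pending, i.e. what the next checkpoint would inspect. */
    static final class PendingTracker {
        private final Set<CompletableFuture<?>> pending = ConcurrentHashMap.newKeySet();

        void track(CompletableFuture<?> future) {
            pending.add(future);
            // Drop the future from the set as soon as it completes either way.
            future.whenComplete((value, failure) -> pending.remove(future));
        }

        int size() {
            return pending.size();
        }
    }

    /** Ties a send result to the task's terminal-state future. */
    static CompletableFuture<String> guardedSend(
            CompletableFuture<String> terminalState, PendingTracker tracker) {
        // Stand-in for the RPC acknowledgement, which may never arrive.
        CompletableFuture<String> result = new CompletableFuture<>();
        tracker.track(result);
        // Once the task is terminal, fail the result so it cannot stay pending.
        terminalState.thenAccept(
                state -> result.completeExceptionally(
                        new IllegalStateException("task reached " + state)));
        return result;
    }

    public static void main(String[] args) {
        PendingTracker tracker = new PendingTracker();
        CompletableFuture<String> terminal = new CompletableFuture<>();

        guardedSend(terminal, tracker);
        System.out.println(tracker.size()); // prints "1": the send is pending

        terminal.complete("FAILED");
        System.out.println(tracker.size()); // prints "0": nothing left for the next checkpoint
    }
}
```

   Without the `thenAccept` hook, the future in `main` would stay in the 
tracker after the task has failed.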
   
   I think the key point is that `OperatorEventValve` is neutral to (i.e. has 
no knowledge of) `Execution`, so some work has to be done outside it to keep 
stale sending actions from contributing to the incomplete futures that are 
inspected in the next checkpoint. I have no strong objection to the current 
approach, but I would suggest adjusting the docs a bit in 
`ExecutionSubtaskAccess` and/or other places to emphasize that this immediate 
failure is crucial to correctness.





