LoveHeat commented on code in PR #22806:
URL: https://github.com/apache/flink/pull/22806#discussion_r1251702812


##########
flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java:
##########
@@ -220,8 +223,20 @@ void sendEventToSourceOperator(int subtaskId, 
OperatorEvent event) {
                 String.format("Failed to send event %s to subtask %d", event, 
subtaskId));
     }
 
-    ScheduledExecutorService getCoordinatorExecutor() {
-        return coordinatorExecutor;
+    void sendEventToSourceOperatorIfTaskReady(int subtaskId, OperatorEvent 
event) {
+        checkSubtaskIndex(subtaskId);
+
+        callInCoordinatorThread(
+                () -> {
+                    final OperatorCoordinator.SubtaskGateway gateway =
+                            
subtaskGateways.getOnlyGatewayAndNotCheckReady(subtaskId);
+                    if (gateway != null) {
+                        gateway.sendEvent(event);
+                    }

Review Comment:
   Got it~From the code, when reach code here, the task is treated as ok 
because gateway is valid.  The return type of gateway.sendEvent()  is a future, 
if sendEvent to tm failed because of task is restarting, the exception will be 
save in the future, we don't call future.get() so the exception will be 
ignored(Job will restarted after jm received task failure event).
   
   sendEvent will also throw another exception:
   1. [task is not 
ready](https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/SubtaskGatewayImpl.java#L92),
 i think this exception will not thrown, because we have already check task 
ready before call sendEvent()
   2. [serialize event 
failed](https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/SubtaskGatewayImpl.java#L101),
 this exception can not retry
   @1996fanrui @pnowojski 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to