1996fanrui commented on code in PR #22806:
URL: https://github.com/apache/flink/pull/22806#discussion_r1251571234


##########
flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java:
##########
@@ -220,8 +223,20 @@ void sendEventToSourceOperator(int subtaskId, OperatorEvent event) {
                 String.format("Failed to send event %s to subtask %d", event, subtaskId));
     }
 
-    ScheduledExecutorService getCoordinatorExecutor() {
-        return coordinatorExecutor;
+    void sendEventToSourceOperatorIfTaskReady(int subtaskId, OperatorEvent event) {
+        checkSubtaskIndex(subtaskId);
+
+        callInCoordinatorThread(
+                () -> {
+                    final OperatorCoordinator.SubtaskGateway gateway =
+                            subtaskGateways.getOnlyGatewayAndNotCheckReady(subtaskId);
+                    if (gateway != null) {
+                        gateway.sendEvent(event);
+                    }

Review Comment:
   Hi @LoveHeat, as I understand it, the operations on `subtaskGateways` are
not subject to a race condition. I think Piotr means that by the time you call
`gateway.sendEvent(event)`, the gateway itself is not racy, but the
corresponding subtask may be restarting, which would cause
`gateway.sendEvent(event)` to fail, right?
   
   If so, it's probably fine too. Following the call path
`SourceCoordinatorContext#sendEventToSourceOperator` ->
[SubtaskGatewayImpl#sendEvent](https://github.com/apache/flink/blob/b402108f9bc468ed5223a5d73ea0bfcfa0085cfe/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/SubtaskGatewayImpl.java#L90),
if `sendEvent` fails, it calls `subtaskAccess.triggerTaskFailover`.
   
   BTW, I'm not sure whether `SubtaskGatewayImpl#sendEvent` should throw
`FlinkRuntimeException`. In the current code, the coordinatorExecutor cannot
tolerate any exception, and most calls to `SubtaskGatewayImpl#sendEvent` are
made inside the coordinatorExecutor thread. So the coordinatorExecutor will
fail as soon as a `FlinkRuntimeException` is thrown. However, the expected
behavior should be that only the task fails.
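
   To illustrate what I mean, here is a minimal standalone sketch, not Flink
code. `Gateway` and `FailoverTrigger` are hypothetical stand-ins for the
subtask gateway and for `subtaskAccess.triggerTaskFailover`, and I'm assuming
a plain single-thread executor in place of the coordinatorExecutor. A send
failure is routed to the failover callback instead of escaping into the
executor thread:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/** Sketch only: every name in this file is hypothetical, none is a Flink class. */
public class SendEventSketch {

    /** Hypothetical stand-in for a subtask gateway. */
    interface Gateway {
        void sendEvent(String event);
    }

    /** Hypothetical stand-in for subtaskAccess.triggerTaskFailover. */
    interface FailoverTrigger {
        void triggerTaskFailover(Throwable cause);
    }

    /**
     * Runs the send inside the executor thread. A failure is handed to the
     * failover trigger rather than being rethrown in the executor thread.
     */
    static void sendInExecutor(
            ExecutorService executor, Gateway gateway, String event, FailoverTrigger failover) {
        executor.execute(
                () -> {
                    try {
                        gateway.sendEvent(event);
                    } catch (RuntimeException e) {
                        // Fail only the task; the executor thread keeps running.
                        failover.triggerTaskFailover(e);
                    }
                });
    }

    /** Returns true if a failing send ended in a task failover. */
    static boolean demo() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        AtomicBoolean taskFailed = new AtomicBoolean(false);

        // Simulates a gateway whose subtask is restarting, so the send fails.
        Gateway failing =
                event -> {
                    throw new IllegalStateException("subtask is restarting");
                };

        sendInExecutor(executor, failing, "event", cause -> taskFailed.set(true));

        // shutdown() still runs the already submitted task before terminating.
        executor.shutdown();
        try {
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return taskFailed.get();
    }

    public static void main(String[] args) {
        System.out.println("task failover triggered: " + demo());
    }
}
```

   Whether the real `sendEvent` should catch, rethrow, or only trigger the
failover is exactly the open question; the sketch just shows the "fail the
task, not the executor" variant.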
   
   
   


