LoveHeat commented on code in PR #22806:
URL: https://github.com/apache/flink/pull/22806#discussion_r1251702812
##########
flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java:
##########
@@ -220,8 +223,20 @@ void sendEventToSourceOperator(int subtaskId, OperatorEvent event) {
                 String.format("Failed to send event %s to subtask %d", event, subtaskId));
     }
-    ScheduledExecutorService getCoordinatorExecutor() {
-        return coordinatorExecutor;
+    void sendEventToSourceOperatorIfTaskReady(int subtaskId, OperatorEvent event) {
+        checkSubtaskIndex(subtaskId);
+
+        callInCoordinatorThread(
+                () -> {
+                    final OperatorCoordinator.SubtaskGateway gateway =
+                            subtaskGateways.getOnlyGatewayAndNotCheckReady(subtaskId);
+                    if (gateway != null) {
+                        gateway.sendEvent(event);
+                    }
Review Comment:
Got it. Looking at the code: when execution reaches this point, the task is
treated as OK because the gateway is valid. gateway.sendEvent() returns a
future. If sending the event to the TM fails because the task is restarting,
the exception is stored in that future. Since we never call future.get(), the
exception is ignored (the job will be restarted once the JM receives the task
failure event).
sendEvent() can also throw two other exceptions directly:
1. [task is not ready](https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/SubtaskGatewayImpl.java#L92):
I think this exception will not be thrown, because we have already checked
that the task is ready before calling sendEvent().
2. [event serialization failed](https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/SubtaskGatewayImpl.java#L101):
this exception is not retryable.
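
To illustrate the point about the ignored future, here is a minimal sketch in plain Java. It does not use Flink classes: `IgnoredFutureFailure` and its `sendEvent(boolean)` method are hypothetical stand-ins for the gateway call, and the `String` result is an assumption made only to keep the example self-contained. It shows that a failure stored in a `CompletableFuture` never reaches the caller unless the caller inspects the future.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

public class IgnoredFutureFailure {

    // Hypothetical stand-in for the gateway call: the failure is stored in
    // the returned future instead of being thrown to the caller.
    static CompletableFuture<String> sendEvent(boolean taskRestarting) {
        CompletableFuture<String> result = new CompletableFuture<>();
        if (taskRestarting) {
            result.completeExceptionally(
                    new IllegalStateException("task is restarting"));
        } else {
            result.complete("ack");
        }
        return result;
    }

    public static void main(String[] args) {
        // Fire and forget: nothing is thrown here even though the send failed,
        // because nobody calls get()/join() on the returned future.
        CompletableFuture<String> ignored = sendEvent(true);
        System.out.println(
                "caller continues, failed=" + ignored.isCompletedExceptionally());

        // The failure only becomes visible if the caller attaches a callback
        // (or blocks on the future).
        AtomicReference<Throwable> seen = new AtomicReference<>();
        sendEvent(true).whenComplete((ack, failure) -> seen.set(failure));
        System.out.println("observed: " + seen.get().getMessage());
    }
}
```

If we ever want to surface these failures, attaching a callback as in the second half would be one option, but as argued above it should not be needed here because the job restarts anyway.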
@1996fanrui @pnowojski