LoveHeat commented on code in PR #22806:
URL: https://github.com/apache/flink/pull/22806#discussion_r1251757472
##########
flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java:
##########
@@ -220,8 +223,20 @@ void sendEventToSourceOperator(int subtaskId,
OperatorEvent event) {
String.format("Failed to send event %s to subtask %d", event,
subtaskId));
}
- ScheduledExecutorService getCoordinatorExecutor() {
- return coordinatorExecutor;
+ void sendEventToSourceOperatorIfTaskReady(int subtaskId, OperatorEvent
event) {
+ checkSubtaskIndex(subtaskId);
+
+ callInCoordinatorThread(
+ () -> {
+ final OperatorCoordinator.SubtaskGateway gateway =
+
subtaskGateways.getOnlyGatewayAndNotCheckReady(subtaskId);
+ if (gateway != null) {
+ gateway.sendEvent(event);
+ }
Review Comment:
But `gateway.sendEvent()` can only fail in three ways:
1. [The task is not
ready](https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/SubtaskGatewayImpl.java#L92).
I think this exception will never be thrown here, because we have already
checked that the task is ready before calling `sendEvent()`.
2. [Serializing the event
failed](https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/SubtaskGatewayImpl.java#L101).
I think this cannot be retried and should just fail the job (but I don't know
whether failing the job is OK for a batch job).
3. As you said: `if sendEvent fails, it won't throw exception directly and
call the subtaskAccess.triggerTaskFailover.`
Any other exception (such as an OOM or a JVM error) cannot be retried either.
(Correct me if I misunderstood your point :smiley:)
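
To make the three cases concrete, here is a minimal, self-contained sketch. `FakeGateway`, `classify`, the exception types and the flag names are all hypothetical stand-ins that I made up for illustration. This is not Flink's `SubtaskGatewayImpl`; it only mirrors the control flow described above as I understand it.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.util.concurrent.CompletableFuture;

public class SendEventFailureSketch {

    /** Hypothetical stand-in for a subtask gateway; NOT Flink's SubtaskGatewayImpl. */
    static final class FakeGateway {
        private final boolean taskReady;
        private final boolean deliveryFails;
        boolean failoverTriggered = false;

        FakeGateway(boolean taskReady, boolean deliveryFails) {
            this.taskReady = taskReady;
            this.deliveryFails = deliveryFails;
        }

        CompletableFuture<Void> sendEvent(Object event) {
            // Case 1: synchronous failure because the task is not ready.
            if (!taskReady) {
                throw new IllegalStateException("task is not ready");
            }
            // Case 2: synchronous failure because the event cannot be serialized.
            try (ObjectOutputStream out =
                    new ObjectOutputStream(new ByteArrayOutputStream())) {
                out.writeObject(event);
            } catch (IOException e) {
                throw new IllegalArgumentException("cannot serialize event", e);
            }
            // Case 3: delivery fails; nothing is thrown to the caller,
            // a task failover is triggered instead (modelled here as a flag).
            CompletableFuture<Void> result = new CompletableFuture<>();
            if (deliveryFails) {
                failoverTriggered = true;
                result.completeExceptionally(new RuntimeException("delivery failed"));
            } else {
                result.complete(null);
            }
            return result;
        }
    }

    /** Reports which outcome a single call to sendEvent produced. */
    static String classify(FakeGateway gateway, Object event) {
        try {
            CompletableFuture<Void> future = gateway.sendEvent(event);
            return future.isCompletedExceptionally() ? "failover-triggered" : "sent";
        } catch (IllegalStateException e) {
            return "not-ready";
        } catch (IllegalArgumentException e) {
            return "not-serializable";
        }
    }

    public static void main(String[] args) {
        System.out.println(classify(new FakeGateway(false, false), "event"));
        System.out.println(classify(new FakeGateway(true, false), new Object()));
        System.out.println(classify(new FakeGateway(true, true), "event"));
        System.out.println(classify(new FakeGateway(true, false), "event"));
    }
}
```

In this model only cases 1 and 2 reach the caller as exceptions, and neither would succeed on a retry with the same event and the same task state, which is why I doubt a retry loop around `sendEvent()` helps.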
@pnowojski @1996fanrui