1996fanrui commented on code in PR #22806:
URL: https://github.com/apache/flink/pull/22806#discussion_r1251571234
##########
flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java:
##########
@@ -220,8 +223,20 @@ void sendEventToSourceOperator(int subtaskId,
OperatorEvent event) {
String.format("Failed to send event %s to subtask %d", event,
subtaskId));
}
- ScheduledExecutorService getCoordinatorExecutor() {
- return coordinatorExecutor;
+ void sendEventToSourceOperatorIfTaskReady(int subtaskId, OperatorEvent
event) {
+ checkSubtaskIndex(subtaskId);
+
+ callInCoordinatorThread(
+ () -> {
+ final OperatorCoordinator.SubtaskGateway gateway =
+
subtaskGateways.getOnlyGatewayAndNotCheckReady(subtaskId);
+ if (gateway != null) {
+ gateway.sendEvent(event);
+ }
Review Comment:
Hi @LoveHeat , as I understand, the operations of subtaskGateways isn't race
condition. Piotr means when you intend to call `gateway.sendEvent(event)`, the
gateway must not have a race condition. But the corresponding subtask may be
restarting, causing gateway.sendEvent(event) to fail, right?
If so, it's probably fine too. From the
`SourceCoordinatorContext#sendEventToSourceOperator` ->
[SubtaskGatewayImpl#sendEvent](https://github.com/apache/flink/blob/b402108f9bc468ed5223a5d73ea0bfcfa0085cfe/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/SubtaskGatewayImpl.java#L90),
if `sendEvent` fails, it won't throw exception directly and call the
`subtaskAccess.triggerTaskFailover`.
BTW, I'm not sure whether the `SubtaskGatewayImpl#sendEvent` should throw
`FlinkRuntimeException`. From the current code, the coordinatorExecutor cannot
meet any exception, and most of `SubtaskGatewayImpl#sendEvent` are called
inside of the coordinatorExecutor thread. So the coordinatorExecutor will fail
after any `FlinkRuntimeException` is thrown. However, the expected behavior
should be that the task fails.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]