elon-X commented on code in PR #24757:
URL: https://github.com/apache/flink/pull/24757#discussion_r1598735613
##########
flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java:
##########
@@ -201,8 +201,11 @@ void announceCombinedWatermark() {
// to ready task to avoid period task fail (Java-ThreadPoolExecutor will not schedule
// the period task if it throws an exception).
for (Integer subtaskId : subTaskIds) {
- context.sendEventToSourceOperatorIfTaskReady(
- subtaskId, new WatermarkAlignmentEvent(maxAllowedWatermark));
+ // when subtask have been finished, do not send event.
+ if (!context.hasNoMoreSplits(subtaskId)) {
Review Comment:
Hi @1996fanrui,
First of all, thank you for your detailed review and response. Here are my
thoughts:
1. Regarding the SourceOperator: my initial concern was where to place the
code. I noticed that before switching to the
`DataInputStatus.END_OF_INPUT` state, the SourceOperator receives a
`NoMoreSplitsEvent`, and the SourceCoordinator also needs to determine whether
the subtask has finished. Therefore, I chose to send the maximum
timestamp inside the `if (event instanceof NoMoreSplitsEvent) { }` block. I
believe these two locations should be equivalent.
2. On the SourceCoordinator side, if a task has finished,
`context.sendEventToSourceOperatorIfTaskReady` still sends events to that
subtask. In the code provided by Gyula, if we wait for the result of
`gateway.sendEvent(event).get()`, the following exception occurs:
`Caused by: java.util.concurrent.ExecutionException:
org.apache.flink.runtime.operators.coordination.TaskNotRunningException:
"Source: Sequence Source -> Filter -> Sink: Print to Std. Out (1/2)" is not
running, but in state FINISHED`
Therefore, I added the condition `!context.hasNoMoreSplits(subtaskId)`. I
also tested the scenario "some subtask doesn't have split when the parallelism
of Kafka source is greater than Kafka partition," and indeed, no
`NoMoreSplitsEvent` is sent.
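To make the guard in point 2 concrete, here is a minimal, self-contained sketch of the idea. It is not Flink code: `FakeCoordinatorContext`, `AlignmentSketch`, `signalNoMoreSplits`, and `sendEvent` are hypothetical stand-ins I made up for illustration, and only the `hasNoMoreSplits(subtaskId)` check mirrors the condition in the diff above.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical stand-in for the coordinator context; NOT Flink's real class. */
class FakeCoordinatorContext {
    // Subtasks that have already been told there are no more splits.
    private final Set<Integer> noMoreSplits = new HashSet<>();
    // Records which subtasks an alignment event was sent to.
    final List<Integer> sentTo = new ArrayList<>();

    void signalNoMoreSplits(int subtaskId) {
        noMoreSplits.add(subtaskId);
    }

    boolean hasNoMoreSplits(int subtaskId) {
        return noMoreSplits.contains(subtaskId);
    }

    // Assumption: a real send could fail if the task is already FINISHED;
    // here we only record the call.
    void sendEvent(int subtaskId, long maxAllowedWatermark) {
        sentTo.add(subtaskId);
    }
}

class AlignmentSketch {
    /** Sends the alignment event only to subtasks that may still be running. */
    static void announce(
            FakeCoordinatorContext context, int[] subtaskIds, long maxAllowedWatermark) {
        for (int subtaskId : subtaskIds) {
            // Same shape as the proposed guard: skip subtasks that were
            // already signalled "no more splits".
            if (!context.hasNoMoreSplits(subtaskId)) {
                context.sendEvent(subtaskId, maxAllowedWatermark);
            }
        }
    }
}
```

With subtask 0 marked as having no more splits, calling `AlignmentSketch.announce(ctx, new int[] {0, 1}, 42L)` records a send only for subtask 1, so no event reaches the subtask that may already have finished.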
Please correct me if my understanding is wrong. I will modify the code based
on the final discussion results and submit it~