elon-X commented on code in PR #24757:
URL: https://github.com/apache/flink/pull/24757#discussion_r1598735613


##########
flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java:
##########
@@ -201,8 +201,11 @@ void announceCombinedWatermark() {
         // to ready task to avoid period task fail (Java-ThreadPoolExecutor will not schedule
         // the period task if it throws an exception).
         for (Integer subtaskId : subTaskIds) {
-            context.sendEventToSourceOperatorIfTaskReady(
-                    subtaskId, new WatermarkAlignmentEvent(maxAllowedWatermark));
+            // when subtask have been finished, do not send event.
+            if (!context.hasNoMoreSplits(subtaskId)) {

Review Comment:
   Hi @1996fanrui,
   First of all, thank you for your detailed review and response. I would like
to share my thoughts:
   
   1. Regarding the SourceOperator: my initial concern was where to place the
code. Before switching to the `DataInputStatus.END_OF_INPUT` state, the
SourceOperator receives a `NoMoreSplitsEvent`, and the SourceCoordinator also
needs to determine whether the subtask has finished. Therefore, I chose to send
the maximum timestamp inside the `if (event instanceof NoMoreSplitsEvent) { }`
branch. I believe these two locations should be equivalent.
   
   2. On the SourceCoordinator side, even after a task has finished,
`context.sendEventToSourceOperatorIfTaskReady` still sends events to that
subtask. In the code provided by Gyula, waiting on the result of
`gateway.sendEvent(event).get()` throws the following exception:
   `Caused by: java.util.concurrent.ExecutionException: org.apache.flink.runtime.operators.coordination.TaskNotRunningException: "Source: Sequence Source -> Filter -> Sink: Print to Std. Out (1/2)" is not running, but in state FINISHED`
   
   Therefore, I added the condition `!context.hasNoMoreSplits(subtaskId)`. I
also tested the scenario where some subtasks have no splits because the
parallelism of the Kafka source is greater than the number of Kafka partitions,
and indeed no `NoMoreSplitsEvent` is sent in that scenario.
   
   Please correct me if my understanding is wrong. I will update the code based
on the final outcome of the discussion and push it.


