1996fanrui commented on code in PR #24757:
URL: https://github.com/apache/flink/pull/24757#discussion_r1596434203


##########
flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java:
##########
@@ -201,8 +201,11 @@ void announceCombinedWatermark() {
         // to ready task to avoid period task fail (Java-ThreadPoolExecutor will not schedule
         // the period task if it throws an exception).
         for (Integer subtaskId : subTaskIds) {
-            context.sendEventToSourceOperatorIfTaskReady(
-                    subtaskId, new WatermarkAlignmentEvent(maxAllowedWatermark));
+            // when subtask have been finished, do not send event.
+            if (!context.hasNoMoreSplits(subtaskId)) {

Review Comment:
   Does `NoMoreSplits` mean that the subtask has finished?
   
   I'm afraid they are not the same. For example, some subtasks have no splits at all when the parallelism of the Kafka source is greater than the number of Kafka partitions.
   
   I'm not sure we need this change. IIUC, `context.sendEventToSourceOperatorIfTaskReady` already has a check inside: it sends the event only when `gateway` is not null.
   
   That means the coordinator doesn't send the event to a subtask once that subtask has finished, right?
   
   Please correct me if anything is wrong, thanks.
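
To make the concern concrete, here is a minimal toy sketch of the two conditions. It does not use Flink's real classes: `ToyCoordinatorContext`, `registerSubtask`, `finishSubtask`, `sendEventIfTaskReady` and `deliveredCount` are hypothetical names for illustration, and the sketch assumes (as described above, IIUC) that a finished subtask has no gateway while a subtask that was told "no more splits" can still be running.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

class WatermarkEventDemo {
    public static void main(String[] args) {
        ToyCoordinatorContext ctx = new ToyCoordinatorContext();
        ctx.registerSubtask(0);    // running, has splits
        ctx.registerSubtask(1);    // running, but no partition was assigned to it
        ctx.signalNoMoreSplits(1); // assumption: the idle subtask was told "no more splits"
        ctx.registerSubtask(2);
        ctx.finishSubtask(2);      // finished: its gateway is gone

        for (int id = 0; id < 3; id++) {
            boolean sent = ctx.sendEventIfTaskReady(id, "WatermarkAlignmentEvent");
            System.out.println("subtask " + id
                    + " noMoreSplits=" + ctx.hasNoMoreSplits(id)
                    + " sent=" + sent);
        }
    }
}

/** Toy stand-in for the coordinator context; hypothetical, not Flink's API. */
class ToyCoordinatorContext {
    // Subtasks that currently have a gateway (i.e. are still running).
    private final Set<Integer> liveGateways = new HashSet<>();
    // Subtasks that have been told no further splits will arrive.
    private final Set<Integer> noMoreSplits = new HashSet<>();
    // Number of events actually delivered per subtask.
    private final Map<Integer, Integer> delivered = new HashMap<>();

    void registerSubtask(int subtaskId) {
        liveGateways.add(subtaskId);
    }

    void finishSubtask(int subtaskId) {
        liveGateways.remove(subtaskId);
    }

    void signalNoMoreSplits(int subtaskId) {
        noMoreSplits.add(subtaskId);
    }

    boolean hasNoMoreSplits(int subtaskId) {
        return noMoreSplits.contains(subtaskId);
    }

    /** Delivers the event only if the subtask still has a gateway. */
    boolean sendEventIfTaskReady(int subtaskId, String event) {
        if (!liveGateways.contains(subtaskId)) {
            return false; // finished or never registered: nothing is sent
        }
        delivered.merge(subtaskId, 1, Integer::sum);
        return true;
    }

    int deliveredCount(int subtaskId) {
        return delivered.getOrDefault(subtaskId, 0);
    }
}
```

In this toy model, guarding the send with `!hasNoMoreSplits(subtaskId)` would skip subtask 1 even though it is still running, while the gateway check alone already skips the finished subtask 2.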



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
