elon-X commented on code in PR #24757:
URL: https://github.com/apache/flink/pull/24757#discussion_r1624644653


##########
flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java:
##########
@@ -201,8 +201,11 @@ void announceCombinedWatermark() {
         // to ready task to avoid period task fail (Java-ThreadPoolExecutor will not schedule
         // the period task if it throws an exception).
         for (Integer subtaskId : subTaskIds) {
-            context.sendEventToSourceOperatorIfTaskReady(
-                    subtaskId, new WatermarkAlignmentEvent(maxAllowedWatermark));
+            // when subtask have been finished, do not send event.
+            if (!context.hasNoMoreSplits(subtaskId)) {

Review Comment:
   @1996fanrui Sorry for the late reply. I made some adjustments based on our discussion and added an ITCase. Could you please review it again? Thanks for your help.


