Zakelly commented on code in PR #25792:
URL: https://github.com/apache/flink/pull/25792#discussion_r1883189632


##########
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperator.java:
##########
@@ -368,10 +385,12 @@ protected void processWatermarkStatus(WatermarkStatus watermarkStatus, int index
                     boolean wasIdle = combinedWatermark.isIdle();
                     // index is 0-based
                     if (combinedWatermark.updateStatus(index, watermarkStatus.isIdle())) {
-                        super.processWatermark(
-                                new Watermark(combinedWatermark.getCombinedWatermark()));
-                    }
-                    if (wasIdle != combinedWatermark.isIdle()) {
+                        doProcessWatermark(

Review Comment:
   This part is essentially a rewrite of the overridden method, 
`AbstractStreamOperator#emitWatermarkStatus`, adapted so that async timer 
processing finishes before the watermark or status is emitted. The logic in the 
parent class's method is more readable:
   
   - If the combined watermark changes (because some of the watermarks become 
active), we process the combined watermark.
   - If the combined watermark's idle/active status switches, we notify the 
downstream.
   
   These two `if`s are independent.
   
   This PR ensures that the watermark status is emitted after the watermark is 
processed in the async context. I don't think a comment or two would explain 
this logic well, but you can get the point if you compare this method with the 
one in the parent class.
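
   To make the ordering concrete, here is a minimal, self-contained sketch of 
the two independent checks and of deferring the status until watermark 
processing is done. It is not the Flink implementation: the class 
`WatermarkStatusOrderingSketch`, its fields, and the `completeTimers` helper are 
all hypothetical, and the queue only stands in for async timer processing.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

// Hypothetical, simplified model: none of these types are Flink classes.
final class WatermarkStatusOrderingSketch {
    private final long[] watermarks;
    private final boolean[] idle;
    private long combined = Long.MIN_VALUE;

    // Stand-in for asynchronous timer processing that has not finished yet.
    private final Deque<Runnable> pendingTimerWork = new ArrayDeque<>();
    // Everything sent downstream, in emission order.
    final List<String> emitted = new ArrayList<>();

    WatermarkStatusOrderingSketch(int inputs) {
        watermarks = new long[inputs];
        idle = new boolean[inputs];
        Arrays.fill(watermarks, Long.MIN_VALUE);
    }

    private boolean allIdle() {
        for (boolean b : idle) {
            if (!b) {
                return false;
            }
        }
        return true;
    }

    // Recomputes the minimum over active inputs; true if the combined watermark advanced.
    private boolean recompute() {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < idle.length; i++) {
            if (!idle[i]) {
                anyActive = true;
                min = Math.min(min, watermarks[i]);
            }
        }
        if (anyActive && min > combined) {
            combined = min;
            return true;
        }
        return false;
    }

    void processWatermark(int index, long timestamp) {
        watermarks[index] = Math.max(watermarks[index], timestamp);
        if (recompute()) {
            long wm = combined;
            pendingTimerWork.add(() -> emitted.add("watermark:" + wm));
        }
    }

    void processWatermarkStatus(int index, boolean nowIdle) {
        boolean wasIdle = allIdle();
        idle[index] = nowIdle;
        // Check 1: did the combined watermark change?
        boolean watermarkChanged = recompute();
        // Check 2 (independent of check 1): did the combined status switch?
        boolean isIdle = allIdle();
        boolean statusSwitched = wasIdle != isIdle;

        Runnable emitStatus = () -> {
            if (statusSwitched) {
                emitted.add(isIdle ? "status:IDLE" : "status:ACTIVE");
            }
        };
        if (watermarkChanged) {
            // Defer the status until the watermark has been processed and emitted.
            long wm = combined;
            pendingTimerWork.add(() -> {
                emitted.add("watermark:" + wm);
                emitStatus.run();
            });
        } else {
            emitStatus.run();
        }
    }

    // Simulates the completion of all outstanding async timer work.
    void completeTimers() {
        while (!pendingTimerWork.isEmpty()) {
            pendingTimerWork.poll().run();
        }
    }
}
```

   In this sketch, when an input becomes active again and that both advances 
the combined watermark and flips the combined status, nothing is emitted until 
`completeTimers()` runs, and then the watermark goes out before the status.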


