1996fanrui commented on code in PR #22925:
URL: https://github.com/apache/flink/pull/22925#discussion_r1255220217


##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/TimestampsAndWatermarks.java:
##########
@@ -46,11 +45,14 @@ public interface TimestampsAndWatermarks<T> {
     /** Lets the owner/creator of the output know about latest emitted watermark. */
     @Internal
     interface WatermarkUpdateListener {
+
+        /** It should be called once the idle is changed. */
+        void updateIdle(boolean isIdle);

Review Comment:
   Actually, I had the same question during development.
   
   I added the method for two reasons:
   
   - Idleness is closely related to the watermark, so it is reasonable to report it through this listener.
   - We could signal idleness with `MAX_WATERMARK` instead, but that is more complex:
       - The caller would have to send `MAX_WATERMARK` to indicate idleness.
       - The callee would have to check for `MAX_WATERMARK` and interpret it as idle.
   
   So I think adding a dedicated method is clearer.
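
   To make the trade-off concrete, here is a minimal sketch of both options. All type names below (`WatermarkUpdateListenerSketch`, `ExplicitIdleTracker`, `SentinelIdleTracker`, `IdleSignalDemo`) are hypothetical stand-ins and not the code in this PR, and the sentinel value is assumed to be `Long.MAX_VALUE`.

```java
/** Hypothetical stand-in for the listener under review; not the actual Flink interface. */
interface WatermarkUpdateListenerSketch {
    void updateCurrentEffectiveWatermark(long watermark);

    /** Explicit idle signal, mirroring the method proposed in the diff. */
    void updateIdle(boolean isIdle);
}

/** Option 1: the callee receives idleness through a dedicated method. */
class ExplicitIdleTracker implements WatermarkUpdateListenerSketch {
    long latestWatermark = Long.MIN_VALUE;
    boolean isIdle = false;

    @Override
    public void updateCurrentEffectiveWatermark(long watermark) {
        latestWatermark = watermark;
    }

    @Override
    public void updateIdle(boolean isIdle) {
        this.isIdle = isIdle;
    }
}

/** Option 2: idleness is encoded as a sentinel watermark that the callee must decode. */
class SentinelIdleTracker {
    // Assumption: the sentinel is Long.MAX_VALUE.
    static final long MAX_WATERMARK = Long.MAX_VALUE;

    long latestWatermark = Long.MIN_VALUE;
    boolean isIdle = false;

    void updateCurrentEffectiveWatermark(long watermark) {
        if (watermark == MAX_WATERMARK) {
            // The callee has to recognise the sentinel and keep the last real watermark.
            isIdle = true;
        } else {
            isIdle = false;
            latestWatermark = watermark;
        }
    }
}

class IdleSignalDemo {
    public static void main(String[] args) {
        ExplicitIdleTracker explicit = new ExplicitIdleTracker();
        explicit.updateCurrentEffectiveWatermark(100L);
        explicit.updateIdle(true);
        System.out.println("explicit: " + explicit.latestWatermark + ", idle=" + explicit.isIdle);

        SentinelIdleTracker sentinel = new SentinelIdleTracker();
        sentinel.updateCurrentEffectiveWatermark(100L);
        sentinel.updateCurrentEffectiveWatermark(SentinelIdleTracker.MAX_WATERMARK);
        System.out.println("sentinel: " + sentinel.latestWatermark + ", idle=" + sentinel.isIdle);
    }
}
```

   In this sketch the sentinel variant needs the encode and decode convention on both sides, and a real maximum watermark cannot be told apart from an idle signal, while the explicit method keeps the two pieces of state separate.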



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java:
##########
@@ -146,7 +146,9 @@ public class SourceOperator<OUT, SplitT extends SourceSplit> extends AbstractStr
 
     private DataOutput<OUT> lastInvokedOutput;
 
-    private long lastEmittedWatermark = Watermark.UNINITIALIZED.getTimestamp();
+    private long latestWatermark = Watermark.UNINITIALIZED.getTimestamp();
+
+    private boolean isIdle = false;

Review Comment:
   That makes sense, updated.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java:
##########
@@ -181,9 +181,10 @@ void announceCombinedWatermark() {
 
         Set<Integer> subTaskIds = combinedWatermark.keySet();
         LOG.info(
-                "Distributing maxAllowedWatermark={} to subTaskIds={}",
+                "Distributing maxAllowedWatermark={} to subTaskIds={} for source {}.",
                 maxAllowedWatermark,
-                subTaskIds);
+                subTaskIds,
+                operatorName);

Review Comment:
   Good suggestion, updated.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
