mas-chen commented on code in PR #22925:
URL: https://github.com/apache/flink/pull/22925#discussion_r1255025231


##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/TimestampsAndWatermarks.java:
##########
@@ -46,11 +45,14 @@ public interface TimestampsAndWatermarks<T> {
     /** Lets the owner/creator of the output know about latest emitted watermark. */
     @Internal
     interface WatermarkUpdateListener {
+
+        /** It should be called once the idle is changed. */
+        void updateIdle(boolean isIdle);

Review Comment:
   I wonder if we need this API. Can't we already tell from 
#updateCurrentEffectiveWatermark() whether the source is idle, e.g. by checking 
whether the reported watermark is MAX_WATERMARK?
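
   To make the idea concrete, here is a rough sketch of what I have in mind. 
It is not the actual Flink interface: the class name and the constant are 
hypothetical stand-ins, and it assumes an idle source reports Long.MAX_VALUE 
(the timestamp of MAX_WATERMARK) as its effective watermark.

```java
// Hypothetical stand-in for a watermark listener; not the actual Flink interface.
final class IdleFromWatermarkSketch {
    // Assumption: same value as Watermark.MAX_WATERMARK.getTimestamp().
    static final long MAX_WATERMARK_TIMESTAMP = Long.MAX_VALUE;

    // Long.MIN_VALUE stands in for "no watermark reported yet".
    private long latestWatermark = Long.MIN_VALUE;
    private boolean idle = false;

    // Single callback: idleness is derived from the reported watermark
    // instead of being pushed through a separate updateIdle(boolean) call.
    void updateCurrentEffectiveWatermark(long watermark) {
        latestWatermark = watermark;
        idle = (watermark == MAX_WATERMARK_TIMESTAMP);
    }

    boolean isIdle() {
        return idle;
    }

    long getLatestWatermark() {
        return latestWatermark;
    }
}
```

   One caveat with this approach: a source that has reached end of input 
also emits MAX_WATERMARK, so the check alone cannot distinguish "idle" from 
"finished". If that distinction matters here, the separate callback may be 
justified.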



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java:
##########
@@ -146,7 +146,9 @@ public class SourceOperator<OUT, SplitT extends SourceSplit> extends AbstractStr
 
     private DataOutput<OUT> lastInvokedOutput;
 
-    private long lastEmittedWatermark = Watermark.UNINITIALIZED.getTimestamp();
+    private long latestWatermark = Watermark.UNINITIALIZED.getTimestamp();
+
+    private boolean isIdle = false;

Review Comment:
   In terms of convention, I agree with @RocMarshal. I think (2), the IDEA 
one, is intentional: the prefix is `is` because the field is a boolean. The 
thinking is "is it?" or "is it not?", because a boolean is binary.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java:
##########
@@ -181,9 +181,10 @@ void announceCombinedWatermark() {
 
         Set<Integer> subTaskIds = combinedWatermark.keySet();
         LOG.info(
-                "Distributing maxAllowedWatermark={} to subTaskIds={}",
+                "Distributing maxAllowedWatermark={} to subTaskIds={} for source {}.",
                 maxAllowedWatermark,
-                subTaskIds);
+                subTaskIds,
+                operatorName);

Review Comment:
   Is it possible to also include the alignment group names? That would be 
useful for debugging when there are multiple groups.
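
   Roughly what I am picturing for the message, as a standalone sketch. The 
helper class, the `watermarkGroup` parameter, and the exact wording are 
hypothetical; in the coordinator this would stay a parameterized LOG.info 
call with one more argument.

```java
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper that only illustrates the proposed message layout.
final class AlignmentLogMessageSketch {
    static String format(
            long maxAllowedWatermark,
            Set<Integer> subTaskIds,
            String operatorName,
            String watermarkGroup) {
        // Sort the subtask ids so the output is stable and easy to grep.
        return "Distributing maxAllowedWatermark=" + maxAllowedWatermark
                + " to subTaskIds=" + new TreeSet<>(subTaskIds)
                + " for source " + operatorName
                + " in watermark group " + watermarkGroup + ".";
    }
}
```

   With several alignment groups in one job, the group name lets you filter 
the log down to the sources that are actually aligned with each other.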



