1996fanrui commented on code in PR #22852:
URL: https://github.com/apache/flink/pull/22852#discussion_r1244697163


##########
flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java:
##########
@@ -631,8 +631,9 @@ private void ensureStarted() {
         }
     }
 
-    private static class WatermarkAggregator<T> {
+    static class WatermarkAggregator<T> {
         private final Map<T, Watermark> watermarks = new HashMap<>();
+        /** The aggregatedWatermark is the smallest watermark of all keys. */

Review Comment:
   > I was thinking that there's also further optimization on the data 
structure. The WatermarkAggregator only needs to return the min watermark and 
the set of tracked subtasks. So you could also use a TreeMap.
   
   `TreeMap` orders its entries by key, not by value. `WatermarkAggregator`
   needs the minimum watermark, which is stored as the value, so I don't see
   why `TreeMap` would be suitable here.
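
   To illustrate the point about key ordering, here is a minimal sketch. The class name, the `minValue` helper, and the sample data are hypothetical, and plain `Long` timestamps stand in for Flink's `Watermark`. It suggests that the first entry of a `TreeMap` is the one with the smallest key, so finding the smallest value still means scanning all values.

```java
import java.util.Comparator;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration class, not part of Flink.
class TreeMapOrderingSketch {

    /** Scans all values for the minimum; key ordering does not help here. */
    static long minValue(Map<Integer, Long> timestampsBySubtask) {
        return timestampsBySubtask.values().stream()
                .min(Comparator.naturalOrder())
                .orElseThrow(IllegalStateException::new);
    }

    public static void main(String[] args) {
        // Assumed sample data: subtask index -> watermark timestamp.
        TreeMap<Integer, Long> timestampsBySubtask = new TreeMap<>();
        timestampsBySubtask.put(0, 300L);
        timestampsBySubtask.put(1, 100L);
        timestampsBySubtask.put(2, 200L);

        // firstEntry() has the smallest KEY (subtask 0), not the smallest value.
        System.out.println(timestampsBySubtask.firstEntry().getValue()); // prints 300
        // The minimum value requires a scan over all values.
        System.out.println(minValue(timestampsBySubtask)); // prints 100
    }
}
```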
   



##########
flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java:
##########
@@ -631,8 +631,9 @@ private void ensureStarted() {
         }
     }
 
-    private static class WatermarkAggregator<T> {
+    static class WatermarkAggregator<T> {
         private final Map<T, Watermark> watermarks = new HashMap<>();
+        /** The aggregatedWatermark is the smallest watermark of all keys. */

Review Comment:
   > nit: I think the style is to use single line comments //. The multiline is 
reserved for signatures, classes, etc.
   
   I see `/** */` used at field level in many places in the Flink code, such as
   [PipelinedSubpartition](https://github.com/apache/flink/blob/e35f33f539b09b2c18fb826a144c08541e148dd0/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java#L87).



##########
flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java:
##########
@@ -642,7 +643,31 @@ private static class WatermarkAggregator<T> {
          *     Optional.empty()} otherwise.
          */
         public Optional<Watermark> aggregate(T key, Watermark watermark) {
-            watermarks.put(key, watermark);
+            Watermark oldWatermark = watermarks.put(key, watermark);
+            // Step (1): Update the latest watermark of current key as the aggregatedWatermark
+            // directly if it is less than the aggregatedWatermark.
+            if (watermark.getTimestamp() < aggregatedWatermark.getTimestamp()) {
+                aggregatedWatermark = watermark;
+                return Optional.of(aggregatedWatermark);
+            }
+
+            // Step(2): The aggWM won't change when these conditions are met, so return directly:
+            // case1. The latest WM of the current key isn't changed
+            // case2. When oldWatermark isn't null and is greater than aggWm, it means that aggWm
+            // comes from other keys. If new WM is greater than or equal to aggWm, then aggWm must
+            // not change.
+            // case3. When oldWatermark is null and {@link watermarks} has other keys, it means that
+            // aggWm comes from other keys. If new WM is greater than or equal to aggWm, then aggWm
+            // must not change.
+            // Note: Here's an implicit condition `watermark >= aggregatedWatermark` due to step(1),
+            // that's why it's not written in the code of case2 and case3.
+            if (watermark.equals(oldWatermark)
+                    || (oldWatermark != null
+                            && oldWatermark.getTimestamp() > aggregatedWatermark.getTimestamp())
+                    || (oldWatermark == null && watermarks.size() > 1)) {
+                return Optional.empty();
+            }
+
             Watermark newMinimum =

Review Comment:
   ```
               Watermark newMinimum =
                       watermarks.values().stream()
                               .min(Comparator.comparingLong(Watermark::getTimestamp))
                               .orElseThrow(IllegalStateException::new);
   ```
   
   This code is the general solution for finding the minimum watermark; step 1
   and step 2 are optimizations on top of it.
   
   When neither step 1 nor step 2 applies, we still need to compute
   `newMinimum` from `watermarks`.
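
   The relationship between the two fast paths and the general scan could be sketched roughly as follows. This is a simplified, hypothetical stand-in and not the PR's code: it tracks plain `long` timestamps instead of `Watermark` objects, the class and field names are invented, and the initial value `Long.MIN_VALUE` as well as the final "emit only if the minimum changed" check are assumptions.

```java
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical simplification of the aggregator under discussion.
class MinAggregatorSketch<T> {
    private final Map<T, Long> timestamps = new HashMap<>();
    // Assumption: start below every real timestamp.
    private long aggregated = Long.MIN_VALUE;

    /** Returns the new minimum if it changed, Optional.empty() otherwise. */
    Optional<Long> aggregate(T key, long timestamp) {
        Long old = timestamps.put(key, timestamp);

        // Step 1: a report below the current minimum becomes the new minimum.
        if (timestamp < aggregated) {
            aggregated = timestamp;
            return Optional.of(aggregated);
        }

        // Step 2: cases where the minimum cannot have changed.
        boolean unchanged = old != null && old == timestamp;
        boolean minOwnedByOtherKey = old != null && old > aggregated;
        boolean newKeyAmongOthers = old == null && timestamps.size() > 1;
        if (unchanged || minOwnedByOtherKey || newKeyAmongOthers) {
            return Optional.empty();
        }

        // General solution: scan all values for the minimum.
        long newMinimum = timestamps.values().stream()
                .min(Comparator.naturalOrder())
                .orElseThrow(IllegalStateException::new);
        if (newMinimum == aggregated) {
            return Optional.empty();
        }
        aggregated = newMinimum;
        return Optional.of(aggregated);
    }

    public static void main(String[] args) {
        MinAggregatorSketch<String> agg = new MinAggregatorSketch<>();
        System.out.println(agg.aggregate("a", 10)); // first key: falls through to the scan
        System.out.println(agg.aggregate("b", 20)); // new key, others exist: fast path
        System.out.println(agg.aggregate("a", 30)); // owner of the minimum advanced: scan
    }
}
```

   In this sketch the scan only runs when the key that held the minimum advances, or when the very first key reports.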



##########
flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java:
##########
@@ -642,7 +643,31 @@ private static class WatermarkAggregator<T> {
          *     Optional.empty()} otherwise.
          */
         public Optional<Watermark> aggregate(T key, Watermark watermark) {
-            watermarks.put(key, watermark);
+            Watermark oldWatermark = watermarks.put(key, watermark);
+            // Step (1): Update the latest watermark of current key as the aggregatedWatermark
+            // directly if it is less than the aggregatedWatermark.
+            if (watermark.getTimestamp() < aggregatedWatermark.getTimestamp()) {
+                aggregatedWatermark = watermark;
+                return Optional.of(aggregatedWatermark);
+            }
+
+            // Step(2): The aggWM won't change when these conditions are met, so return directly:

Review Comment:
   `oldWatermark` is null when the subtask reports its watermark for the first
   time.
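
   This follows from the contract of `Map.put`, which returns the previous value for the key, or null if there was no mapping. A small sketch, with a hypothetical class, helper name, and sample values, and `Long` standing in for `Watermark`:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration class, not part of Flink.
class FirstReportSketch {

    /** Stores the timestamp and reports whether this key had no previous entry. */
    static boolean isFirstReport(Map<Integer, Long> timestamps, int subtask, long timestamp) {
        Long previous = timestamps.put(subtask, timestamp);
        return previous == null;
    }

    public static void main(String[] args) {
        Map<Integer, Long> timestamps = new HashMap<>();
        System.out.println(isFirstReport(timestamps, 0, 100L)); // prints true
        System.out.println(isFirstReport(timestamps, 0, 200L)); // prints false
    }
}
```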



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
