StefanRRichter commented on code in PR #22852:
URL: https://github.com/apache/flink/pull/22852#discussion_r1245250027


##########
flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java:
##########
@@ -642,7 +643,31 @@ private static class WatermarkAggregator<T> {
          *     Optional.empty()} otherwise.
          */
         public Optional<Watermark> aggregate(T key, Watermark watermark) {
-            watermarks.put(key, watermark);
+            Watermark oldWatermark = watermarks.put(key, watermark);
+            // Step (1): Update the latest watermark of current key as the aggregatedWatermark
+            // directly if it is less than the aggregatedWatermark.
+            if (watermark.getTimestamp() < aggregatedWatermark.getTimestamp()) {
+                aggregatedWatermark = watermark;
+                return Optional.of(aggregatedWatermark);
+            }
+
+            // Step(2): The aggWM won't change when these conditions are met, so return directly:
+            // case1. The latest WM of the current key isn't changed
+            // case2. When oldWatermark isn't null and is greater than aggWm, it means that aggWm
+            // comes from other keys. If new WM is greater than or equal to aggWm, then aggWm must
+            // not change.
+            // case3. When oldWatermark is null and {@link watermarks} has other keys, it means that
+            // aggWm comes from other keys. If new WM is greater than or equal to aggWm, then aggWm
+            // must not change.
+            // Note: Here's an implicit condition `watermark >= aggregatedWatermark` due to step(1),
+            // that's why it's not written in the code of case2 and case3.
+            if (watermark.equals(oldWatermark)
+                    || (oldWatermark != null
+                            && oldWatermark.getTimestamp() > aggregatedWatermark.getTimestamp())
+                    || (oldWatermark == null && watermarks.size() > 1)) {
+                return Optional.empty();
+            }
+
             Watermark newMinimum =

Review Comment:
   In case you find that this step still takes too much time: I once had to
   solve a similar problem for the timer service. You could take a look at
   reusing `HeapPriorityQueue` and wrapping the watermark value as a
   `HeapPriorityQueueElement` so that the values can be managed by the priority
   queue. This would let you always find the new min watermark very quickly,
   in O(1), while keeping adding/removing watermarks to/from the heap
   reasonably fast, at O(log n).


