StefanRRichter commented on code in PR #22852:
URL: https://github.com/apache/flink/pull/22852#discussion_r1245250027
##########
flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java:
##########
@@ -642,7 +643,31 @@ private static class WatermarkAggregator<T> {
* Optional.empty()} otherwise.
*/
public Optional<Watermark> aggregate(T key, Watermark watermark) {
- watermarks.put(key, watermark);
+ Watermark oldWatermark = watermarks.put(key, watermark);
+ // Step (1): Update the latest watermark of current key as the aggregatedWatermark
+ // directly if it is less than the aggregatedWatermark.
+ if (watermark.getTimestamp() < aggregatedWatermark.getTimestamp()) {
+ aggregatedWatermark = watermark;
+ return Optional.of(aggregatedWatermark);
+ }
+
+ // Step(2): The aggWM won't change when these conditions are met, so return directly:
+ // case1. The latest WM of the current key isn't changed
+ // case2. When oldWatermark isn't null and is greater than aggWm, it means that aggWm
+ // comes from other keys. If new WM is greater than or equal to aggWm, then aggWm must
+ // not change.
+ // case3. When oldWatermark is null and {@link watermarks} has other keys, it means that
+ // aggWm comes from other keys. If new WM is greater than or equal to aggWm, then aggWm
+ // must not change.
+ // Note: Here's an implicit condition `watermark >= aggregatedWatermark` due to step(1),
+ // that's why it's not written in the code of case2 and case3.
+ if (watermark.equals(oldWatermark)
+ || (oldWatermark != null
+ && oldWatermark.getTimestamp() > aggregatedWatermark.getTimestamp())
+ || (oldWatermark == null && watermarks.size() > 1)) {
+ return Optional.empty();
+ }
+
Watermark newMinimum =
Review Comment:
Just in case you find this step still takes too much time: I once had to solve
a similar problem for the timer service, and you could look at reusing
`HeapPriorityQueue` and wrapping the watermark value as a
`HeapPriorityQueueElement`, so that the watermarks are managed by the priority
queue. This lets you always find the new minimum watermark very quickly while
keeping the removal of watermarks from the heap reasonably fast.
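A minimal, self-contained sketch of the heap idea, under stated assumptions: plain `long` timestamps stand in for Flink's `Watermark`, and a hand-rolled indexed binary min-heap stands in for Flink's `HeapPriorityQueue` (its real constructor and comparator types are not reproduced here). `HeapWatermarkAggregator`, `Entry`, and `remove` are hypothetical names. As with `HeapPriorityQueueElement`, each entry stores its own heap index, so an updated key is re-sifted in O(log n) and the minimum is read from the root in O(1) instead of rescanning every key. Like the `aggregate` in the diff, it returns `Optional.empty()` when the minimum does not change.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/** Hypothetical sketch: min of per-key timestamps; Long.MAX_VALUE means "no watermark yet". */
final class HeapWatermarkAggregator<K> {

    /** Heap entry that remembers its own slot, similar in spirit to HeapPriorityQueueElement. */
    private static final class Entry {
        long timestamp;
        int heapIndex;

        Entry(long timestamp) {
            this.timestamp = timestamp;
        }
    }

    private final List<Entry> heap = new ArrayList<>();
    private final Map<K, Entry> entries = new HashMap<>();
    private long aggregated = Long.MAX_VALUE;

    /** Records the latest timestamp for key; returns the new minimum only if it changed. */
    Optional<Long> aggregate(K key, long timestamp) {
        Entry entry = entries.get(key);
        if (entry == null) {
            // New key: append at the end and bubble it up, O(log n).
            entry = new Entry(timestamp);
            entries.put(key, entry);
            heap.add(entry);
            siftUp(heap.size() - 1);
        } else {
            // Existing key: update in place and restore heap order from its stored index.
            long old = entry.timestamp;
            entry.timestamp = timestamp;
            if (timestamp < old) {
                siftUp(entry.heapIndex);
            } else {
                siftDown(entry.heapIndex);
            }
        }
        return publishMinimum();
    }

    /** Drops key in O(log n) by moving the last heap entry into its slot. */
    Optional<Long> remove(K key) {
        Entry entry = entries.remove(key);
        if (entry == null) {
            return Optional.empty();
        }
        Entry last = heap.remove(heap.size() - 1);
        if (last != entry) {
            place(last, entry.heapIndex);
            siftUp(last.heapIndex);
            siftDown(last.heapIndex);
        }
        return publishMinimum();
    }

    /** Reads the minimum from the heap root in O(1) and reports it only when it changed. */
    private Optional<Long> publishMinimum() {
        long newMin = heap.isEmpty() ? Long.MAX_VALUE : heap.get(0).timestamp;
        if (newMin == aggregated) {
            return Optional.empty();
        }
        aggregated = newMin;
        return Optional.of(newMin);
    }

    private void siftUp(int i) {
        Entry e = heap.get(i);
        while (i > 0) {
            int parent = (i - 1) >>> 1;
            Entry p = heap.get(parent);
            if (p.timestamp <= e.timestamp) {
                break;
            }
            place(p, i); // move the larger parent down one level
            i = parent;
        }
        place(e, i);
    }

    private void siftDown(int i) {
        Entry e = heap.get(i);
        int n = heap.size();
        while (2 * i + 1 < n) {
            int child = 2 * i + 1;
            if (child + 1 < n && heap.get(child + 1).timestamp < heap.get(child).timestamp) {
                child++; // pick the smaller child
            }
            Entry c = heap.get(child);
            if (e.timestamp <= c.timestamp) {
                break;
            }
            place(c, i); // move the smaller child up one level
            i = child;
        }
        place(e, i);
    }

    private void place(Entry e, int i) {
        heap.set(i, e);
        e.heapIndex = i;
    }
}
```

Storing the index in each entry is the key design choice: `java.util.PriorityQueue` can also give the minimum in O(1), but its `remove(Object)` is linear, so updating one key's watermark would again cost O(n).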