1996fanrui commented on code in PR #22852:
URL: https://github.com/apache/flink/pull/22852#discussion_r1244697163
##########
flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java:
##########
@@ -631,8 +631,9 @@ private void ensureStarted() {
}
}
- private static class WatermarkAggregator<T> {
+ static class WatermarkAggregator<T> {
private final Map<T, Watermark> watermarks = new HashMap<>();
+ /** The aggregatedWatermark is the smallest watermark of all keys. */
Review Comment:
> I was thinking that there's also further optimization on the data structure. The WatermarkAggregator only needs to return the min watermark and the set of tracked subtasks. So you could also use a TreeMap.

TreeMap orders all elements by key, not by value. WatermarkAggregator needs the min watermark, which is the value, so I don't see why `TreeMap` is suitable here.
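To illustrate the ordering point, here is a minimal sketch. The class and method names are hypothetical, and watermarks are simplified to `long` timestamps keyed by subtask id. `TreeMap.firstEntry()` returns the entry with the smallest subtask id, while the smallest watermark still has to be found by scanning `values()`.

```java
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical demo: a TreeMap keyed by subtask id is sorted by id, not by watermark. */
public class TreeMapOrderDemo {

    /** Cheap in a TreeMap: the entry with the smallest key (subtask id). */
    static Map.Entry<Integer, Long> smallestKey(TreeMap<Integer, Long> watermarksBySubtask) {
        return watermarksBySubtask.firstEntry();
    }

    /** The smallest value (watermark) still needs a full scan over all entries. */
    static long smallestWatermark(TreeMap<Integer, Long> watermarksBySubtask) {
        long min = Long.MAX_VALUE;
        for (long ts : watermarksBySubtask.values()) {
            min = Math.min(min, ts);
        }
        return min;
    }

    public static void main(String[] args) {
        TreeMap<Integer, Long> watermarks = new TreeMap<>();
        watermarks.put(0, 300L);
        watermarks.put(1, 100L); // subtask 1 holds the min watermark
        watermarks.put(2, 200L);
        Map.Entry<Integer, Long> first = smallestKey(watermarks);
        // prints "smallest key: 0, its watermark: 300"
        System.out.println("smallest key: " + first.getKey() + ", its watermark: " + first.getValue());
        // prints 100
        System.out.println(smallestWatermark(watermarks));
    }
}
```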
##########
flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java:
##########
@@ -631,8 +631,9 @@ private void ensureStarted() {
}
}
- private static class WatermarkAggregator<T> {
+ static class WatermarkAggregator<T> {
private final Map<T, Watermark> watermarks = new HashMap<>();
+ /** The aggregatedWatermark is the smallest watermark of all keys. */
Review Comment:
> nit: I think the style is to use single line comments //. The multiline is reserved for signatures, classes, etc.

I see `/** */` used at field level in a lot of Flink code, such as:
[PipelinedSubpartition](https://github.com/apache/flink/blob/e35f33f539b09b2c18fb826a144c08541e148dd0/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java#L87)
##########
flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java:
##########
@@ -642,7 +643,31 @@ private static class WatermarkAggregator<T> {
* Optional.empty()} otherwise.
*/
public Optional<Watermark> aggregate(T key, Watermark watermark) {
- watermarks.put(key, watermark);
+ Watermark oldWatermark = watermarks.put(key, watermark);
+ // Step (1): Update the latest watermark of current key as the aggregatedWatermark
+ // directly if it is less than the aggregatedWatermark.
+ if (watermark.getTimestamp() < aggregatedWatermark.getTimestamp()) {
+ aggregatedWatermark = watermark;
+ return Optional.of(aggregatedWatermark);
+ }
+
+ // Step(2): The aggWM won't change when these conditions are met, so return directly:
+ // case1. The latest WM of the current key isn't changed
+ // case2. When oldWatermark isn't null and is greater than aggWm, it means that aggWm
+ // comes from other keys. If new WM is greater than or equal to aggWm, then aggWm must
+ // not change.
+ // case3. When oldWatermark is null and {@link watermarks} has other keys, it means that
+ // aggWm comes from other keys. If new WM is greater than or equal to aggWm, then aggWm
+ // must not change.
+ // Note: Here's an implicit condition `watermark >= aggregatedWatermark` due to step(1),
+ // that's why it's not written in the code of case2 and case3.
+ if (watermark.equals(oldWatermark)
+ || (oldWatermark != null
+ && oldWatermark.getTimestamp() > aggregatedWatermark.getTimestamp())
+ || (oldWatermark == null && watermarks.size() > 1)) {
+ return Optional.empty();
+ }
+
Watermark newMinimum =
Review Comment:
```
Watermark newMinimum =
        watermarks.values().stream()
                .min(Comparator.comparingLong(Watermark::getTimestamp))
                .orElseThrow(IllegalStateException::new);
```
This code is the general solution for finding the min watermark; step 1 and step 2 are optimizations on top of it.
When neither step 1 nor step 2 applies, we still need to find the newMinimum from `watermarks`.
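A simplified, self-contained sketch of how the two fast paths and the general fallback fit together. It assumes plain `long` timestamps instead of Flink's `Watermark`, and `MinWatermarkAggregator` is a hypothetical name. The handling after the rescan (returning `Optional.empty()` when the minimum did not move) is also an assumption, since the hunk above is cut off at `newMinimum`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Hypothetical simplified sketch: watermarks are long timestamps, not Flink's Watermark. */
public class MinWatermarkAggregator<T> {
    private final Map<T, Long> watermarks = new HashMap<>();

    /** The smallest watermark of all keys; Long.MAX_VALUE until a smaller one is reported. */
    private long aggregated = Long.MAX_VALUE;

    /** Returns the new minimum if it changed, Optional.empty() otherwise. */
    public Optional<Long> aggregate(T key, long watermark) {
        Long old = watermarks.put(key, watermark);

        // Step (1): a watermark below the current minimum becomes the minimum directly.
        if (watermark < aggregated) {
            aggregated = watermark;
            return Optional.of(aggregated);
        }

        // Step (2): here watermark >= aggregated, so the minimum is unchanged if
        // case1: this key's watermark did not change,
        // case2: this key's old watermark was above the minimum (another key holds it), or
        // case3: this is the key's first report and other keys already exist.
        if (Long.valueOf(watermark).equals(old)
                || (old != null && old > aggregated)
                || (old == null && watermarks.size() > 1)) {
            return Optional.empty();
        }

        // Otherwise fall back to the general solution and rescan all keys
        // (typically: this key held the minimum and moved forward).
        long newMinimum =
                watermarks.values().stream().mapToLong(Long::longValue).min().getAsLong();

        // Assumed tail: only report a change if the minimum actually moved.
        if (newMinimum == aggregated) {
            return Optional.empty();
        }
        aggregated = newMinimum;
        return Optional.of(aggregated);
    }
}
```

For example, with keys 0 and 1 at 100 and 200, advancing key 0 to 150 skips both fast paths and triggers the rescan, which yields 150.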
##########
flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java:
##########
@@ -642,7 +643,31 @@ private static class WatermarkAggregator<T> {
* Optional.empty()} otherwise.
*/
public Optional<Watermark> aggregate(T key, Watermark watermark) {
- watermarks.put(key, watermark);
+ Watermark oldWatermark = watermarks.put(key, watermark);
+ // Step (1): Update the latest watermark of current key as the aggregatedWatermark
+ // directly if it is less than the aggregatedWatermark.
+ if (watermark.getTimestamp() < aggregatedWatermark.getTimestamp()) {
+ aggregatedWatermark = watermark;
+ return Optional.of(aggregatedWatermark);
+ }
+
+ // Step(2): The aggWM won't change when these conditions are met, so return directly:
Review Comment:
The `oldWatermark` is null when the subtask reports its watermark for the first time.
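A minimal sketch of why that holds, relying only on the `java.util.Map.put` contract (it returns the previous value, or `null` if the key had no mapping). The class and method names are hypothetical. A `null` result alone does not say whether other keys exist, which is why case3 also checks `watermarks.size() > 1`.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical demo of the Map.put contract that oldWatermark relies on. */
public class FirstReportDemo {

    /** Stores the subtask's watermark; returns the previous one, or null on the first report. */
    static Long report(Map<Integer, Long> watermarks, int subtask, long timestamp) {
        return watermarks.put(subtask, timestamp);
    }

    public static void main(String[] args) {
        Map<Integer, Long> watermarks = new HashMap<>();
        System.out.println(report(watermarks, 7, 100L)); // prints null: first report of subtask 7
        System.out.println(report(watermarks, 7, 120L)); // prints 100: the previous watermark
    }
}
```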
--
This is an automated message from the Apache Git Service.