pnowojski commented on code in PR #22852:
URL: https://github.com/apache/flink/pull/22852#discussion_r1245251239


##########
flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorAlignmentTest.java:
##########
@@ -131,6 +138,101 @@ void testWatermarkAlignmentWithTwoGroups() throws Exception {
         }
     }
 
+    @Test
+    @Timeout(5)
+    void testWatermarkAggregatorBenchmark() {

Review Comment:
   👍 for benchmarks, but not here. If you pick any reasonable timeout here, it will be a flaky test, as the CI infra can be extremely slow. It would be much better to add it, in one way or another, to the flink-benchmarks repo.
   
   I could see this benchmarked in one of two ways:
   
   1. Using only the public API via an ITCase-style benchmark, for example adjusting `mapSink.F27_UNBOUNDED` to run more subtasks with the smallest possible watermark update interval. But it might be hard to achieve large enough parallelism on a single machine with this kind of test.
   2. Using internal Flink classes via a unit-test-style benchmark (just like yours here). But in that case we cannot use the internal Flink classes from `flink-benchmarks`. The workaround is to create a benchmark class in the `flink` repo, ala `StreamNetworkThroughputBenchmark`, and then use that class in the `flink-benchmarks` repo, like `StreamNetworkThroughputBenchmarkExecutor` does.
   
   With option 2, it would actually be nice to benchmark not just the `#aggregate` function, but the whole code path of processing alignment events on the JM (rough sketch below).
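
   Something along these lines, just as a sketch: all class and method names below are placeholders, and I'm assuming the aggregator keeps an `aggregate(key, watermark)`-style method plus a no-arg constructor (please double check against the actual class). The real benchmark would ideally exercise the whole alignment-event path rather than only the aggregator.

```java
// File 1 (placeholder name): lives in the flink repo, in the same package as
// SourceCoordinator so it can reach the now package-private WatermarkAggregator,
// ala StreamNetworkThroughputBenchmark.
package org.apache.flink.runtime.source.coordinator;

import org.apache.flink.api.common.eventtime.Watermark;

/** Hypothetical helper exposing the aggregation hot path to flink-benchmarks. */
public class WatermarkAggregatorBenchmarkHelper {

    private final SourceCoordinator.WatermarkAggregator<Integer> aggregator =
            new SourceCoordinator.WatermarkAggregator<>();

    /** Feeds one round of watermark updates, one per simulated subtask. */
    public void aggregateRound(int numSubtasks, long baseTimestamp) {
        for (int subtask = 0; subtask < numSubtasks; subtask++) {
            aggregator.aggregate(subtask, new Watermark(baseTimestamp + subtask));
        }
    }
}

// File 2 (placeholder name): lives in the flink-benchmarks repo,
// ala StreamNetworkThroughputBenchmarkExecutor, driven by JMH.
package org.apache.flink.benchmark;

import org.apache.flink.runtime.source.coordinator.WatermarkAggregatorBenchmarkHelper;

import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;

@State(Scope.Thread)
public class WatermarkAggregatorBenchmarkExecutor {

    private static final int NUM_SUBTASKS = 10_000;

    private WatermarkAggregatorBenchmarkHelper helper;
    private long baseTimestamp;

    @Setup
    public void setUp() {
        helper = new WatermarkAggregatorBenchmarkHelper();
    }

    @Benchmark
    public void aggregateWatermarks() {
        // Each invocation advances the per-key watermarks by one round.
        helper.aggregateRound(NUM_SUBTASKS, baseTimestamp++);
    }
}
```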
   
   Also, ideally it would be great to first merge the benchmarks in a separate PR, and only later this PR improving the performance. This way we could verify the performance improvement in http://codespeed.dak8s.net:8000/



##########
flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java:
##########
@@ -631,8 +631,9 @@ private void ensureStarted() {
         }
     }
 
-    private static class WatermarkAggregator<T> {
+    static class WatermarkAggregator<T> {
         private final Map<T, Watermark> watermarks = new HashMap<>();
+        /** The aggregatedWatermark is the smallest watermark of all keys. */

Review Comment:
   But the general idea of using some ordered data structure should still be applicable here, right?
   
   We could keep the already existing `watermarks` map just as it is, and on top of that have an ordered structure of watermark values, call it `fooBar`. Then, per each `aggregate` invocation:
   
   1. we use the `watermarks` map to look up the old value for the given key,
   2. remove the old value from `fooBar`,
   3. add the new watermark value to `fooBar`.
   
   Giving us `O(n*log n)` overall (`O(log n)` per update). Roughly something like the sketch below:
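
   Just to illustrate the idea, a minimal standalone sketch (using plain `long` timestamps instead of the actual `Watermark` class, and a `TreeMap` as the ordered multiset `fooBar`; not tied to the real `WatermarkAggregator` fields):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

/** Standalone sketch of the Map + ordered structure idea, not the actual aggregator. */
class OrderedWatermarkAggregatorSketch<T> {

    /** Existing structure: last reported watermark timestamp per key. */
    private final Map<T, Long> watermarks = new HashMap<>();

    /** "fooBar": ordered multiset of current timestamps (value = number of keys reporting it). */
    private final TreeMap<Long, Integer> fooBar = new TreeMap<>();

    /** Updates one key and returns the aggregated (minimum) watermark, in O(log n) per call. */
    long aggregate(T key, long newTimestamp) {
        // 1. use the watermarks map to find the old value for the given key
        Long oldTimestamp = watermarks.put(key, newTimestamp);

        // 2. remove the old value from fooBar
        if (oldTimestamp != null) {
            int count = fooBar.get(oldTimestamp);
            if (count == 1) {
                fooBar.remove(oldTimestamp);
            } else {
                fooBar.put(oldTimestamp, count - 1);
            }
        }

        // 3. add the new watermark value to fooBar
        fooBar.merge(newTimestamp, 1, Integer::sum);

        // The aggregated watermark is the smallest timestamp across all keys.
        return fooBar.firstKey();
    }
}
```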



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
