pnowojski commented on code in PR #22852:
URL: https://github.com/apache/flink/pull/22852#discussion_r1245251239
##########
flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorAlignmentTest.java:
##########
@@ -131,6 +138,101 @@ void testWatermarkAlignmentWithTwoGroups() throws Exception {
}
}
+ @Test
+ @Timeout(5)
+ void testWatermarkAggregatorBenchmark() {
Review Comment:
👍 for benchmarks, but not here. Whatever reasonable timeout you pick here, this
will be a flaky test, as the CI infra can be extremely slow. It would be much
better to add it, in one way or another, to the flink-benchmarks repo.
I could see this benchmarked in one of two ways:
1. Using only the public API via an ITCase-style benchmark, for example by adjusting
`mapSink.F27_UNBOUNDED` to run more subtasks with the smallest possible watermark
update interval. However, it might be hard to achieve large enough parallelism on a
single machine with this kind of test.
2. Using internal Flink classes via a unit-test-style benchmark (just like
yours here). In this case, however, we can not use internal Flink classes in
`flink-benchmarks` directly. The workaround is to create a benchmark class in the
`flink` repo, ala `StreamNetworkThroughputBenchmark`, and then use that class from
the `flink-benchmarks` repo, like `StreamNetworkThroughputBenchmarkExecutor` does.
With option 2, it would actually be nice to benchmark not just the
`#aggregate` function, but the whole code path of processing alignment events
on the JM.
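To illustrate the split in option 2, here is a minimal sketch of the `flink`-repo half of such a benchmark. All names here (`WatermarkAggregationBenchmark`, `setUp`, `executeBenchmark`) are hypothetical, merely mirroring the `StreamNetworkThroughputBenchmark` pattern; the JMH `@Benchmark` wrapper calling into it would live in `flink-benchmarks`, like `StreamNetworkThroughputBenchmarkExecutor`:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch, NOT actual Flink code: the reusable benchmark body that
// would live in the flink repo, so that flink-benchmarks only needs this class.
class WatermarkAggregationBenchmark {
    private final Map<Integer, Long> watermarks = new HashMap<>();
    private int numSubtasks;

    /** Called once from the JMH @Setup method in flink-benchmarks. */
    void setUp(int numSubtasks) {
        this.numSubtasks = numSubtasks;
        watermarks.clear();
    }

    /**
     * One benchmark iteration: simulates a watermark update from every subtask
     * and returns the aggregated minimum (returned so JMH can consume it).
     */
    long executeBenchmark(long baseWatermark) {
        for (int subtask = 0; subtask < numSubtasks; subtask++) {
            watermarks.put(subtask, baseWatermark + subtask);
        }
        return watermarks.values().stream()
                .mapToLong(Long::longValue)
                .min()
                .orElse(Long.MIN_VALUE);
    }
}
```

The point of the pattern is that the JMH dependency and annotations stay in `flink-benchmarks`, while the code under test (and its access to Flink internals) stays in `flink`.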
Also, ideally it would be great to first merge the benchmarks in a
separate PR, and only later merge this PR improving the performance. That way we
could verify the performance improvement in http://codespeed.dak8s.net:8000/
##########
flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java:
##########
@@ -631,8 +631,9 @@ private void ensureStarted() {
}
}
- private static class WatermarkAggregator<T> {
+ static class WatermarkAggregator<T> {
private final Map<T, Watermark> watermarks = new HashMap<>();
+ /** The aggregatedWatermark is the smallest watermark of all keys. */
Review Comment:
But the general idea of using some ordered data structure should still be
applicable here, right?
We could keep the already existing `Map watermarks` just as it is, and on
top of that maintain an ordered structure of watermarks, called `fooBar` here for
the sake of the example. Then, on each `aggregate` invocation:
1. use the `watermarks` map to look up the old value for the given key,
2. remove that old value from `fooBar`,
3. add the new watermark value to `fooBar`.
Each invocation then costs `O(log n)`, giving us `O(n*log n)` overall.
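A minimal sketch of that idea (purely illustrative, not the actual `WatermarkAggregator`; the class and field names are my invention, plain `long` timestamps stand in for Flink's `Watermark`, and a `TreeMap` used as a sorted multiset plays the role of `fooBar`):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Illustrative stand-in for SourceCoordinator.WatermarkAggregator.
class OrderedWatermarkAggregator<T> {
    // The already existing per-key map, kept as it is.
    private final Map<T, Long> watermarks = new HashMap<>();
    // The ordered structure ("fooBar"): maps a watermark value to how many
    // keys currently report it, i.e. a sorted multiset.
    private final TreeMap<Long, Integer> orderedWatermarks = new TreeMap<>();

    /** Updates the watermark for {@code key}; returns the new aggregated (minimum) watermark. */
    long aggregate(T key, long watermark) {
        // 1. look up (and replace) the old value for the given key
        Long old = watermarks.put(key, watermark);
        // 2. remove the old value from the ordered structure
        if (old != null) {
            int count = orderedWatermarks.get(old);
            if (count == 1) {
                orderedWatermarks.remove(old);
            } else {
                orderedWatermarks.put(old, count - 1);
            }
        }
        // 3. add the new value to the ordered structure
        orderedWatermarks.merge(watermark, 1, Integer::sum);
        // The aggregated watermark is now an O(log n) lookup instead of a full scan.
        return orderedWatermarks.firstKey();
    }
}
```

Each `aggregate` call costs `O(log n)` for the `TreeMap` operations, so `n` updates cost `O(n*log n)`, versus the `O(n)` scan per update that a plain min over the map requires.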
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]