Rui Fan created FLINK-32420:
-------------------------------
Summary: Watermark aggregation performance is poor when watermark
alignment is enabled and parallelism is high
Key: FLINK-32420
URL: https://issues.apache.org/jira/browse/FLINK-32420
Project: Flink
Issue Type: Improvement
Components: Connectors / Common
Affects Versions: 1.17.1
Reporter: Rui Fan
Assignee: Rui Fan
The
[SourceCoordinator.WatermarkAggregator#aggregate|https://github.com/apache/flink/blob/274aa0debffaa57926c474f11e36be753b49cbc5/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java#L644]
method finds the smallest watermark across all keys and uses it as the
aggregatedWatermark.
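The simplified model below is for illustration only. The class and method names are hypothetical and do not mirror Flink's actual code. The aggregator keeps the latest watermark per key and finds the minimum by scanning every entry, so each call to aggregate() costs O(n).

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical, simplified model of a watermark aggregator (not Flink's
 * SourceCoordinator code): one entry per key, minimum found by a full scan.
 */
class NaiveWatermarkAggregator {
    // key (e.g. subtask index) -> latest watermark reported for that key
    private final Map<Integer, Long> latestWatermarks = new HashMap<>();

    /** Records the latest watermark reported by one subtask. */
    public void report(int subtaskIndex, long watermark) {
        latestWatermarks.put(subtaskIndex, watermark);
    }

    /** Returns the smallest watermark over all keys (Long.MAX_VALUE if empty); O(n) per call. */
    public long aggregate() {
        long min = Long.MAX_VALUE;
        for (long watermark : latestWatermarks.values()) {
            min = Math.min(min, watermark);
        }
        return min;
    }

    public static void main(String[] args) {
        NaiveWatermarkAggregator aggregator = new NaiveWatermarkAggregator();
        aggregator.report(0, 1_000L);
        aggregator.report(1, 800L);
        aggregator.report(2, 1_200L);
        System.out.println(aggregator.aggregate()); // prints 800
    }
}
```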
However, the total time complexity of the aggregate method within one
WatermarkAlignment updateInterval cycle is O(n*n), because:
* Every subtask reports its latest watermark to the SourceCoordinator in each
WatermarkAlignment updateInterval cycle
* For each report, the SourceCoordinator recomputes the smallest watermark by
scanning the watermarks of all subtasks
In general, the key is the subtaskIndex, so the number of keys n equals the
parallelism: n reports per cycle, each scanning n keys. When the parallelism is
high, watermark aggregation performance is therefore poor.
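A rough cost model, assuming each of the n subtasks reports once per cycle and every report triggers a full scan of all n keys:

```latex
\text{cost per cycle} = \underbrace{n}_{\text{reports}} \times \underbrace{O(n)}_{\text{scan per report}} = O(n^2),
\qquad n = 10000 \;\Rightarrow\; n^2 = 10^{8} \text{ watermark comparisons per cycle}
```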
h1. Performance Test:
With a parallelism of 10000 and each subtask reporting 20 watermarks, the
aggregate method takes 18.921 s in total. Almost every round takes about 950 ms.
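The per-round figure follows from dividing the total time by the number of rounds. This assumes one round per reported watermark, i.e. 20 rounds:

```latex
\frac{18.921\,\text{s}}{20\ \text{rounds}} \approx 0.946\,\text{s} \approx 950\,\text{ms per round}
```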
* If the watermarkAlignment updateInterval is 1 s, the SourceCoordinator spends
almost the whole interval aggregating and is very busy.
* If it is less than 1 s, watermark aggregation will be delayed.
I have finished a POC for the performance improvement, which reduces the
watermark aggregation time per watermarkAlignment updateInterval cycle from
950 ms to 12 ms.
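One way to avoid the full scan is sketched below as an illustration only. It is not necessarily how the POC works, and the class and method names are hypothetical. The sketch keeps a sorted count of current watermarks next to the per-subtask map. Each report then costs O(log n) tree operations, and the minimum is read from the first key instead of scanning all n subtasks.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

/**
 * Hypothetical aggregator (not Flink code): a sorted multiset of the current
 * watermarks makes the minimum available without scanning every key.
 */
class SortedWatermarkAggregator {
    // subtask index -> latest watermark reported by that subtask
    private final Map<Integer, Long> latestWatermarks = new HashMap<>();
    // watermark value -> number of subtasks currently at that watermark
    private final TreeMap<Long, Integer> watermarkCounts = new TreeMap<>();

    /** Replaces the subtask's previous watermark (if any) and returns the new minimum. */
    public long reportAndAggregate(int subtaskIndex, long watermark) {
        Long previous = latestWatermarks.put(subtaskIndex, watermark);
        if (previous != null) {
            // Decrement the count for the old value; remove it once no subtask holds it.
            watermarkCounts.computeIfPresent(previous, (k, count) -> count == 1 ? null : count - 1);
        }
        watermarkCounts.merge(watermark, 1, Integer::sum);
        // The smallest key in the sorted map is the aggregated (minimum) watermark.
        return watermarkCounts.firstKey();
    }

    public static void main(String[] args) {
        SortedWatermarkAggregator aggregator = new SortedWatermarkAggregator();
        aggregator.reportAndAggregate(0, 1_000L);
        aggregator.reportAndAggregate(1, 800L);
        System.out.println(aggregator.reportAndAggregate(1, 1_500L)); // prints 1000
    }
}
```

The count per watermark value matters because several subtasks can report the same watermark. Removing a value outright when one of them advances would wrongly raise the minimum.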