[ https://issues.apache.org/jira/browse/FLINK-32420?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Rui Fan updated FLINK-32420:
----------------------------
Parent: FLINK-32524
Issue Type: Sub-task (was: Improvement)
> Watermark aggregation performance is poor when watermark alignment is enabled and parallelism is high
> -----------------------------------------------------------------------------------------------------
>
> Key: FLINK-32420
> URL: https://issues.apache.org/jira/browse/FLINK-32420
> Project: Flink
> Issue Type: Sub-task
> Components: Connectors / Common
> Affects Versions: 1.17.1
> Reporter: Rui Fan
> Assignee: Rui Fan
> Priority: Major
> Labels: pull-request-available
>
> The
> [SourceCoordinator.WatermarkAggregator#aggregate|https://github.com/apache/flink/blob/274aa0debffaa57926c474f11e36be753b49cbc5/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java#L644]
> method finds the smallest watermark across all keys and uses it as the
> aggregatedWatermark.
> However, the time complexity of the aggregate method within one WatermarkAlignment
> updateInterval cycle is O(n*n), because:
> * Every subtask reports its latest watermark to the SourceCoordinator in each
> WatermarkAlignment updateInterval cycle (n reports per cycle).
> * On each report, the SourceCoordinator recomputes the smallest watermark
> across all subtasks (a scan over n keys).
> In general, the key is the subtaskIndex, so the number of keys equals the parallelism.
> When the parallelism is high, watermark aggregation performance is therefore
> poor.
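A minimal sketch of the naive pattern described above, assuming the aggregator keeps the latest watermark per subtask index in a `HashMap` and rescans every entry on each report. The class `NaiveWatermarkAggregator` and its methods are hypothetical names for illustration; this is not the actual `SourceCoordinator.WatermarkAggregator` code.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical sketch of the naive aggregation pattern: every report
 * triggers a full scan over all keys (subtask indices).
 * Not the actual Flink SourceCoordinator implementation.
 */
public class NaiveWatermarkAggregator {
    // Latest reported watermark per key (here: the subtask index).
    private final Map<Integer, Long> watermarks = new HashMap<>();
    // Number of map entries visited so far, to make the cost visible.
    private long visitedEntries = 0;

    /** Records a report and returns the new minimum; costs O(n) per call. */
    public long aggregate(int subtaskIndex, long watermark) {
        watermarks.put(subtaskIndex, watermark);
        long min = Long.MAX_VALUE;
        for (long w : watermarks.values()) {
            visitedEntries++;
            min = Math.min(min, w);
        }
        return min;
    }

    public long visitedEntries() {
        return visitedEntries;
    }

    public static void main(String[] args) {
        int parallelism = 1000;
        NaiveWatermarkAggregator agg = new NaiveWatermarkAggregator();
        // First cycle: every subtask reports once, populating the map.
        for (int i = 0; i < parallelism; i++) {
            agg.aggregate(i, 100L + i);
        }
        long before = agg.visitedEntries();
        // Second cycle: all n keys exist, so each of the n reports scans n entries.
        for (int i = 0; i < parallelism; i++) {
            agg.aggregate(i, 200L + i);
        }
        System.out.println(agg.visitedEntries() - before); // 1000000
    }
}
```

With a parallelism of 1000, `main` prints 1000000: once every key is present, one cycle of n reports visits n * n map entries.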
> h1. Performance Test:
> With a parallelism of 10000 and each subtask reporting 20 watermarks, the
> aggregate method takes 18.921 s in total; almost every round takes about 950 ms.
> * If the watermarkAlignment updateInterval is 1 s, the SourceCoordinator spends
> about 950 ms of every interval on aggregation, so it is very busy.
> * If it is less than 1 s, watermark aggregation cannot keep up and will be delayed.
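A back-of-the-envelope check of these numbers, assuming (as stated in the description) that each of the n = 10000 reports in a round triggers a scan over all n keys:

```latex
\begin{aligned}
\text{key visits per round} &= n \cdot n = 10^{4} \cdot 10^{4} = 10^{8} \\
\text{total time} &\approx 20 \times 950\,\text{ms} = 19\,\text{s} \approx 18.921\,\text{s} \\
\text{time per key visit} &\approx 950\,\text{ms} / 10^{8} = 9.5\,\text{ns}
\end{aligned}
```

The 20 rounds at roughly 950 ms each account for the measured 18.921 s, and about 9.5 ns per visited key is in the range one would expect for iterating boxed values in a hash map.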
> I have finished a POC for the performance improvement; it reduces the watermark
> aggregation time per watermarkAlignment updateInterval cycle from 950 ms to 6 ms.
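The issue does not say how the POC achieves the speedup. One common way to avoid the per-report full scan, assumed here purely for illustration, is to keep each key's latest watermark in a sorted `TreeSet` alongside a key-to-entry map: each report then removes the key's old entry and inserts the new one in O(log n), and the minimum is the set's first element. `OrderedWatermarkAggregator` is a hypothetical name, and the POC may use a different structure.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeSet;

/**
 * Hypothetical sketch: keep watermarks in a sorted set so each report
 * costs O(log n) instead of O(n). Not necessarily what the POC does.
 */
public class OrderedWatermarkAggregator {
    /** A (watermark, key) pair ordered by watermark, ties broken by key. */
    private static final class Entry implements Comparable<Entry> {
        final long watermark;
        final int key;

        Entry(long watermark, int key) {
            this.watermark = watermark;
            this.key = key;
        }

        @Override
        public int compareTo(Entry other) {
            int byWatermark = Long.compare(watermark, other.watermark);
            return byWatermark != 0 ? byWatermark : Integer.compare(key, other.key);
        }
    }

    // Latest entry per key, so its old entry can be found and removed from the set.
    private final Map<Integer, Entry> latest = new HashMap<>();
    // All current entries in sorted order; first() holds the minimum watermark.
    private final TreeSet<Entry> ordered = new TreeSet<>();

    /** Records a report and returns the new minimum in O(log n). */
    public long aggregate(int key, long watermark) {
        Entry updated = new Entry(watermark, key);
        Entry previous = latest.put(key, updated);
        if (previous != null) {
            ordered.remove(previous);
        }
        ordered.add(updated);
        return ordered.first().watermark;
    }

    public static void main(String[] args) {
        OrderedWatermarkAggregator agg = new OrderedWatermarkAggregator();
        agg.aggregate(0, 100L);
        agg.aggregate(1, 50L);
        // Subtask 1 advances past subtask 0, so the minimum becomes 100.
        System.out.println(agg.aggregate(1, 150L)); // 100
    }
}
```

Ties are broken by key so that two subtasks reporting the same watermark remain distinct entries in the `TreeSet`; without that, adding the second one would be silently ignored.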
--
This message was sent by Atlassian Jira
(v8.20.10#820010)