[ https://issues.apache.org/jira/browse/FLINK-32420?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17742366#comment-17742366 ]
Rui Fan commented on FLINK-32420:
---------------------------------

Merged via: 354a8852766b16873b5fad972e4440c1eaa4c40a (master: 1.18)

> Watermark aggregation performance is poor when watermark alignment is enabled
> and parallelism is high
> -----------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-32420
>                 URL: https://issues.apache.org/jira/browse/FLINK-32420
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Connectors / Common
>    Affects Versions: 1.17.1
>            Reporter: Rui Fan
>            Assignee: Rui Fan
>            Priority: Major
>              Labels: pull-request-available
>
> The
> [SourceCoordinator.WatermarkAggregator#aggregate|https://github.com/apache/flink/blob/274aa0debffaa57926c474f11e36be753b49cbc5/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java#L644]
> method finds the smallest watermark across all keys and uses it as the
> aggregatedWatermark.
> However, the time complexity of the aggregate method within one WatermarkAlignment
> updateInterval cycle is O(n*n), where n is the number of keys, because:
> * Every subtask reports its latest watermark to SourceCoordinator once per
> WatermarkAlignment updateInterval cycle (n reports)
> * SourceCoordinator rescans all subtasks to recompute the smallest watermark on
> each report (O(n) per report)
> In general, the key is the subtaskIndex, so the number of keys equals the parallelism.
> When the parallelism is high, watermark aggregation performance is poor.
> h1. Performance Test:
> With a parallelism of 10000 and each subtask reporting 20 watermarks, the
> aggregate method takes 18.921s in total. Almost every round takes 950 ms.
> * If the watermarkAlignment updateInterval is 1s, SourceCoordinator will be
> very busy.
> * If it is less than 1s, watermark aggregation will be delayed.
> I have finished a POC for the performance improvement, which reduces watermark
> aggregation time per watermarkAlignment updateInterval cycle from 950 ms to 6
> ms.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
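To make the O(n*n) argument concrete, here is a minimal Java sketch contrasting a rescan-on-every-report aggregator with an incremental one. It is an illustration under stated assumptions, not Flink's code: the class names `WatermarkAggregationSketch`, `NaiveAggregator`, and `IncrementalAggregator` are hypothetical, watermarks are modeled as plain `long` values keyed by subtask index, and the `TreeMap`-as-multiset technique is one possible way to get O(log n) per report. It is not necessarily what commit 354a885 does.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

/**
 * Hypothetical sketch (not Flink's SourceCoordinator API): aggregate the
 * minimum watermark across subtasks, keyed by subtask index.
 */
public class WatermarkAggregationSketch {

    /** Rescans every key on each report: O(n) per report, O(n*n) per cycle. */
    static final class NaiveAggregator {
        private final Map<Integer, Long> watermarks = new HashMap<>();

        long aggregate(int subtaskIndex, long watermark) {
            watermarks.put(subtaskIndex, watermark);
            long min = Long.MAX_VALUE;
            for (long w : watermarks.values()) {
                min = Math.min(min, w);
            }
            return min;
        }
    }

    /** Keeps a sorted multiset of current watermarks: O(log n) per report. */
    static final class IncrementalAggregator {
        private final Map<Integer, Long> watermarks = new HashMap<>();
        // watermark value -> number of subtasks whose latest watermark is that value
        private final TreeMap<Long, Integer> counts = new TreeMap<>();

        long aggregate(int subtaskIndex, long watermark) {
            Long previous = watermarks.put(subtaskIndex, watermark);
            if (previous != null) {
                // Drop this subtask's old value from the multiset.
                int c = counts.get(previous);
                if (c == 1) {
                    counts.remove(previous);
                } else {
                    counts.put(previous, c - 1);
                }
            }
            counts.merge(watermark, 1, Integer::sum);
            // Smallest key is the aggregated (minimum) watermark.
            return counts.firstKey();
        }
    }

    public static void main(String[] args) {
        int parallelism = 1_000; // kept small so the naive version runs quickly
        NaiveAggregator naive = new NaiveAggregator();
        IncrementalAggregator incremental = new IncrementalAggregator();
        // One updateInterval cycle: every subtask reports once.
        for (int i = 0; i < parallelism; i++) {
            long wm = 1_000L + (i % 97);
            long a = naive.aggregate(i, wm);
            long b = incremental.aggregate(i, wm);
            if (a != b) {
                throw new AssertionError("mismatch at subtask " + i);
            }
        }
        System.out.println("aggregated watermark: " + incremental.aggregate(0, 5_000L));
    }
}
```

Both aggregators return the same minimum; the difference is that the naive one touches all n entries on every report, so one cycle of n reports costs O(n*n), while the incremental one only updates two map entries and reads the first key. A min-heap with lazy deletion would be another reasonable choice; the `TreeMap` counts map is used here because removing a subtask's previous value is direct.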