[ https://issues.apache.org/jira/browse/FLINK-32420?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Rui Fan updated FLINK-32420:
----------------------------
    Description: 
The 
[SourceCoordinator.WatermarkAggregator#aggregate|https://github.com/apache/flink/blob/274aa0debffaa57926c474f11e36be753b49cbc5/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java#L644]
 method finds the smallest watermark across all keys and uses it as the
aggregatedWatermark.

However, within one WatermarkAlignment updateInterval cycle, the aggregate
method costs O(n*n) in total, because:
 * Every subtask reports its latest watermark to the SourceCoordinator in each WatermarkAlignment updateInterval cycle, so a cycle contains n reports (one per subtask).
 * For each report, the SourceCoordinator recomputes the smallest watermark over all subtasks, which scans all n keys.
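To make the O(n*n) cost concrete, here is a minimal sketch of the scan-on-every-report pattern described above. It is a simplified illustration, not the actual SourceCoordinator code: the class name NaiveWatermarkAggregator, the plain long timestamps (instead of Flink's Watermark type) and the scannedEntries counter are all hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalLong;

/**
 * Hypothetical, simplified sketch of the scan-on-every-report pattern.
 * Not Flink's actual WatermarkAggregator.
 */
class NaiveWatermarkAggregator<T> {
    // Latest watermark reported by each key (in practice, the subtaskIndex).
    private final Map<T, Long> watermarks = new HashMap<>();
    private long aggregatedWatermark = Long.MIN_VALUE;
    // Counts visited map entries so the O(n) cost per report is observable.
    long scannedEntries = 0;

    /** Records one report and returns the new minimum only if it changed. */
    OptionalLong aggregate(T key, long watermark) {
        watermarks.put(key, watermark);
        long newMinimum = Long.MAX_VALUE;
        // Full scan over all n keys on every single report: O(n) per report.
        for (long wm : watermarks.values()) {
            scannedEntries++;
            newMinimum = Math.min(newMinimum, wm);
        }
        if (newMinimum == aggregatedWatermark) {
            return OptionalLong.empty();
        }
        aggregatedWatermark = newMinimum;
        return OptionalLong.of(newMinimum);
    }
}
```

Once all n keys are present, a cycle in which every key reports once visits n entries per report, i.e. n*n entries per cycle.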

In general, the key is the subtaskIndex, so the number of keys n equals the
parallelism. When the parallelism is high, watermark aggregation performs poorly.
h1. Performance Test:

With a parallelism of 10000 and each subtask reporting 20 watermarks, the
aggregate method takes 18.921 s in total, and almost every round takes about 950 ms.
 * If the watermarkAlignment updateInterval is 1s, the SourceCoordinator spends almost the whole interval aggregating watermarks and is very busy.
 * If it is less than 1s, watermark aggregation will be delayed.
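As a rough consistency check of these numbers, assuming every report scans all n keys once and that each of the 20 rounds costs about the same:

```latex
\begin{aligned}
\text{entries scanned per round} &= n \cdot n = 10^{4} \cdot 10^{4} = 10^{8} \\
\text{time per round} &\approx \frac{18.921\,\text{s}}{20} \approx 946\,\text{ms}
\end{aligned}
```

About 946 ms per round matches the observed ~950 ms and leaves only about 54 ms of a 1 s updateInterval for everything else.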

I have finished a POC for the performance improvement, which reduces the
watermark aggregation time per watermarkAlignment updateInterval cycle from
950 ms to 6 ms.
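The issue does not say how the POC reaches 6 ms. As one possible approach (an assumption, not necessarily the POC's technique), the sketch keeps every key's watermark in a sorted multiset, a java.util.TreeMap from watermark to the number of keys holding it, next to a key-to-watermark HashMap. Each report then costs O(log n) and a full cycle O(n log n) instead of O(n*n). The class name SortedWatermarkAggregator and the long timestamps are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalLong;
import java.util.TreeMap;

/**
 * Hypothetical sketch: a sorted multiset of watermarks makes each report
 * O(log n) instead of O(n). Not necessarily what the POC does.
 */
class SortedWatermarkAggregator<T> {
    // Latest watermark per key (e.g. subtaskIndex).
    private final Map<T, Long> watermarks = new HashMap<>();
    // Sorted multiset: watermark -> number of keys currently at that watermark.
    private final TreeMap<Long, Integer> counts = new TreeMap<>();
    private long aggregatedWatermark = Long.MIN_VALUE;

    /** Records one report and returns the new minimum only if it changed. */
    OptionalLong aggregate(T key, long watermark) {
        Long previous = watermarks.put(key, watermark);
        if (previous != null) {
            // Remove one occurrence of this key's old watermark: O(log n).
            // Returning null from the remapping function deletes the entry.
            counts.computeIfPresent(previous, (wm, c) -> c == 1 ? null : c - 1);
        }
        // Add one occurrence of the new watermark: O(log n).
        counts.merge(watermark, 1, Integer::sum);
        // The smallest watermark is the first key of the sorted map: O(log n).
        long newMinimum = counts.firstKey();
        if (newMinimum == aggregatedWatermark) {
            return OptionalLong.empty();
        }
        aggregatedWatermark = newMinimum;
        return OptionalLong.of(newMinimum);
    }
}
```

A binary heap with lazy deletion would give a similar bound; the TreeMap variant keeps the removal of a key's previous watermark a direct O(log n) operation.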

  was:
The 
[SourceCoordinator.WatermarkAggregator#aggregate|https://github.com/apache/flink/blob/274aa0debffaa57926c474f11e36be753b49cbc5/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java#L644]
 method finds the smallest watermark across all keys and uses it as the
aggregatedWatermark.

However, within one WatermarkAlignment updateInterval cycle, the aggregate
method costs O(n*n) in total, because:
 * Every subtask reports its latest watermark to the SourceCoordinator in each WatermarkAlignment updateInterval cycle, so a cycle contains n reports (one per subtask).
 * For each report, the SourceCoordinator recomputes the smallest watermark over all subtasks, which scans all n keys.

In general, the key is the subtaskIndex, so the number of keys n equals the
parallelism. When the parallelism is high, watermark aggregation performs poorly.
h1. Performance Test:

With a parallelism of 10000 and each subtask reporting 20 watermarks, the
aggregate method takes 18.921 s in total, and almost every round takes about 950 ms.
 * If the watermarkAlignment updateInterval is 1s, the SourceCoordinator spends almost the whole interval aggregating watermarks and is very busy.
 * If it is less than 1s, watermark aggregation will be delayed.

I have finished a POC for the performance improvement, which reduces the
watermark aggregation time per watermarkAlignment updateInterval cycle from
950 ms to 12 ms.


> Watermark aggregation performance is poor when watermark alignment is enabled 
> and parallelism is high
> -----------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-32420
>                 URL: https://issues.apache.org/jira/browse/FLINK-32420
>             Project: Flink
>          Issue Type: Improvement
>          Components: Connectors / Common
>    Affects Versions: 1.17.1
>            Reporter: Rui Fan
>            Assignee: Rui Fan
>            Priority: Major



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
