[ 
https://issues.apache.org/jira/browse/FLINK-21436?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17288319#comment-17288319
 ] 

fanrui commented on FLINK-21436:
--------------------------------

After online verification:
The parallelism of Source is set to 2000, and Kafka's topic has 2000 
partitions. After UnionListState is merged, the state size is 97 KB.

> Speed ​​up the restore of UnionListState
> ----------------------------------------
>
>                 Key: FLINK-21436
>                 URL: https://issues.apache.org/jira/browse/FLINK-21436
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Checkpointing, Runtime / State Backends
>    Affects Versions: 1.13.0
>            Reporter: fanrui
>            Priority: Major
>
> h1. 1. Problem introduction and cause analysis
> Problem description: The duration of UnionListState restore under large 
> concurrency is more than 2 minutes.
> h2. the reason:
> 2000 subtasks write 2000 files during checkpoint, and each subtask needs to 
> read 2000 files during restore.
> 2000*2000 = 4 million, so 4 million small files need to be read to hdfs 
> during restore. HDFS has become a bottleneck, causing restore to be 
> particularly time-consuming.
> h1. 2. Optimize ideas
> Under normal circumstances, the UnionListState state is relatively small. 
> Typical usage scenario: Kafka offset information.
> When restoring, JM can directly read all 2000 small files, merge 
> UnionListState into a byte array and send it to all TMs to avoid frequent 
> access to hdfs by TMs.
> h1. 3. Benefits after optimization
> Before optimization: 2000 concurrent, Kafka offset restore takes 90~130 s.
> After optimization: 2000 concurrent, Kafka offset restore takes less than 1s.
> h1.  4. Risk points
> Too big UnionListState leads to too much pressure on JM.
> Solution 1:
> Add configuration and decide whether to enable this feature. The default is 
> false, which means the old plan is used. When the user is set to true, JM 
> will merge.
> Solution 2:
> The above configuration is not required, which is equivalent to being enabled 
> by default. However, JM detects the size of the state before merge, and does 
> not merge if it exceeds the threshold. The user can control the threshold 
> size.
> Note: Most of the scenarios where Flink uses UnionListState are Kafka offset 
> (small state). In theory, most jobs are risk-free.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to