[ https://issues.apache.org/jira/browse/FLINK-21436?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17299267#comment-17299267 ]

fanrui commented on FLINK-21436:
--------------------------------

[~yunta] Thanks for your comments. 

I have read FLINK-18203 and the mailing list in detail.

I have a question about FLINK-18203: it only reduces the number of 
`OperatorStreamStateHandle` objects in heap memory, but it does not reduce the 
Akka serialization overhead, since n * m `ByteStreamStateHandle`s still need to 
be serialized. I'm curious: is serialization really not a bottleneck? The flame 
graph shows that serialization also consumes a lot of CPU. I hope FLINK-18203 
keeps moving forward; once it is completed, we can check whether the restore 
duration has improved significantly.

Regarding the mailing list, I feel that removing UnionListState may take a long 
time. Until it is removed, I hope the community will pay attention to the 
restore duration of UnionListState. After all, restore in current Flink 
versions is very slow: it takes more than 2 minutes at parallelism=2000.
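For intuition, the quadratic blow-up behind these numbers can be sketched as 
follows (a minimal sketch; `restoreReads` is a hypothetical helper of mine, not 
Flink code):

```java
// Sketch: why union-state restore cost grows quadratically with parallelism.
// At checkpoint time each of the p subtasks writes one state file; on restore,
// union redistribution hands every subtask the full union, so every subtask
// must read all p files.
public class UnionRestoreCost {

    // Total number of small-file reads issued against HDFS during restore.
    static long restoreReads(long parallelism) {
        return parallelism * parallelism;
    }

    public static void main(String[] args) {
        // parallelism=2000 -> 4,000,000 small HDFS reads
        System.out.println(restoreReads(2000));
    }
}
```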

> Speed up the restore of UnionListState
> ----------------------------------------
>
>                 Key: FLINK-21436
>                 URL: https://issues.apache.org/jira/browse/FLINK-21436
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Checkpointing, Runtime / State Backends
>    Affects Versions: 1.13.0
>            Reporter: fanrui
>            Priority: Major
>         Attachments: JM 启动火焰图.svg, akka timeout Exception.png
>
>
> h1. 1. Problem introduction and cause analysis
> Problem description: restoring UnionListState with high parallelism takes 
> more than 2 minutes.
> h2. Cause:
> During a checkpoint, 2000 subtasks write 2000 files, and during restore each 
> subtask needs to read all 2000 files.
>  2000 * 2000 = 4 million, so 4 million small files must be read from HDFS 
> during restore. HDFS becomes the bottleneck, making restore particularly 
> time-consuming.
> h1. 2. Optimization idea
> Under normal circumstances, the UnionListState state is relatively small; a 
> typical usage scenario is Kafka offset information.
>  When restoring, the JM can directly read all 2000 small files, merge the 
> UnionListState into a byte array, and send it to all TMs, so the TMs avoid 
> frequent access to HDFS.
> h1. 3. Benefits after optimization
> Before optimization: at parallelism 2000, Kafka offset restore takes 90~130 s.
>  After optimization: at parallelism 2000, Kafka offset restore takes less 
> than 1 s.
> h1. 4. Risk points
> A UnionListState that is too big puts too much pressure on the JM.
> Solution 1:
>  Add a configuration option that decides whether to enable this feature. The 
> default is false, which means the old behavior is used. When the user sets it 
> to true, the JM performs the merge.
> Solution 2:
> The above configuration is not required, which is equivalent to enabling the 
> merge by default.
> The JM checks the size of the state before merging. If it is below a 
> threshold, the state is considered relatively small, and it is sent to all 
> TMs through a ByteStreamStateHandle.
> If it exceeds the threshold, the state is considered large. In that case the 
> JM writes a single HDFS file and sends a FileStateHandle to all TMs, and each 
> TM reads that file.
>  
> Note: Most scenarios where Flink uses UnionListState are Kafka offsets 
> (small state), so in theory most jobs are risk-free.
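The threshold-based dispatch in Solution 2 could be sketched roughly like this 
(hypothetical names such as `InMemoryHandle`, `FileHandle`, and `dispatch` are 
mine, not the actual Flink API; a local temp file stands in for HDFS):

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Sketch of Solution 2: the JM merges the union state once, then picks how
// the merged bytes travel to the TMs based on their size.
public class MergedUnionStateDispatch {

    interface StateHandle {}

    // Small state: bytes travel inline with the RPC (ByteStreamStateHandle-like).
    record InMemoryHandle(byte[] bytes) implements StateHandle {}

    // Large state: bytes go to one shared file (FileStateHandle-like).
    record FileHandle(Path path) implements StateHandle {}

    static StateHandle dispatch(byte[] merged, long thresholdBytes, Path spillDir)
            throws IOException {
        if (merged.length <= thresholdBytes) {
            // Send the merged bytes directly to all TMs.
            return new InMemoryHandle(merged);
        }
        // Write ONE file instead of p files; every TM reads this single file.
        Path file = Files.createTempFile(spillDir, "union-state-", ".bin");
        Files.write(file, merged);
        return new FileHandle(file);
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("spill");
        byte[] small = new byte[64];
        byte[] big = new byte[1 << 20];
        System.out.println(dispatch(small, 1024, dir) instanceof InMemoryHandle); // true
        System.out.println(dispatch(big, 1024, dir) instanceof FileHandle);       // true
    }
}
```

Even in the large-state case, the TMs read one shared file rather than p files 
each, so the HDFS read count drops from p * p to p.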



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
