[ https://issues.apache.org/jira/browse/FLINK-21436?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17288821#comment-17288821 ]

fanrui commented on FLINK-21436:
--------------------------------

[~yunta] Thanks for your comments.

I tried increasing the value of `state.backend.fs.memory-threshold`, but the effect 
was not good: the JM comes under heavy pressure when the job starts.

The JM needs to distribute 2000 ByteStreamStateHandles to each subtask, which puts 
extreme pressure on the JM's CPU and GC.

Merging the StreamStateHandles reduces both the state size and the number of 
objects, and lowers the JM's serialization overhead.
For example:
In the Kafka offset scenario there are 2000 ByteStreamStateHandles, each about 
2100 bytes, of which 2056 bytes are metadata and 44 bytes are real state data. 
The total state size is 2100 bytes * 2000 ≈ 4 MB.

Merging the StreamStateHandles saves state size because the 2056 bytes of metadata 
only need to be stored once, together with the 2000 pieces of real state data 
(44 bytes each). The total state size becomes 2056 bytes + 44 bytes * 2000 ≈ 90 KB, 
and there is only one ByteStreamStateHandle object, so the number of objects the 
JM needs to serialize drops from O(n^2) to O(n).

Both the state size and the number of objects have been drastically reduced.
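
As a sanity check, here is the arithmetic above as a tiny self-contained Java 
snippet (the 2056/44-byte split is the measurement quoted above; the class and 
variable names are only for illustration):

```java
// Back-of-the-envelope size comparison for 2000 subtasks' Kafka offsets.
public class UnionListStateSizeEstimate {

    public static void main(String[] args) {
        int subtasks = 2000;        // parallelism
        int metadataBytes = 2056;   // per-handle metadata observed for the Kafka offset case
        int stateBytes = 44;        // real offset data per subtask

        // Before: one ByteStreamStateHandle per subtask, each carrying its own metadata.
        long before = (long) subtasks * (metadataBytes + stateBytes);
        // After: a single merged handle, metadata stored once.
        long after = metadataBytes + (long) subtasks * stateBytes;

        System.out.printf("before merge: %d bytes (~%.1f MB)%n", before, before / (1024.0 * 1024.0));
        System.out.printf("after merge:  %d bytes (~%.1f KB)%n", after, after / 1024.0);
    }
}
```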

 

> Speed up the restore of UnionListState
> ----------------------------------------
>
>                 Key: FLINK-21436
>                 URL: https://issues.apache.org/jira/browse/FLINK-21436
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Checkpointing, Runtime / State Backends
>    Affects Versions: 1.13.0
>            Reporter: fanrui
>            Priority: Major
>
> h1. 1. Problem introduction and cause analysis
> Problem description: with high parallelism, restoring UnionListState takes more 
> than 2 minutes.
> h2. The reason:
> During a checkpoint, 2000 subtasks write 2000 files, and during restore each 
> subtask needs to read all 2000 files.
>  2000*2000 = 4 million, so 4 million small file reads hit HDFS during restore. 
> HDFS becomes the bottleneck, making the restore particularly time-consuming.
> h1. 2. Optimization idea
> Under normal circumstances the UnionListState is relatively small. A typical 
> usage scenario is Kafka offset information.
>  When restoring, the JM can read all 2000 small files itself, merge the 
> UnionListState into one byte array, and send it to all TMs, so that the TMs do 
> not have to access HDFS at all.
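> A minimal sketch of this JM-side merge (hypothetical helper names, not Flink's 
> actual restore code): the JM reads every subtask's small state file exactly 
> once, concatenates the bytes, and remembers each subtask's (offset, length) 
> range so a single merged handle can be broadcast to all TMs.
> {code:java}
> import java.io.ByteArrayOutputStream;
> import java.io.IOException;
> import java.nio.file.Files;
> import java.nio.file.Path;
> import java.util.ArrayList;
> import java.util.List;
>
> class MergedUnionState {
>     final byte[] mergedBytes;   // one buffer instead of 2000 small files
>     final List<long[]> ranges;  // per-subtask {offset, length} into mergedBytes
>
>     MergedUnionState(byte[] mergedBytes, List<long[]> ranges) {
>         this.mergedBytes = mergedBytes;
>         this.ranges = ranges;
>     }
>
>     static MergedUnionState mergeOnJobManager(List<Path> subtaskStateFiles) throws IOException {
>         ByteArrayOutputStream merged = new ByteArrayOutputStream();
>         List<long[]> ranges = new ArrayList<>();
>         for (Path file : subtaskStateFiles) {
>             byte[] data = Files.readAllBytes(file);  // the JM reads each small file exactly once
>             ranges.add(new long[] {merged.size(), data.length});
>             merged.write(data);
>         }
>         return new MergedUnionState(merged.toByteArray(), ranges);
>     }
> }
> {code}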
> h1. 3. Benefits after optimization
> Before optimization: with a parallelism of 2000, restoring the Kafka offsets 
> takes 90~130 s.
>  After optimization: with a parallelism of 2000, restoring the Kafka offsets 
> takes less than 1 s.
> h1. 4. Risk points
> A UnionListState that is too large puts too much pressure on the JM.
> Solution 1:
>  Add a configuration option that decides whether to enable this feature. The 
> default is false, which means the old behavior is used. When the user sets it 
> to true, the JM will merge.
> Solution 2:
> No configuration is required, which is equivalent to enabling the merge by 
> default.
> The JM checks the size of the state before merging; if it is below a threshold, 
> the state is considered relatively small and is sent to all TMs through a 
> ByteStreamStateHandle.
> If the threshold is exceeded, the state is considered large; in that case the 
> JM writes a single HDFS file and sends a FileStateHandle to all TMs, which then 
> read that file.
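> A rough sketch of Solution 2's threshold check (hypothetical names and 
> threshold value, not an actual Flink API): below the threshold the merged bytes 
> are shipped in memory; above it, a single file is written and only a reference 
> to it is shipped.
> {code:java}
> import java.io.IOException;
> import java.nio.file.Files;
> import java.nio.file.Path;
>
> class MergedStateDistributor {
>
>     static final long THRESHOLD_BYTES = 1 << 20; // e.g. 1 MB; would be configurable
>
>     /** Returns either the merged bytes themselves or the path of a single file holding them. */
>     static Object chooseHandle(byte[] mergedState, Path checkpointDir) throws IOException {
>         if (mergedState.length <= THRESHOLD_BYTES) {
>             // Small state: ship the bytes directly to all TMs (ByteStreamStateHandle-style).
>             return mergedState;
>         }
>         // Large state: write one file for all TMs and ship only a reference (FileStateHandle-style).
>         Path file = Files.createTempFile(checkpointDir, "union-state-", ".bin");
>         Files.write(file, mergedState);
>         return file;
>     }
> }
> {code}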
>  
> Note: in most scenarios Flink uses UnionListState for Kafka offsets (small 
> state), so in theory most jobs are risk-free.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
