[ https://issues.apache.org/jira/browse/FLINK-21436?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
fanrui closed FLINK-21436.
--------------------------
Resolution: Invalid
> Speed up the restore of UnionListState
> ----------------------------------------
>
> Key: FLINK-21436
> URL: https://issues.apache.org/jira/browse/FLINK-21436
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / State Backends
> Affects Versions: 1.13.0
> Reporter: fanrui
> Priority: Minor
> Labels: auto-deprioritized-major
> Attachments: JM 启动火焰图.svg (JM startup flame graph), akka timeout Exception.png
>
>
> h1. 1. Problem description and cause analysis
> Problem description: under high parallelism, restoring UnionListState takes
> more than 2 minutes.
> h2. Cause
> With a parallelism of 2000, the 2000 subtasks write 2000 files during a
> checkpoint, and on restore each subtask has to read all 2000 of them.
> That is 2000 * 2000 = 4 million small-file reads against HDFS per restore.
> HDFS becomes the bottleneck, which makes the restore particularly slow.
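The read amplification above can be checked with a short model. This is a simplified sketch, assuming each subtask writes exactly one state file and, on restore, every subtask reads every file (union redistribution); the class and method names are hypothetical and are not Flink APIs.

```java
/** Simplified, hypothetical model of HDFS file reads during a UnionListState restore. */
final class UnionRestoreCost {

    /** Current behavior: each of the n subtasks reads all n files written at checkpoint time. */
    static long readsWithoutMerge(int parallelism) {
        return (long) parallelism * parallelism;
    }

    /** Proposed behavior: the JM reads each of the n files once and ships the merged bytes to the TMs. */
    static long readsWithJmMerge(int parallelism) {
        return parallelism;
    }

    public static void main(String[] args) {
        int parallelism = 2000;
        System.out.println("without merge: " + readsWithoutMerge(parallelism)); // 4000000
        System.out.println("with JM merge: " + readsWithJmMerge(parallelism));  // 2000
    }
}
```

The reads grow quadratically with parallelism in the current scheme and linearly once the JM does the reading.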
> h1. 2. Optimization idea
> In typical use, UnionListState is relatively small; the most common case is
> Kafka offset information.
> During restore, the JM can read all 2000 small files itself, merge the
> UnionListState into a single byte array, and send it to every TM. This
> avoids the TMs repeatedly accessing HDFS.
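A minimal sketch of the JM-side merge, assuming each subtask's UnionListState has already been read as a list of serialized elements. The class name and the framing (an int element count followed by length-prefixed entries) are hypothetical illustrations; Flink's real operator-state serialization format differs. Every TM decodes the same buffer and gets the full union.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical JM-side merge of UnionListState partitions into one byte array. */
final class UnionStateMerger {

    /** Writes an int element count, then each element of every subtask as (int length, bytes). */
    static byte[] merge(List<List<byte[]>> perSubtaskElements) {
        int total = 0;
        for (List<byte[]> elements : perSubtaskElements) {
            total += elements.size();
        }
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(total);
            for (List<byte[]> elements : perSubtaskElements) {
                for (byte[] element : elements) {
                    out.writeInt(element.length);
                    out.write(element);
                }
            }
        } catch (IOException e) {
            // ByteArrayOutputStream does not throw, but DataOutputStream declares IOException.
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** TM side: decodes the merged buffer back into the full union of elements. */
    static List<byte[]> split(byte[] merged) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(merged))) {
            int count = in.readInt();
            List<byte[]> elements = new ArrayList<>(count);
            for (int i = 0; i < count; i++) {
                byte[] element = new byte[in.readInt()];
                in.readFully(element);
                elements.add(element);
            }
            return elements;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // Two subtasks, each holding one Kafka-offset-like entry (illustrative strings only).
        List<List<byte[]>> perSubtask = List.of(
                List.of("topic-0:42".getBytes(StandardCharsets.UTF_8)),
                List.of("topic-1:17".getBytes(StandardCharsets.UTF_8)));
        byte[] merged = merge(perSubtask);
        for (byte[] element : split(merged)) {
            System.out.println(new String(element, StandardCharsets.UTF_8));
        }
    }
}
```

Because the JM reads each file once and the TMs receive bytes rather than file handles, HDFS sees n reads instead of n * n.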
> h1. 3. Benefits
> Before optimization: with a parallelism of 2000, restoring Kafka offsets takes 90 to 130 s.
> After optimization: with a parallelism of 2000, restoring Kafka offsets takes less than 1 s.
> h1. 4. Risks
> An overly large UnionListState puts too much pressure on the JM.
> Solution 1:
> Add a configuration option that controls whether this feature is enabled.
> It defaults to false, which keeps the existing behavior. When the user sets
> it to true, the JM performs the merge.
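Solution 1 amounts to an opt-in switch. The sketch below is plain Java so it runs without Flink; the option key, class, and enum names are hypothetical. Inside Flink such a switch would more likely be declared as a ConfigOption, e.g. via ConfigOptions.key(...).booleanType().defaultValue(false).

```java
import java.util.Map;

/** Hypothetical Solution 1: an opt-in flag that selects the restore path (default false). */
final class UnionRestoreMode {

    // Hypothetical key; not an existing Flink configuration option.
    static final String MERGE_ON_JM_KEY = "state.union-list-state.merge-on-jm";

    enum Path { TM_READS_ALL_FILES, JM_MERGES_AND_SENDS }

    /** Absent or "false" keeps the existing behavior; only an explicit "true" enables the JM merge. */
    static Path choosePath(Map<String, String> config) {
        boolean mergeOnJm = Boolean.parseBoolean(config.getOrDefault(MERGE_ON_JM_KEY, "false"));
        return mergeOnJm ? Path.JM_MERGES_AND_SENDS : Path.TM_READS_ALL_FILES;
    }

    public static void main(String[] args) {
        System.out.println(choosePath(Map.of()));                        // TM_READS_ALL_FILES
        System.out.println(choosePath(Map.of(MERGE_ON_JM_KEY, "true"))); // JM_MERGES_AND_SENDS
    }
}
```

Defaulting to false means existing jobs keep the old restore path unless a user explicitly opts in.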
> Solution 2:
> No configuration option is needed; this is equivalent to enabling the merge
> by default.
> Before merging, the JM checks the size of the state. If it is below a
> threshold, the state is considered small and is sent to all TMs inline via a
> ByteStreamStateHandle.
> If it exceeds the threshold, the state is considered large. In that case the
> JM writes the merged state to a single HDFS file and sends a FileStateHandle
> to all TMs, each of which reads that one file.
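Solution 2 can be sketched as a size check on the merged bytes. InlineHandle and FileHandle below are simplified, hypothetical stand-ins for Flink's ByteStreamStateHandle and FileStateHandle; the threshold value is an assumption (the proposal does not specify one), and writeToDfs is an assumed helper that writes one DFS file and returns its path.

```java
import java.util.function.Function;

/** Hypothetical Solution 2: always merge on the JM, then pick how to ship the result by size. */
final class MergedStateShipping {

    interface Handle {}

    /** Small state: the bytes travel inside the handle itself (the ByteStreamStateHandle role). */
    static final class InlineHandle implements Handle {
        final byte[] data;
        InlineHandle(byte[] data) { this.data = data; }
    }

    /** Large state: only a path travels; each TM reads that one file (the FileStateHandle role). */
    static final class FileHandle implements Handle {
        final String path;
        FileHandle(String path) { this.path = path; }
    }

    /**
     * @param merged         the JM-merged UnionListState bytes
     * @param thresholdBytes hypothetical size limit for sending state inline
     * @param writeToDfs     assumed helper: writes the bytes to one DFS file and returns its path
     */
    static Handle chooseHandle(byte[] merged, long thresholdBytes, Function<byte[], String> writeToDfs) {
        if (merged.length < thresholdBytes) {
            return new InlineHandle(merged);
        }
        return new FileHandle(writeToDfs.apply(merged));
    }

    public static void main(String[] args) {
        // Placeholder writer: does not touch any file system, just returns a fixed path.
        Function<byte[], String> fakeWriter = bytes -> "hdfs:///tmp/merged-union-state";
        Handle small = chooseHandle(new byte[100], 1024, fakeWriter);
        Handle large = chooseHandle(new byte[4096], 1024, fakeWriter);
        System.out.println(small.getClass().getSimpleName()); // InlineHandle
        System.out.println(large.getClass().getSimpleName()); // FileHandle
    }
}
```

Either way the JM performs the merge; the threshold only decides whether the TMs receive the bytes directly or each read one shared file instead of 2000.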
>
> Note: In most Flink jobs that use UnionListState, the state holds Kafka
> offsets (small state), so in theory most jobs are not exposed to this risk.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)