[ https://issues.apache.org/jira/browse/FLINK-21436?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17288807#comment-17288807 ]
Yun Tang commented on FLINK-21436:
----------------------------------
Since {{state.backend.fs.memory-threshold}} was increased to 20k in
FLINK-17865, I wonder why there are so many files to read, as operator union
list state should generally be very small.
I also think the current optimization, which merges them within the JM, looks
rather hacky.
From what I know, union list state is under discussion for optimization or
even deprecation; maybe [~guoweima] could share more related info.
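The threshold argument above can be illustrated with a simplified model. This is an assumption-laden sketch, not Flink code: it treats any per-subtask state chunk at or below {{state.backend.fs.memory-threshold}} as inlined into the checkpoint metadata (no separate file), and assumes union restore makes every subtask read every file-backed chunk. The class and method names are hypothetical.

```java
import java.util.Arrays;

// Simplified model (an assumption, not Flink code): a state chunk at or below the
// memory threshold is inlined into the checkpoint metadata, so restoring it needs
// no separate file read; anything larger is treated as a standalone file.
final class InlineThresholdModel {

    // Hypothetical helper: total number of standalone file reads when every subtask
    // restores the full union of all subtasks' state.
    static long filesToRead(long[] subtaskStateBytes, long thresholdBytes) {
        long fileBacked = Arrays.stream(subtaskStateBytes)
                .filter(size -> size > thresholdBytes)
                .count();
        // Union redistribution: each of the N subtasks reads every file-backed chunk.
        return (long) subtaskStateBytes.length * fileBacked;
    }

    public static void main(String[] args) {
        long threshold = 20 * 1024;       // 20 KB, the value cited above
        long[] small = new long[2000];
        Arrays.fill(small, 1024);         // e.g. ~1 KB of Kafka offsets per subtask
        long[] large = new long[2000];
        Arrays.fill(large, 64 * 1024);    // 64 KB per subtask, above the threshold
        System.out.println(filesToRead(small, threshold)); // 0
        System.out.println(filesToRead(large, threshold)); // 4000000
    }
}
```

Under this model, 4 million reads only appear when each subtask's union state exceeds the threshold, which is why the question about actual state sizes matters.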
> Speed up the restore of UnionListState
> ----------------------------------------
>
> Key: FLINK-21436
> URL: https://issues.apache.org/jira/browse/FLINK-21436
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / Checkpointing, Runtime / State Backends
> Affects Versions: 1.13.0
> Reporter: fanrui
> Priority: Major
>
> h1. 1. Problem introduction and cause analysis
> Problem description: Restoring UnionListState under high parallelism takes
> more than 2 minutes.
> h2. The reason
> 2000 subtasks write 2000 files during a checkpoint, and during restore each
> subtask needs to read all 2000 files.
> 2000 * 2000 = 4 million, so 4 million small files have to be read from HDFS
> during restore. HDFS becomes the bottleneck, which makes the restore
> particularly slow.
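The quadratic file count can be sketched by contrasting union redistribution with the even-split redistribution that Flink uses for plain operator ListState. The model is deliberately simplified and its assumptions are not from the issue: each subtask owns exactly one state handle after the checkpoint, the job restores with unchanged parallelism, and one handle equals one file read. Names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Simplified model (assumption): subtask i owns exactly one state handle i, and the
// job restores with the same parallelism. One handle corresponds to one file read.
final class RedistributionModel {

    // Union redistribution: every subtask receives the handles of all subtasks.
    static List<List<Integer>> union(int parallelism) {
        List<Integer> all = new ArrayList<>(parallelism);
        for (int h = 0; h < parallelism; h++) {
            all.add(h);
        }
        List<Integer> shared = Collections.unmodifiableList(all); // same list for everyone
        List<List<Integer>> assignment = new ArrayList<>(parallelism);
        for (int subtask = 0; subtask < parallelism; subtask++) {
            assignment.add(shared);
        }
        return assignment;
    }

    // Even-split redistribution, in this model: with unchanged parallelism each
    // subtask gets back only its own handle.
    static List<List<Integer>> evenSplit(int parallelism) {
        List<List<Integer>> assignment = new ArrayList<>(parallelism);
        for (int subtask = 0; subtask < parallelism; subtask++) {
            assignment.add(Collections.singletonList(subtask));
        }
        return assignment;
    }

    // Total number of handle (file) reads across all subtasks.
    static long totalReads(List<List<Integer>> assignment) {
        return assignment.stream().mapToLong(List::size).sum();
    }

    public static void main(String[] args) {
        System.out.println(totalReads(union(2000)));     // 4000000
        System.out.println(totalReads(evenSplit(2000))); // 2000
    }
}
```

The union case grows as N * N while the even-split case grows as N, which matches the 2000 * 2000 arithmetic above.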
> h1. 2. Optimization idea
> Under normal circumstances, UnionListState is relatively small; a typical
> use case is storing Kafka offsets.
> During restore, the JM can read all 2000 small files directly, merge the
> UnionListState into a single byte array, and send it to all TMs, so that the
> TMs no longer have to access HDFS so frequently.
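A minimal sketch of the merge step, assuming a simple length-prefixed framing of our own choosing (the issue does not specify a byte layout): the JM packs the per-subtask blobs into one byte array, and each TM splits it back apart. The class and method names are hypothetical and are not Flink APIs; reading the 2000 files on the JM is replaced here by in-memory byte arrays.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: the JM concatenates per-subtask union state blobs into one
// length-prefixed byte array; every TM decodes that single array instead of reading
// N files. Layout (an assumption): [count][len0][bytes0][len1][bytes1]...
final class UnionStateMerger {

    // JM side: pack all blobs into one array.
    static byte[] merge(List<byte[]> perSubtaskState) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buffer)) {
            out.writeInt(perSubtaskState.size());
            for (byte[] blob : perSubtaskState) {
                out.writeInt(blob.length);
                out.write(blob);
            }
        }
        return buffer.toByteArray();
    }

    // TM side: split the merged array back into the original per-subtask blobs.
    static List<byte[]> split(byte[] merged) throws IOException {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(merged))) {
            int count = in.readInt();
            List<byte[]> blobs = new ArrayList<>(count);
            for (int i = 0; i < count; i++) {
                byte[] blob = new byte[in.readInt()];
                in.readFully(blob);
                blobs.add(blob);
            }
            return blobs;
        }
    }

    public static void main(String[] args) throws IOException {
        List<byte[]> offsets = List.of(
                "topic-0:42".getBytes(StandardCharsets.UTF_8),
                "topic-1:17".getBytes(StandardCharsets.UTF_8));
        byte[] merged = merge(offsets);          // done once on the JM
        List<byte[]> restored = split(merged);   // done on each TM
        System.out.println(new String(restored.get(1), StandardCharsets.UTF_8)); // topic-1:17
    }
}
```

The length prefixes let a TM recover the per-subtask boundaries without any extra metadata, which keeps the merged payload a plain byte array that could travel inside a single handle.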
> h1. 3. Benefits of the optimization
> Before optimization: with a parallelism of 2000, restoring Kafka offsets
> takes 90 to 130 s.
> After optimization: with a parallelism of 2000, restoring Kafka offsets
> takes less than 1 s.
> h1. 4. Risks
> An overly large UnionListState puts too much pressure on the JM.
> Solution 1:
> Add a configuration option that decides whether this feature is enabled. The
> default is false, meaning the existing restore path is used; when the user
> sets it to true, the JM merges the state.
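Solution 1 amounts to an opt-in switch that defaults to the old behavior. The sketch below uses a plain {{Map}} as the configuration source; the option key is invented for illustration and is not an existing Flink option. In Flink itself such a switch would presumably be declared through the {{ConfigOptions}} builder instead.

```java
import java.util.Map;

// Hypothetical sketch of Solution 1. The option key below is invented for
// illustration; it is not an existing Flink configuration option.
final class UnionStateRestoreConfig {

    static final String MERGE_ON_JM_KEY = "state.union-list.merge-on-jm"; // hypothetical key
    static final boolean MERGE_ON_JM_DEFAULT = false; // off by default: old restore path

    // Returns true only if the user explicitly enabled JM-side merging.
    static boolean mergeOnJm(Map<String, String> conf) {
        String value = conf.get(MERGE_ON_JM_KEY);
        return value == null ? MERGE_ON_JM_DEFAULT : Boolean.parseBoolean(value.trim());
    }

    public static void main(String[] args) {
        System.out.println(mergeOnJm(Map.of()));                        // false: TMs read files as before
        System.out.println(mergeOnJm(Map.of(MERGE_ON_JM_KEY, "true"))); // true: JM merges the state
    }
}
```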
> Solution 2:
> No configuration option is needed, which is equivalent to enabling merging
> by default.
> Before merging, the JM checks the size of the state. If it is below a
> threshold, the state is considered small and is sent to all TMs through a
> ByteStreamStateHandle.
> If it exceeds the threshold, the state is considered large: the JM writes it
> to a single HDFS file and sends a FileStateHandle to all TMs, which then read
> that file.
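The size check in Solution 2 can be sketched as follows. {{InlineHandle}} and {{FileHandle}} are hypothetical stand-ins for Flink's ByteStreamStateHandle and FileStateHandle, not those classes, and a local temporary file replaces the HDFS file. The issue does not say what happens at exactly the threshold; this sketch inlines only when the size is strictly below it.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical sketch of Solution 2. InlineHandle and FileHandle merely stand in for
// Flink's ByteStreamStateHandle and FileStateHandle; they are not those classes.
final class MergedStateDispatcher {

    interface Handle {
        byte[] read() throws IOException;
    }

    // Small merged state: shipped to every TM inside the handle itself.
    static final class InlineHandle implements Handle {
        final byte[] data;
        InlineHandle(byte[] data) { this.data = data; }
        public byte[] read() { return data; }
    }

    // Large merged state: the JM writes one file; TMs receive only its path.
    static final class FileHandle implements Handle {
        final Path path;
        FileHandle(Path path) { this.path = path; }
        public byte[] read() throws IOException { return Files.readAllBytes(path); }
    }

    // Decide how the merged union state reaches the TMs, based on its size.
    static Handle dispatch(byte[] merged, long thresholdBytes, Path dir) throws IOException {
        if (merged.length < thresholdBytes) {
            return new InlineHandle(merged);
        }
        // A local temp file stands in for the single HDFS file in this sketch.
        Path file = Files.createTempFile(dir, "union-state-", ".bin");
        Files.write(file, merged);
        return new FileHandle(file);
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("union-state");
        Handle small = dispatch(new byte[1024], 20 * 1024, dir);
        Handle large = dispatch(new byte[64 * 1024], 20 * 1024, dir);
        System.out.println(small instanceof InlineHandle); // true
        System.out.println(large instanceof FileHandle);   // true
        System.out.println(large.read().length);           // 65536
    }
}
```

Either way, every TM receives one handle and performs at most one read, instead of one read per upstream subtask.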
>
> Note: Most scenarios in which Flink uses UnionListState involve Kafka
> offsets (small state), so in theory most jobs carry no risk.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)