[ https://issues.apache.org/jira/browse/FLINK-21436?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17288319#comment-17288319 ]
fanrui commented on FLINK-21436:
--------------------------------

After online verification: with the Source parallelism set to 2000 and a Kafka topic with 2000 partitions, the merged UnionListState is 97 KB.

> Speed up the restore of UnionListState
> --------------------------------------
>
>                 Key: FLINK-21436
>                 URL: https://issues.apache.org/jira/browse/FLINK-21436
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Checkpointing, Runtime / State Backends
>    Affects Versions: 1.13.0
>            Reporter: fanrui
>            Priority: Major
>
> h1. 1. Problem introduction and cause analysis
> Problem description: Restoring UnionListState at high parallelism takes
> more than 2 minutes.
> h2. Cause
> 2000 subtasks write 2000 files during a checkpoint, and each subtask needs to
> read all 2000 files during restore.
> 2000 * 2000 = 4 million, so 4 million small-file reads are issued against HDFS
> during restore. HDFS becomes the bottleneck, which makes the restore
> particularly slow.
> h1. 2. Optimization idea
> Under normal circumstances, UnionListState is relatively small.
> Typical usage scenario: Kafka offset information.
> When restoring, the JM can read all 2000 small files itself, merge the
> UnionListState into a single byte array, and send it to all TMs, so that the
> TMs do not each have to access HDFS repeatedly.
> h1. 3. Benefits after optimization
> Before optimization: at parallelism 2000, Kafka offset restore takes 90~130 s.
> After optimization: at parallelism 2000, Kafka offset restore takes less than 1 s.
> h1. 4. Risk points
> A UnionListState that is too large puts too much pressure on the JM.
> Solution 1:
> Add a configuration option that decides whether this feature is enabled. The
> default is false, which keeps the old behavior. When the user sets it to
> true, the JM performs the merge.
> Solution 2:
> No on/off option is needed, so the feature is effectively enabled by default.
> However, the JM checks the size of the state before merging and does not
> merge if it exceeds a threshold. The user can configure the threshold.
> Note: In most scenarios where Flink uses UnionListState, the state holds Kafka
> offsets (small state). In theory, most jobs are risk-free.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
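
For illustration only, the read-count arithmetic and the size check from Solution 2 in the quoted description could be sketched as below. This is a minimal sketch, not Flink code: the class `UnionStateMergeSketch`, its methods, the 1 MiB threshold, and the 50-byte per-subtask size are all hypothetical, and plain concatenation stands in for whatever merge format a real implementation would need.

```java
import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch; none of these names are Flink APIs. */
public class UnionStateMergeSketch {

    /** File reads issued when every restoring subtask reads every checkpoint file. */
    public static long readsWithoutMerge(int writerSubtasks, int readerSubtasks) {
        return (long) writerSubtasks * readerSubtasks;
    }

    /**
     * Solution 2 (assumed shape): merge on the JM only if the combined size of the
     * per-subtask state stays within a user-configured threshold.
     */
    public static boolean shouldMerge(List<byte[]> partialStates, long maxMergedBytes) {
        long total = 0;
        for (byte[] part : partialStates) {
            total += part.length;
            if (total > maxMergedBytes) {
                return false; // too large: fall back to the old per-TM restore path
            }
        }
        return true;
    }

    /** Simplification: concatenates the partial states into one byte array. */
    public static byte[] merge(List<byte[]> partialStates) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        for (byte[] part : partialStates) {
            out.write(part, 0, part.length);
        }
        return out.toByteArray();
    }

    public static void main(String[] args) {
        // 2000 writers and 2000 readers, as in the issue description.
        System.out.println("reads without merge: " + readsWithoutMerge(2000, 2000));

        // Assumed: each subtask contributes 50 bytes of offset state.
        List<byte[]> parts = new ArrayList<>();
        for (int i = 0; i < 2000; i++) {
            parts.add(new byte[50]);
        }

        long thresholdBytes = 1L << 20; // hypothetical 1 MiB threshold
        if (shouldMerge(parts, thresholdBytes)) {
            System.out.println("merged size in bytes: " + merge(parts).length);
        } else {
            System.out.println("state too large, not merging on the JM");
        }
    }
}
```

With the merge, the JM reads each of the 2000 files once, instead of every TM subtask reading all of them.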