[ https://issues.apache.org/jira/browse/FLINK-8531?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17320035#comment-17320035 ]
fanrui commented on FLINK-8531:
-------------------------------

[~sewen] For some high-parallelism Flink jobs, JM file cleanup becomes a bottleneck when the HDFS NameNode is under pressure. I have some questions and hope they can be answered, thanks.

h1. Idea:

When the CompletedCheckpoint#doDiscard method cleans up the state, there are three steps:
1. Clean up the _metadata file
2. Clean up all OperatorState files
3. Clean up the chk-xxx directory, with recursive=false

Question: The _metadata file and many OperatorState files live inside the chk-xxx directory. Could the following changes be made instead?
1. Do not clean up the _metadata file individually
2. Do not clean up the OperatorState files in the chk-xxx directory individually
3. Delete the chk-xxx directory, with recursive=true

Are there any risks in making the above changes?

h1. Motivation:

Flink job:
{code:java}
Parallelism = 4000
Checkpoint interval = 10s
{code}

More than 5 operators contain state, so 4,000 * 5 = 20,000 files are generated every 10s. A large number of state files are written to the chk-xxx directory, and only the JM cleans up these files. When the HDFS NameNode is under pressure, the JM cleans up files slowly, and the cleanup speed cannot keep up with the rate at which files are generated, leaving a large number of files behind in HDFS. Increasing the value of `state.backend.fs.memory-threshold` can alleviate the problem, but it cannot fully solve it. So I hope cleanup can happen at the directory level as much as possible, not at the file level.

> Support separation of "Exclusive", "Shared" and "Task owned" state
> ------------------------------------------------------------------
>
> Key: FLINK-8531
> URL: https://issues.apache.org/jira/browse/FLINK-8531
> Project: Flink
> Issue Type: Sub-task
> Components: Runtime / State Backends
> Reporter: Stephan Ewen
> Assignee: Stephan Ewen
> Priority: Major
> Fix For: 1.5.0
>
>
> Currently, all state created at a certain checkpoint goes into the directory
> {{chk-id}}.
> With incremental checkpointing, some state is shared across checkpoints and is
> referenced by newer checkpoints. That way, old {{chk-id}} directories stay
> around, containing some shared chunks. That makes it hard for both users and
> cleanup hooks to determine when a {{chk-x}} directory can be deleted.
> The same holds for state that can only ever be dropped by certain operators
> on the TaskManager, never by the JobManager / CheckpointCoordinator. Examples
> of that state are write-ahead logs, which need to be retained until the move
> to the target system is complete, which may in some cases be later than when
> the checkpoint that created them is disposed.
> I propose to introduce different scopes for state:
> - **EXCLUSIVE** is for state that belongs to one checkpoint only
> - **SHARED** is for state that is possibly part of multiple checkpoints
> - **TASKOWNED** is for state that must never be dropped by the JobManager.
> For file-based checkpoint targets, I propose that we have the following
> directory layout:
> {code}
> /user-defined-checkpoint-dir
>     |
>     + --shared/
>     + --taskowned/
>     + --chk-00001/
>     + --chk-00002/
>     + --chk-00003/
>     ...
> {code}

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
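The directory-level cleanup proposed in the comment above can be sketched as follows. This is a minimal local-filesystem stand-in (the class and method names are hypothetical, not Flink's actual implementation): it deletes an entire chk-xxx directory in one pass instead of issuing one delete per state file. On HDFS, the corresponding single call would be Hadoop's {{FileSystem#delete(path, true)}}, which is one NameNode operation regardless of how many files the directory contains, which is exactly why the comment expects it to relieve NameNode pressure.

{code:java}
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.stream.Stream;

public class RecursiveDiscardSketch {

    // Hypothetical stand-in for the proposed CompletedCheckpoint#doDiscard
    // behavior: instead of deleting _metadata and each OperatorState file
    // one by one, remove the whole chk-xxx directory recursively.
    static void discardRecursively(Path chkDir) throws IOException {
        try (Stream<Path> paths = Files.walk(chkDir)) {
            // Reverse lexicographic order visits children before their
            // parent directory, mimicking what a recursive delete does.
            paths.sorted(Comparator.reverseOrder())
                 .forEach(p -> {
                     try {
                         Files.delete(p);
                     } catch (IOException e) {
                         throw new RuntimeException(e);
                     }
                 });
        }
    }

    public static void main(String[] args) throws IOException {
        // Build a fake chk-42 directory: _metadata plus some operator state files.
        Path chkDir = Files.createTempDirectory("chk-42");
        Files.createFile(chkDir.resolve("_metadata"));
        for (int i = 0; i < 5; i++) {
            Files.createFile(chkDir.resolve("operator-state-" + i));
        }

        // One directory-level discard instead of seven per-file deletes.
        discardRecursively(chkDir);
        System.out.println("deleted: " + !Files.exists(chkDir));
    }
}
{code}

Note the sketch still walks the files locally; the saving on HDFS comes from the fact that the recursive delete is a single RPC to the NameNode, while per-file cleanup at parallelism 4000 issues tens of thousands of RPCs per checkpoint interval.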