[ 
https://issues.apache.org/jira/browse/FLINK-8531?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17320035#comment-17320035
 ] 

fanrui commented on FLINK-8531:
-------------------------------

[~sewen]

For some high-parallelism Flink jobs, when the HDFS NameNode is under pressure, JM 
cleanup of checkpoint files becomes a bottleneck. I have some questions and hope 
they can be answered, thanks.
h1. Idea:

When the CompletedCheckpoint#doDiscard method cleans up the state, it performs 
three steps:
1. Delete the _metadata file
2. Delete all OperatorState files
3. Delete the chk-xxx directory with recursive=false

Question: the _metadata file and most OperatorState files already live inside the 
chk-xxx directory. Could the following changes be made instead:
1. Do not delete the _metadata file individually
2. Do not delete the OperatorState files inside the chk-xxx directory individually
3. Delete the chk-xxx directory with recursive=true

Are there any risks in making the above changes?
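For illustration, the directory-level cleanup proposed above can be sketched with plain {{java.nio.file}} as a stand-in for a single HDFS {{FileSystem#delete(path, recursive=true)}} call (class and method names here are hypothetical for this sketch, not Flink's actual API; {{java.nio.file}} has no recursive flag, so the sketch walks depth-first where HDFS would do it in one NameNode RPC):

{code:java}
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.stream.Stream;

public class CheckpointDirCleanup {

    /**
     * Deletes the whole chk-xxx directory in one recursive pass instead of
     * removing _metadata and each operator state file individually first.
     * On HDFS this would be a single fs.delete(chkDir, true) call.
     */
    static void discardRecursively(Path chkDir) throws IOException {
        if (!Files.exists(chkDir)) {
            return;
        }
        try (Stream<Path> paths = Files.walk(chkDir)) {
            // Reverse lexicographic order deletes children before parents.
            paths.sorted(Comparator.reverseOrder()).forEach(p -> {
                try {
                    Files.delete(p);
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            });
        }
    }

    public static void main(String[] args) throws IOException {
        Path chk = Files.createTempDirectory("chk-00001");
        Files.writeString(chk.resolve("_metadata"), "meta");
        Files.writeString(chk.resolve("op-state-1"), "state");
        discardRecursively(chk);
        System.out.println(Files.exists(chk)); // false
    }
}
{code}

One risk I can see: with incremental checkpoints, any shared state referenced by newer checkpoints must not live under chk-xxx, which is exactly the exclusive/shared/taskowned separation this issue introduces.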
h1. Motivation:

Flink job:
{code:java}
Parallelism = 4000
Checkpoint interval = 10s
{code}
More than 5 operators have state, so 4,000 * 5 = 20,000 files are generated 
every 10s.
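The file-count arithmetic above can be spelled out (a rough estimate using the quoted job parameters, not measured numbers):

{code:java}
public class CheckpointFileRate {
    public static void main(String[] args) {
        int parallelism = 4_000;
        int statefulOperators = 5;
        int checkpointIntervalSec = 10;

        // Each subtask of each stateful operator writes one state file per checkpoint.
        int filesPerCheckpoint = parallelism * statefulOperators;

        // Sustained creation rate the JM-side cleanup has to keep up with.
        long filesPerHour = (long) filesPerCheckpoint * 3600 / checkpointIntervalSec;

        System.out.println(filesPerCheckpoint); // 20000
        System.out.println(filesPerHour);       // 7200000
    }
}
{code}

At that rate, per-file cleanup costs on the order of 20,000 NameNode delete RPCs per checkpoint, versus a single RPC for one recursive directory delete.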

A large number of state files are written into the chk-xxx directory, and only 
the JM cleans them up. When the HDFS NameNode is under pressure, the JM deletes 
files slowly; the cleanup rate cannot keep up with the rate at which new files 
are generated, leaving a large number of stale files in HDFS.

Increasing the value of `state.backend.fs.memory-threshold` can alleviate the 
problem, but it cannot solve it completely. So I hope to clean up at the 
directory level as much as possible, rather than file by file.
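For reference, the mitigation mentioned above controls the size below which state is embedded inline in the checkpoint _metadata file instead of being written as a separate HDFS file (the value shown is illustrative, not a recommendation; the option's maximum is 1 MB):

{code}
# flink-conf.yaml
# State chunks smaller than this threshold are stored inline in the
# _metadata file rather than as separate files on the checkpoint storage.
state.backend.fs.memory-threshold: 1mb
{code}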

> Support separation of "Exclusive", "Shared" and "Task owned" state
> ------------------------------------------------------------------
>
>                 Key: FLINK-8531
>                 URL: https://issues.apache.org/jira/browse/FLINK-8531
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Runtime / State Backends
>            Reporter: Stephan Ewen
>            Assignee: Stephan Ewen
>            Priority: Major
>             Fix For: 1.5.0
>
>
> Currently, all state created at a certain checkpoint goes into the directory 
> {{chk-id}}.
> With incremental checkpointing, some state is shared across checkpoints and is 
> referenced by newer checkpoints. That way, old {{chk-id}} directories stay 
> around, containing some shared chunks. That makes it hard for both users and 
> cleanup hooks to determine when a {{chk-x}} directory could be deleted.
> The same holds for state that can only ever be dropped by certain operators 
> on the TaskManager, never by the JobManager / CheckpointCoordinator. Examples 
> of such state are write-ahead logs, which need to be retained until the move 
> to the target system is complete, which may in some cases be later than when 
> the checkpoint that created them is disposed.
> I propose to introduce different scopes for tasks:
>   - **EXCLUSIVE** is for state that belongs to one checkpoint only
>   - **SHARED** is for state that is possibly part of multiple checkpoints
>   - **TASKOWNED** is for state that must never be dropped by the JobManager.
> For file based checkpoint targets, I propose that we have the following 
> directory layout:
> {code}
> /user-defined-checkpoint-dir
>     |
>     + --shared/
>     + --taskowned/
>     + --chk-00001/
>     + --chk-00002/
>     + --chk-00003/
>     ...
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
