[ https://issues.apache.org/jira/browse/FLINK-10930?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16713903#comment-16713903 ]
Yun Tang commented on FLINK-10930: ---------------------------------- [~StephanEwen] The ideas of cleaning up exclusive and shared state look very very similar with our solution, in which we already encoded checkpoint-id on state files. * For cleaning up the exclusive state, to delete more eagerly, we remove the ones, which older than the *latest* completed checkpoint and also not contained in current retained checkpoints. * For cleaning up shared state, we met more complex situation due to we introduce a mechanism which writes the state into files with the same name across different checkpoints (to reduce the number of files created when each checkpoint created and the files each old checkpoint subsumed), the encoded checkpoint-id means the first checkpoint id when this file created. Although a bit more complex, the rule to clean up shared state just behaves exactly the same as yours: deletes all files that are neither referenced by a shared state handle and are older than the latest completed checkpoint. To cleanup more eagerly, we also have option to let the procedure, list and find files could be deleted, happens before any tasks running. Last but not least, "lists all files in the shared state directory" needs to introduce new {{listStatusIterator}} API, current {{listStatus}} API might cause JM to crash and expensive for DFS. > Refactor checkpoint directory layout > ------------------------------------ > > Key: FLINK-10930 > URL: https://issues.apache.org/jira/browse/FLINK-10930 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing > Affects Versions: 1.8.0 > Reporter: Yun Tang > Assignee: Yun Tang > Priority: Major > Fix For: 1.8.0 > > > The current checkpoint directory layout is introduced from FLINK-8531 with > three different scopes for tasks: > * *EXCLUSIVE* is for state that belongs to one checkpoint only, meta data > and operator state files. > * *SHARED* is for state that is possibly part of multiple checkpoints > * *TASKOWNED* is for state that must never by dropped by the jobManager. > {code:java} > /user-defined-dir/{job-id} > | > +-- shared/ > +-- taskowned/ > +-- chk-1/ // metadata and operator-state files > +-- chk-2/ > ...{code} > If we just retain one complete checkpoint, the expected exclusive directory, > which is the {{chk-id}} checkpoint directory, should only be one left. > However, as FLINK-10855 interpreted, the failed/expired checkpoint > directories would also be left. This is really confusing for users who [uses > externalized checkpoint to resume > job|https://ci.apache.org/projects/flink/flink-docs-release-1.6/ops/state/checkpoints.html#resuming-from-a-retained-checkpoint], > not to mention the checkpoint directory resource leak. > As far as I could know, if the {{chk-id}} checkpoint directory still > contains the operator state files, I have no idea how to clean the useless > {{chk-id}} checkpoint directory gracefully. Once job manager dispose the > failed/expired checkpoint, the target {{chk-id}} checkpoint directory would > be deleted by JM. However, this directory would also be create by tasks who > having not reported to JM. When {{checkpoint coordinator}} received those > late expired tasks, it would discard those useless handles. However, if JM > also plans to delete the empty parent folder, which is already unsupported > after FLINK-8540, another task uploading operator state files would meet > exception due to its writing target's parent directory has just been removed. > Currently, we handle task checkpoint failure as task failure and the whole > job would failover which is not we want. > From what I see, I plan to separate *EXCLUSIVE* directory into two kind of > exclusive directories, one is still several {{chk-id}} checkpoint directories > but only contains its exclusive {{meta data}}, the other is just one > directory named {{exclusive}} which containing the operator state files. > Operator state files are exclusive to just one specified checkpoint, we could > also add {{checkpoint-id}} within their file name to let users easily clean > up. > The refactored directory layout should be : > {code:java} > /user-defined-dir/{job-id} > | > +-- shared/ > +-- taskowned/ > +-- exclusive/ // operator state files > +-- chk-1/ // metadata > +-- chk-2/ > ...{code} > > This new directory layout would not affect users who use external checkpoint > to resume jobs, since they still just give > {{/user-defined-dir/job-id/chk-id}} path to resume job. -- This message was sent by Atlassian JIRA (v7.6.3#76005)