[
https://issues.apache.org/jira/browse/FLINK-10930?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16712017#comment-16712017
]
Stephan Ewen edited comment on FLINK-10930 at 12/6/18 8:53 PM:
---------------------------------------------------------------
[~yunta] Better exception handling is definitely a good idea. I
would suggest proceeding as follows:
- We work towards a model where checkpoint failures never cause task failures,
but only "decline messages". As far as I know, [~azagrebin] mentioned that
there is some discussion on this already. Could you chime in here, Andrey?
- We fix all cases we find where state is left behind after an exception.
- We introduce an exclusive state cleanup daemon (see below)
- We introduce a shared state cleanup daemon (see below)
h3. Cleaning up exclusive state (idea)
- when the daemon starts a cleanup sweep it determines what the oldest still
retained checkpoint number is
- it enumerates all directories "CHK-XXX" and recursively deletes the ones
older than the oldest retained checkpoint
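To make the idea concrete, here is a minimal sketch of such a sweep in Python. It is not Flink code. It assumes checkpoint directories are named {{chk-<id>}} as in the layout from the issue description; the function name and the {{oldest_retained_id}} parameter are made up for illustration.

```python
import re
import shutil
from pathlib import Path

# Assumption: exclusive directories are named "chk-<id>" (matched case-insensitively).
CHK_DIR = re.compile(r"^chk-(\d+)$", re.IGNORECASE)


def sweep_exclusive_state(job_dir, oldest_retained_id):
    """Recursively delete chk-<id> directories older than the oldest retained checkpoint."""
    removed = []
    for entry in sorted(Path(job_dir).iterdir()):
        match = CHK_DIR.match(entry.name)
        if match is None or not entry.is_dir():
            continue  # shared/, taskowned/ and unrelated entries are left alone
        if int(match.group(1)) < oldest_retained_id:
            shutil.rmtree(entry)
            removed.append(entry.name)
    return removed
```

The caller is expected to determine {{oldest_retained_id}} once, at the start of the sweep, so directories of newer checkpoints (including ones still in progress) are never touched.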
h3. Cleaning up shared state (idea)
- every shared state file needs to encode the checkpoint-id of the checkpoint
during which it was created
- when the daemon does a cleanup sweep, it atomically grabs the number of the
latest completed checkpoint and a snapshot of all shared state handles in the
shared state registry
- the daemon then lists all files in the shared state directory and deletes
all files that are not referenced by a shared state handle and are older
than the latest completed checkpoint
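A rough sketch of that sweep in Python, again not Flink code. The file naming scheme {{<checkpoint-id>-<suffix>}}, the {{ToySharedStateRegistry}} class, and all function names are assumptions made up for illustration; the real registry and handle types would look different.

```python
import threading
from pathlib import Path


class ToySharedStateRegistry:
    """Hypothetical stand-in for the shared state registry (not Flink's class)."""

    def __init__(self):
        self._lock = threading.Lock()
        self._latest_completed_id = 0
        self._referenced = set()

    def complete_checkpoint(self, checkpoint_id, referenced_file_names):
        # Record the new latest checkpoint and the files retained checkpoints reference.
        with self._lock:
            self._latest_completed_id = checkpoint_id
            self._referenced = set(referenced_file_names)

    def snapshot(self):
        # Both values are read under one lock, so they are mutually consistent.
        with self._lock:
            return self._latest_completed_id, frozenset(self._referenced)


def created_in_checkpoint(file_name):
    """Parse the checkpoint id from an assumed '<checkpoint-id>-<suffix>' file name."""
    prefix, _, _ = file_name.partition("-")
    return int(prefix) if prefix.isdigit() else None


def sweep_shared_state(shared_dir, registry):
    """Delete files that are unreferenced and older than the latest completed checkpoint."""
    latest_completed_id, referenced = registry.snapshot()
    removed = []
    for entry in sorted(Path(shared_dir).iterdir()):
        created = created_in_checkpoint(entry.name)
        if not entry.is_file() or created is None or entry.name in referenced:
            continue
        if created < latest_completed_id:
            entry.unlink()
            removed.append(entry.name)
    return removed
```

In this sketch, the checkpoint id in the file name is what keeps the sweep from deleting files that belong to a checkpoint still in progress: such files are not yet referenced in the registry, but their id is not older than the latest completed checkpoint.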
What do you think about that?
> Refactor checkpoint directory layout
> ------------------------------------
>
> Key: FLINK-10930
> URL: https://issues.apache.org/jira/browse/FLINK-10930
> Project: Flink
> Issue Type: Improvement
> Components: State Backends, Checkpointing
> Affects Versions: 1.8.0
> Reporter: Yun Tang
> Assignee: Yun Tang
> Priority: Major
> Fix For: 1.8.0
>
>
> The current checkpoint directory layout was introduced in FLINK-8531 with
> three different scopes for tasks:
> * *EXCLUSIVE* is for state that belongs to one checkpoint only: meta data
> and operator state files.
> * *SHARED* is for state that is possibly part of multiple checkpoints.
> * *TASKOWNED* is for state that must never be dropped by the JobManager.
> {code:java}
> /user-defined-dir/{job-id}
> |
> +-- shared/
> +-- taskowned/
> +-- chk-1/ // metadata and operator-state files
> +-- chk-2/
> ...{code}
> If we retain just one completed checkpoint, we would expect only one
> exclusive directory, the {{chk-id}} checkpoint directory, to be left.
> However, as described in FLINK-10855, the directories of failed/expired
> checkpoints are left behind as well. This is really confusing for users who [use
> an externalized checkpoint to resume a
> job|https://ci.apache.org/projects/flink/flink-docs-release-1.6/ops/state/checkpoints.html#resuming-from-a-retained-checkpoint],
> not to mention the checkpoint directory resource leak.
> As far as I know, as long as the {{chk-id}} checkpoint directory also
> contains operator state files, there is no graceful way to clean up a useless
> {{chk-id}} checkpoint directory. Once the job manager disposes of a
> failed/expired checkpoint, the JM deletes the target {{chk-id}} checkpoint
> directory. However, this directory can be created again by tasks that
> have not yet reported to the JM. When the {{checkpoint coordinator}} receives
> those late reports for the expired checkpoint, it discards the useless
> handles. However, if the JM also tries to delete the empty parent folder,
> which is already unsupported after FLINK-8540, another task uploading operator
> state files would hit an exception because the parent directory of its write
> target has just been removed. Currently, we treat a task checkpoint failure as
> a task failure, and the whole job fails over, which is not what we want.
> Given this, I plan to split the *EXCLUSIVE* directory into two kinds of
> exclusive directories: the {{chk-id}} checkpoint directories remain, but each
> contains only its exclusive {{meta data}}; the other is a single
> directory named {{exclusive}}, which contains the operator state files.
> Since each operator state file is exclusive to one specific checkpoint, we
> could also add the {{checkpoint-id}} to its file name so that users can
> clean up easily.
> The refactored directory layout should be:
> {code:java}
> /user-defined-dir/{job-id}
> |
> +-- shared/
> +-- taskowned/
> +-- exclusive/ // operator state files
> +-- chk-1/ // metadata
> +-- chk-2/
> ...{code}
>
> This new directory layout would not affect users who resume jobs from
> externalized checkpoints, since they still just pass the
> {{/user-defined-dir/job-id/chk-id}} path to resume the job.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)