[
https://issues.apache.org/jira/browse/FLINK-24852?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17442845#comment-17442845
]
Yun Tang commented on FLINK-24852:
----------------------------------
I think the suggestion to use the "previous+1" checkpoint id should be okay
for the changelog case to avoid unexpected deletion.
I think cleaning folders with a different job id is very risky. We once created
another standalone tool outside of Flink itself, which would analyze the
{{_metadata}} and delete files that are not referenced in the {{_metadata}} and
are older than the timestamp of the last successful checkpoint. It might not be
wise for the community to offer such a tool, as a different DFS might, for
unexpected reasons, not obey the rule that still-in-use files have a larger
timestamp than the last completed checkpoint. Moreover, we could also face the
case of a {{_metadata}} with a larger checkpoint id but an older timestamp (maybe
caused by a fixed job id across different runs), which would make the cleaning
logic very complex and prone to deleting files by mistake.
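To make the risk concrete, here is a minimal sketch of the timestamp-based
deletion rule such an external tool relies on (hypothetical helper, not the
actual tool or Flink code); the commented assumption is exactly what an unusual
DFS or a reused job id can violate:
{code:java}
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Set;

// Hypothetical core of the external tool's deletion rule; not Flink code.
public final class TimestampHeuristic {

    public static boolean deletable(
            Path file,
            Set<Path> referencedByLatestMetadata,
            long lastCompletedCheckpointTimestamp) throws IOException {

        boolean referenced = referencedByLatestMetadata.contains(file);
        long lastModified = Files.getLastModifiedTime(file).toMillis();

        // Assumes every still-in-use file is newer than the last completed checkpoint.
        // A DFS that reports unexpected timestamps, or a fixed job id reused across
        // runs (larger checkpoint id but older timestamp), breaks this assumption.
        return !referenced && lastModified < lastCompletedCheckpointTimestamp;
    }
}
{code}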
> Cleanup of Orphaned Incremental State Artifacts
> -----------------------------------------------
>
> Key: FLINK-24852
> URL: https://issues.apache.org/jira/browse/FLINK-24852
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / State Backends
> Affects Versions: 1.14.0
> Reporter: Stephan Ewen
> Priority: Major
>
> Shared State Artifacts (state files in the "shared" folder in the DFS /
> ObjectStore) can become orphaned in various situations:
> * When a TaskManager fails right after it created a state file but before the
> checkpoint was ack-ed to the JobManager, that state file will be orphaned.
> * When the JobManager fails, all state newly added for the currently pending
> checkpoint will be orphaned.
> These state artifacts currently cannot be cleaned up manually, because it isn't
> easily possible to tell whether they are still in use (referenced by any
> checkpoint).
> We should introduce a "garbage collector" that identifies and deletes such
> orphaned state artifacts.
> h2. Idea for a cleanup mechanism
> A cleanup thread would periodically execute a cleanup procedure that searches
> for and deletes orphaned artifacts.
> To identify those artifacts, the cleanup procedure needs the following inputs
> (a sketch of how they might be grouped follows this list):
> * The oldest retained checkpoint ID
> * A snapshot of the shared state registry
> * A way to identify for each state artifact from which checkpoint it was
> created.
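> A minimal sketch of how those inputs might be grouped, using hypothetical names
> (none of these types exist in Flink under these names):
> {code:java}
> import java.util.Set;
> 
> // Hypothetical grouping of the inputs the cleanup procedure needs; not an existing Flink API.
> public interface CleanupInputs {
> 
>     /** ID of the oldest checkpoint that is still retained. */
>     long oldestRetainedCheckpointId();
> 
>     /** Snapshot of the artifact names currently known to the shared state registry. */
>     Set<String> registeredArtifactNames();
> 
>     /** Checkpoint ID the given artifact was created for, e.g. parsed from its file name. */
>     long creatingCheckpointId(String artifactName);
> }
> {code}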
> The cleanup procedure would (see the sketch after this list)
> * enumerate all state artifacts (for example files in the "shared" directory)
> * For each one, check whether it was created earlier than the oldest retained
> checkpoint. If not, that artifact is skipped, because it might come from a
> later pending checkpoint or a later canceled checkpoint.
> * Finally, the procedure checks whether the state artifact is known to the shared
> state registry. If yes, the artifact is kept; if not, it is orphaned and will
> be deleted.
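> A minimal sketch of this pass, assuming plain file enumeration via {{java.nio}},
> hypothetical names, and the {{"<checkpointID>_<UUID>"}} naming scheme proposed
> below (not actual Flink code):
> {code:java}
> import java.io.IOException;
> import java.nio.file.DirectoryStream;
> import java.nio.file.Files;
> import java.nio.file.Path;
> import java.util.Set;
> 
> // Hypothetical sketch of the cleanup pass described above.
> public class SharedStateGarbageCollector {
> 
>     public static void cleanUp(
>             Path sharedDir,
>             long oldestRetainedCheckpointId,
>             Set<String> registeredArtifactNames) throws IOException {
> 
>         try (DirectoryStream<Path> artifacts = Files.newDirectoryStream(sharedDir)) {
>             for (Path artifact : artifacts) {
>                 String name = artifact.getFileName().toString();
> 
>                 // Assumes the "<checkpointID>_<UUID>" naming proposed below.
>                 long creatingCheckpoint = Long.parseLong(name.substring(0, name.indexOf('_')));
> 
>                 // Skip artifacts created at or after the oldest retained checkpoint:
>                 // they might belong to a pending or recently canceled checkpoint.
>                 if (creatingCheckpoint >= oldestRetainedCheckpointId) {
>                     continue;
>                 }
> 
>                 // Older than the oldest retained checkpoint and unknown to the
>                 // shared state registry => orphaned, delete it.
>                 if (!registeredArtifactNames.contains(name)) {
>                     Files.delete(artifact);
>                 }
>             }
>         }
>     }
> }
> {code}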
> Because the cleanup procedure is specific to the checkpoint storage, it
> should probably be instantiated from the checkpoint storage.
> To make it possible to identify the checkpoint for which a state artifact was
> created, we can put that checkpoint ID into the state file name, for example by
> formatting the name as {{"<checkpointID>_<UUID>"}}.
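> A minimal sketch of how such names could be built and parsed (hypothetical
> helper, not existing Flink code):
> {code:java}
> import java.util.UUID;
> 
> // Hypothetical helpers for the proposed "<checkpointID>_<UUID>" naming scheme.
> public final class SharedStateFileNames {
> 
>     /** Builds a state file name that embeds the creating checkpoint ID. */
>     public static String create(long checkpointId) {
>         return checkpointId + "_" + UUID.randomUUID();
>     }
> 
>     /** Extracts the checkpoint ID a state file was created for. */
>     public static long creatingCheckpointId(String fileName) {
>         return Long.parseLong(fileName.substring(0, fileName.indexOf('_')));
>     }
> }
> {code}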
--
This message was sent by Atlassian Jira
(v8.20.1#820001)