[ https://issues.apache.org/jira/browse/FLINK-24852?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17441681#comment-17441681 ]

Roman Khachatryan commented on FLINK-24852:
-------------------------------------------

Thanks Stephan and Yun,

I just would like to note that the checkpoint ID might not be known at the time 
of artifact creation, for example with pre-emptive writes in the changelog 
backend. However, using "previous + 1" should suffice, because even if the next 
checkpoint is aborted, subsumption won't start until *some* checkpoint 
completes - PCIIW.
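
As an illustration, a minimal sketch of that "previous + 1" fallback (the class 
and method names below are made up, not Flink's actual API):

{code:java}
import java.util.UUID;

/** Minimal sketch of the "previous + 1" fallback; not Flink's actual API. */
public final class StateFileNaming {

    private static final long UNKNOWN_CHECKPOINT_ID = -1L;

    /** Builds a "<checkpointId>_<UUID>" file name, guessing the ID if necessary. */
    public static String newStateFileName(long checkpointId, long lastCompletedCheckpointId) {
        long effectiveId = checkpointId != UNKNOWN_CHECKPOINT_ID
                ? checkpointId                   // the ID is already known
                : lastCompletedCheckpointId + 1; // pre-emptive write: assume the next checkpoint
        return effectiveId + "_" + UUID.randomUUID();
    }

    public static void main(String[] args) {
        // Pre-emptive write while checkpoint 41 is the latest completed one.
        System.out.println(newStateFileName(UNKNOWN_CHECKPOINT_ID, 41L)); // e.g. "42_<uuid>"
    }
}
{code}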

 

Should we also support the case where a job crashes and is restarted externally 
(without relying on Flink HA) with a different JobID? The old job's artifacts 
won't be removed by the new JM, or am I missing something?
[~mapohl] was this the case we discussed some time ago?

It looks like embedding checkpoint IDs into file names is enough to infer which 
of them can be deleted, so this could be handled by an external process (the 
code can be shared, so it could also be a JM component). WDYT?
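
For illustration, this is roughly the decision such a process could make, 
assuming the "<checkpointID>_<UUID>" naming proposed in the ticket and a 
snapshot of the registered file names (names below are made up):

{code:java}
import java.util.Set;

/** Sketch of the deletion decision for an external cleanup process (made-up names). */
public final class OrphanDecision {

    /**
     * An artifact may be deleted only if it was created for a checkpoint older than
     * the oldest retained one AND it is not referenced by the shared state registry.
     */
    public static boolean isOrphaned(
            String fileName, long oldestRetainedCheckpointId, Set<String> registeredFileNames) {

        int sep = fileName.indexOf('_');
        if (sep <= 0) {
            return false; // unknown naming scheme: be conservative and keep the file
        }
        final long creatingCheckpointId;
        try {
            creatingCheckpointId = Long.parseLong(fileName.substring(0, sep));
        } catch (NumberFormatException e) {
            return false; // unparsable ID: keep the file
        }
        if (creatingCheckpointId >= oldestRetainedCheckpointId) {
            // Might belong to a pending or recently aborted checkpoint that the
            // registry does not reflect yet.
            return false;
        }
        return !registeredFileNames.contains(fileName); // old enough and unreferenced
    }
}
{code}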

> Cleanup of Orphaned Incremental State Artifacts
> -----------------------------------------------
>
>                 Key: FLINK-24852
>                 URL: https://issues.apache.org/jira/browse/FLINK-24852
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / State Backends
>    Affects Versions: 1.14.0
>            Reporter: Stephan Ewen
>            Priority: Major
>
> Shared State Artifacts (state files in the "shared" folder in the DFS / 
> ObjectStore) can become orphaned in various situations:
> * When a TaskManager fails right after it created a state file but before the 
> checkpoint was ack-ed to the JobManager, that state file will be orphaned.
> * When the JobManager fails, all state newly added for the currently pending 
> checkpoint will be orphaned.
> These state artifacts are currently impossible to clean up manually, because 
> it isn't easily possible to tell whether they are still in use (referenced by 
> any checkpoint).
> We should introduce a "garbage collector" that identifies and deletes such 
> orphaned state artifacts.
> h2. Idea for a cleanup mechanism
> A periodic cleanup thread would periodically execute a cleanup procedure that 
> searches for and deletes the orphaned artifacts.
> To identify those artifacts, the cleanup procedure needs the following inputs:
> * The oldest retained checkpoint ID
> * A snapshot of the shared state registry
> * A way to identify for each state artifact from which checkpoint it was 
> created.
> The cleanup procedure would
> * enumerate all state artifacts (for example files in the "shared" directory)
> * For each one, check whether it was created earlier than the oldest retained 
> checkpoint. If not, the artifact is skipped, because it might come from a 
> later pending or later canceled checkpoint.
> * Finally, the procedure checks whether the state artifact is known to the 
> shared state registry. If yes, the artifact is kept; if not, it is orphaned 
> and will be deleted.
> Because the cleanup procedure is specific to the checkpoint storage, it 
> should probably be instantiated from the checkpoint storage.
> To make it possible to identify the checkpoint for which a state artifact was 
> created, we can put that checkpoint ID into the state file name, for example 
> format the state name as {{"<checkpointID>_<UUID>"}}.
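
Putting the steps above together, a purely illustrative sketch of the periodic 
cleanup (it reuses the isOrphaned check from the earlier sketch and uses the 
local file system for simplicity; a real implementation would be instantiated 
by the checkpoint storage and talk to the DFS / ObjectStore):

{code:java}
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Set;
import java.util.stream.Stream;

/** Illustrative sketch of the periodic cleanup procedure (made-up names). */
public final class SharedStateGarbageCollector {

    /**
     * @param sharedDir                  the "shared" directory of the checkpoint storage
     * @param oldestRetainedCheckpointId the oldest checkpoint ID that is still retained
     * @param registrySnapshot           file names referenced by the shared state registry snapshot
     */
    public static void cleanupOrphans(
            Path sharedDir, long oldestRetainedCheckpointId, Set<String> registrySnapshot)
            throws IOException {

        try (Stream<Path> files = Files.list(sharedDir)) {
            files.forEach(file -> {
                String name = file.getFileName().toString();
                if (OrphanDecision.isOrphaned(name, oldestRetainedCheckpointId, registrySnapshot)) {
                    try {
                        Files.deleteIfExists(file); // orphaned: old enough and unreferenced
                    } catch (IOException e) {
                        // Best effort: a failed delete will be retried on the next run.
                    }
                }
            });
        }
    }
}
{code}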



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
