Stephan Ewen created FLINK-24852:
------------------------------------

             Summary: Cleanup of Orphaned Incremental State Artifacts
                 Key: FLINK-24852
                 URL: https://issues.apache.org/jira/browse/FLINK-24852
             Project: Flink
          Issue Type: Improvement
          Components: Runtime / State Backends
    Affects Versions: 1.14.0
            Reporter: Stephan Ewen


Shared State Artifacts (state files in the "shared" folder in the DFS / 
ObjectStore) can become orphaned in various situations:

* When a TaskManager fails after it has created a state file but before the 
checkpoint has been acknowledged to the JobManager, that state file is orphaned.
* When the JobManager fails, all state newly added for the currently pending 
checkpoint is orphaned.

These state artifacts currently cannot be cleaned up manually, because there is 
no easy way to tell whether they are still in use (that is, referenced by any 
checkpoint).

We should introduce a "garbage collector" that identifies and deletes such 
orphaned state artifacts.

h2. Idea for a cleanup mechanism

A cleanup thread would periodically execute a cleanup procedure that searches 
for and deletes the orphaned artifacts.
To identify those artifacts, the cleanup procedure needs the following inputs:

* The oldest retained checkpoint ID
* A snapshot of the shared state registry
* A way to identify, for each state artifact, the checkpoint for which it was 
created.
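As a rough sketch of the periodic trigger, the first two inputs could be captured in a snapshot object at the start of each run and handed to the procedure on a dedicated scheduler thread. The names {{PeriodicOrphanCleaner}} and {{CleanupInputs}} are hypothetical, not existing Flink classes, and only {{java.util.concurrent}} is used; the third input (mapping an artifact to its checkpoint) is left to the procedure itself.

```java
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;

/** Hypothetical sketch, not an existing Flink class. */
public class PeriodicOrphanCleaner implements AutoCloseable {

    /** The first two inputs listed above, captured at the start of each run. */
    public record CleanupInputs(long oldestRetainedCheckpointId, Set<String> registeredArtifacts) {}

    // A single daemon thread: one cleanup runs at a time, and the thread
    // does not keep the JVM alive on shutdown.
    private final ScheduledExecutorService executor =
            Executors.newSingleThreadScheduledExecutor(runnable -> {
                Thread thread = new Thread(runnable, "orphaned-state-cleanup");
                thread.setDaemon(true);
                return thread;
            });

    /**
     * Runs the procedure every intervalMillis, starting after one interval.
     * scheduleWithFixedDelay waits for a run to finish before timing the next one.
     */
    public void start(Supplier<CleanupInputs> inputs,
                      Consumer<CleanupInputs> procedure,
                      long intervalMillis) {
        executor.scheduleWithFixedDelay(() -> {
            try {
                procedure.accept(inputs.get());
            } catch (RuntimeException e) {
                // An exception escaping a scheduled task suppresses all later runs,
                // so report it and try again in the next period.
                System.err.println("Orphaned state cleanup failed: " + e);
            }
        }, intervalMillis, intervalMillis, TimeUnit.MILLISECONDS);
    }

    @Override
    public void close() {
        executor.shutdownNow();
    }
}
```

Using {{scheduleWithFixedDelay}} rather than {{scheduleAtFixedRate}} means a slow run (for example, listing a large bucket) pushes the next run back instead of causing late runs to fire back to back.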

The cleanup procedure would:
* Enumerate all state artifacts (for example, the files in the "shared" 
directory).
* For each one, check whether it was created for a checkpoint older than the 
oldest retained checkpoint. If not, skip that artifact, because it might belong 
to a later pending checkpoint or a later canceled checkpoint.
* For the remaining artifacts, check whether each one is known to the shared 
state registry. If so, the artifact is kept; if not, it is orphaned and will be 
deleted.
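The three steps above can be sketched as a pure function that decides which artifacts to delete. This is a minimal sketch under stated assumptions: artifacts are identified by name, the registry snapshot is a set of names, and the hypothetical {{checkpointOf}} function maps a name to the ID of the checkpoint it was created for. Keeping artifacts whose checkpoint cannot be determined is an added conservative assumption, not part of the proposal.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.OptionalLong;
import java.util.Set;
import java.util.function.Function;

/** Hypothetical sketch of the decision logic, not an existing Flink class. */
public final class OrphanedStateCleanup {

    private OrphanedStateCleanup() {}

    /**
     * Returns the artifacts the procedure would delete.
     *
     * @param artifacts                  all artifacts enumerated from the shared directory
     * @param oldestRetainedCheckpointId ID of the oldest retained checkpoint
     * @param registeredArtifacts        snapshot of the artifacts known to the shared state registry
     * @param checkpointOf               maps an artifact to the checkpoint it was created for,
     *                                   or to an empty value if that cannot be determined
     */
    public static List<String> findOrphans(
            Collection<String> artifacts,
            long oldestRetainedCheckpointId,
            Set<String> registeredArtifacts,
            Function<String, OptionalLong> checkpointOf) {
        List<String> orphans = new ArrayList<>();
        for (String artifact : artifacts) {
            OptionalLong createdFor = checkpointOf.apply(artifact);
            // Assumption: an artifact of unknown origin is kept, since it cannot be proven unused.
            if (createdFor.isEmpty()) {
                continue;
            }
            // Not older than the oldest retained checkpoint: it may belong to a
            // later pending or canceled checkpoint, so skip it.
            if (createdFor.getAsLong() >= oldestRetainedCheckpointId) {
                continue;
            }
            // Older than every retained checkpoint and unknown to the registry: orphaned.
            if (!registeredArtifacts.contains(artifact)) {
                orphans.add(artifact);
            }
        }
        return orphans;
    }
}
```

Returning the names instead of deleting them keeps the decision logic independent of the file system, so the storage-specific part (listing and deleting) can live elsewhere.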

Because the cleanup procedure is specific to the checkpoint storage, it should 
probably be instantiated from the checkpoint storage.
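One way to let the checkpoint storage instantiate the storage-specific part is a small factory method that storages may or may not support. The sketch below assumes two hypothetical interfaces, {{SharedStateCleanerFactory}} and {{SharedStateCleaner}}, which are not existing Flink APIs, plus a toy in-memory storage standing in for a DFS- or object-store-backed one.

```java
import java.util.Collection;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical: not an existing Flink interface. */
interface SharedStateCleanerFactory {
    /** Empty if this storage cannot enumerate its shared artifacts. */
    Optional<SharedStateCleaner> createSharedStateCleaner();
}

/** Hypothetical storage-specific operations that the cleanup procedure needs. */
interface SharedStateCleaner {
    Collection<String> listSharedArtifacts();

    void delete(String artifact);
}

/** Toy in-memory storage, standing in for a DFS- or object-store-backed one. */
public class InMemoryCheckpointStorage implements SharedStateCleanerFactory {

    // Package-private so the example can populate it directly.
    final Set<String> sharedFiles = ConcurrentHashMap.newKeySet();

    @Override
    public Optional<SharedStateCleaner> createSharedStateCleaner() {
        return Optional.of(new SharedStateCleaner() {
            @Override
            public Collection<String> listSharedArtifacts() {
                // Return a copy so callers iterate over a stable listing.
                return Set.copyOf(sharedFiles);
            }

            @Override
            public void delete(String artifact) {
                sharedFiles.remove(artifact);
            }
        });
    }
}
```

Returning an {{Optional}} lets storages that cannot list their files (or do not use a "shared" folder) simply opt out.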

To make it possible to identify the checkpoint for which a state artifact was 
created, we can put that checkpoint ID into the state file name, for example by 
formatting the file name as {{"<checkpointID>_<UUID>"}}.
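A minimal sketch of producing and parsing such names, assuming a hypothetical helper class {{SharedStateFileNames}}. Splitting at the first {{_}} is unambiguous because {{UUID.toString()}} contains only hex digits and hyphens.

```java
import java.util.OptionalLong;
import java.util.UUID;

/** Hypothetical helper for the "<checkpointID>_<UUID>" naming scheme. */
public final class SharedStateFileNames {

    private static final char SEPARATOR = '_';

    private SharedStateFileNames() {}

    /** Creates a unique file name that records the checkpoint it was created for. */
    public static String newFileName(long checkpointId) {
        return checkpointId + String.valueOf(SEPARATOR) + UUID.randomUUID();
    }

    /** Extracts the checkpoint ID, or returns empty if the name does not follow the scheme. */
    public static OptionalLong checkpointIdOf(String fileName) {
        int sep = fileName.indexOf(SEPARATOR);
        if (sep <= 0) {
            // No separator, or nothing before it: not written with this scheme.
            return OptionalLong.empty();
        }
        try {
            return OptionalLong.of(Long.parseLong(fileName.substring(0, sep)));
        } catch (NumberFormatException e) {
            // The prefix is not a number, so the name does not follow this scheme.
            return OptionalLong.empty();
        }
    }
}
```

Names written without this scheme, such as a bare UUID, yield an empty result, so a cleanup procedure would have to treat such files conservatively.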



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
