[ https://issues.apache.org/jira/browse/FLINK-5820?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Robert Metzger updated FLINK-5820:
----------------------------------
Fix Version/s: (was: 1.3.0)
1.4.0
> Extend State Backend Abstraction to support Global Cleanup Hooks
> ----------------------------------------------------------------
>
> Key: FLINK-5820
> URL: https://issues.apache.org/jira/browse/FLINK-5820
> Project: Flink
> Issue Type: Improvement
> Components: State Backends, Checkpointing
> Affects Versions: 1.2.0
> Reporter: Stephan Ewen
> Assignee: Stephan Ewen
> Fix For: 1.4.0
>
>
> The current state backend abstraction has the limitation that each piece of
> state is only meaningful in the context of its state handle. There is no way
> to get a view of "all state associated with checkpoint X".
> This causes several issues:
> - State might not be cleaned up in the event of failures. When a
> TaskManager hands over a state handle to the JobManager and either of them
> fails, the state handle may be lost and the state lingers.
> - State might also linger if a cleanup operation failed temporarily and
> the checkpoint metadata had already been disposed, leaving no reference
> from which to retry the cleanup.
> - State cleanup is more expensive than necessary in many cases. Each state
> handle is released individually. For large jobs, this means thousands of
> release operations (typically file deletes) per checkpoint, which can be
> expensive on some file systems.
> - It is hard to guarantee cleanup of parent directories with the current
> architecture.
> The core changes proposed here are:
> 1. Each job has one core {{StateBackend}}. In the future, operators may
> have different {{KeyedStateBackends}} and {{OperatorStateBackends}} to mix
> and match, for example, RocksDB storage and in-memory storage.
> 2. The JobManager needs to be aware of the {{StateBackend}}.
> 3. Storing checkpoint metadata becomes the responsibility of the state backend,
> not the "completed checkpoint store". The latter only stores the pointers to
> the latest available checkpoints (either in process or in ZooKeeper).
> 4. The StateBackend may optionally have a hook to drop all checkpointed
> state that belongs exclusively to one specific checkpoint (as opposed to
> shared state, which arises from incremental checkpointing).
> 5. The StateBackend needs to have a hook to drop all checkpointed state up
> to a specific checkpoint (for all previously discarded checkpoints).
> 6. In the future, this must support periodic cleanup hooks that track
> orphaned shared state from incremental checkpoints.
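As a rough illustration of items 4 and 5, the following Java sketch indexes exclusive state by checkpoint ID rather than by state handle, so a backend can release "all state associated with checkpoint X" even if an individual handle was lost between TaskManager and JobManager. The names `CheckpointCleanupHooks`, `disposeCheckpoint`, `disposeCheckpointsBefore` and `InMemoryTrackingBackend` are hypothetical and not part of Flink's API; state objects are modeled as plain strings, and shared (incremental) state is deliberately left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

/** Hypothetical cleanup hooks on the job-wide state backend (not Flink's actual API). */
interface CheckpointCleanupHooks {
    /** Item 4: drop all state that belongs exclusively to one checkpoint. */
    void disposeCheckpoint(long checkpointId);

    /** Item 5: drop the exclusive state of every checkpoint older than the retained one. */
    void disposeCheckpointsBefore(long retainedCheckpointId);
}

/** Toy backend that indexes exclusive state by checkpoint ID instead of by handle. */
class InMemoryTrackingBackend implements CheckpointCleanupHooks {
    // Checkpoint ID -> names of the state objects exclusive to that checkpoint.
    private final NavigableMap<Long, List<String>> exclusiveState = new TreeMap<>();
    // Every state object released so far, in release order.
    private final List<String> released = new ArrayList<>();

    /** Records state when it is written, independently of any handle sent to the JobManager. */
    void register(long checkpointId, String stateName) {
        exclusiveState.computeIfAbsent(checkpointId, id -> new ArrayList<>()).add(stateName);
    }

    @Override
    public void disposeCheckpoint(long checkpointId) {
        List<String> state = exclusiveState.remove(checkpointId);
        if (state != null) {
            released.addAll(state);
        }
    }

    @Override
    public void disposeCheckpointsBefore(long retainedCheckpointId) {
        // headMap(..., false) is a live view of all strictly smaller checkpoint IDs;
        // clearing the view removes those entries from the backing map as well.
        NavigableMap<Long, List<String>> older = exclusiveState.headMap(retainedCheckpointId, false);
        for (List<String> state : older.values()) {
            released.addAll(state);
        }
        older.clear();
    }

    List<Long> retainedCheckpoints() {
        return new ArrayList<>(exclusiveState.keySet());
    }

    List<String> released() {
        return released;
    }
}
```

For example, after registering state for checkpoints 1, 2 and 3, calling `disposeCheckpointsBefore(3)` releases everything registered for checkpoints 1 and 2 and keeps only checkpoint 3. Keying by checkpoint ID in a sorted map turns the "up to checkpoint N" hook into a single range operation.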
> For the {{FsStateBackend}}, which currently stores most of the checkpointed
> state (transitively for RocksDB as well), this means restructuring the
> storage directories as follows:
> {code}
> ../<flink-checkpoints>/job1-id/
>                               /shared/      <-- shared checkpoint data
>                               /chk-1/...    <-- data exclusive to checkpoint 1
>                               /chk-2/...    <-- data exclusive to checkpoint 2
>                               /chk-3/...    <-- data exclusive to checkpoint 3
> ../<flink-checkpoints>/job2-id/
>                               /shared/...
>                               /chk-1/...
>                               /chk-2/...
>                               /chk-3/...
> ../<flink-savepoints>/savepoint-1/savepoint-root
>                                  /file-1-uid
>                                  /file-2-uid
>                                  /file-3-uid
>                      /savepoint-2/savepoint-root
>                                  /file-1-uid
>                                  /file-2-uid
>                                  /file-3-uid
> {code}
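A minimal sketch of how the per-job directory layout above could simplify cleanup, assuming a local file system accessed through `java.nio.file` (Flink's own `FileSystem` abstraction is not used here). The class `CheckpointDirectoryLayout` and its methods are hypothetical. Because each checkpoint's exclusive data lives under its own `chk-N` directory, discarding old checkpoints becomes one recursive directory delete per checkpoint instead of one release call per state handle, and `/shared/` is never matched. On file systems that support recursive deletes natively, each of these could be a single call; the local file system used here has to walk the tree.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Hypothetical helper mirroring the proposed per-job checkpoint directory layout. */
class CheckpointDirectoryLayout {
    private static final Pattern CHK_DIR = Pattern.compile("chk-(\\d+)");

    private final Path checkpointsRoot; // stands in for ../<flink-checkpoints>

    CheckpointDirectoryLayout(Path checkpointsRoot) {
        this.checkpointsRoot = checkpointsRoot;
    }

    Path jobDir(String jobId) {
        return checkpointsRoot.resolve(jobId);
    }

    /** The job's shared/ directory: state that several checkpoints may reference. */
    Path sharedDir(String jobId) {
        return jobDir(jobId).resolve("shared");
    }

    /** The job's chk-N directory: state exclusive to checkpoint N. */
    Path exclusiveDir(String jobId, long checkpointId) {
        return jobDir(jobId).resolve("chk-" + checkpointId);
    }

    /**
     * Deletes every chk-N directory with N below retainedCheckpointId and returns how
     * many were removed. The shared/ directory never matches, so it is left untouched.
     */
    int discardCheckpointsBefore(String jobId, long retainedCheckpointId) throws IOException {
        List<Path> toDelete = new ArrayList<>();
        // Collect first so that nothing is deleted while the directory is being listed.
        try (DirectoryStream<Path> entries = Files.newDirectoryStream(jobDir(jobId))) {
            for (Path entry : entries) {
                Matcher m = CHK_DIR.matcher(entry.getFileName().toString());
                if (m.matches() && Files.isDirectory(entry)
                        && Long.parseLong(m.group(1)) < retainedCheckpointId) {
                    toDelete.add(entry);
                }
            }
        }
        for (Path dir : toDelete) {
            deleteRecursively(dir);
        }
        return toDelete.size();
    }

    private static void deleteRecursively(Path dir) throws IOException {
        List<Path> paths;
        try (Stream<Path> walk = Files.walk(dir)) {
            // Reverse lexicographic order visits children before their parent directory.
            paths = walk.sorted(Comparator.reverseOrder()).collect(Collectors.toList());
        }
        for (Path p : paths) {
            Files.delete(p);
        }
    }
}
```

For example, with `chk-1`, `chk-2`, `chk-3` and `shared/` under `job1-id/`, calling `discardCheckpointsBefore("job1-id", 3)` removes the first two checkpoint directories and leaves `chk-3` and `shared/` in place.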
> This is the umbrella issue for the individual steps needed to address this.
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)