Stephan Ewen created FLINK-5820:
-----------------------------------
Summary: Extend State Backend Abstraction to support Global
Cleanup Hooks
Key: FLINK-5820
URL: https://issues.apache.org/jira/browse/FLINK-5820
Project: Flink
Issue Type: Improvement
Components: State Backends, Checkpointing
Affects Versions: 1.2.0
Reporter: Stephan Ewen
Assignee: Stephan Ewen
Fix For: 1.3.0
The current state backend abstraction has the limitation that each piece of
state is only meaningful in the context of its state handle. There is no
possibility of a view onto "all state associated with checkpoint X".
That causes several issues
- State might not be cleaned up in the process of failures. When a
TaskManager hands over a state handle to the JobManager and either of them has
a failure, the state handle may be lost and state lingers.
- State might also linger if a cleanup operation failed temporarily, and the
checkpoint metadata was already disposed
- State cleanup is more expensive than necessary in many cases. Each state
handle is individually released. For large jobs, this means 1000s of release
operations (typically file deletes) per checkpoint, which can be expensive on
some file systems.
- It is hard to guarantee cleanup of parent directories with the current
architecture.
The core changes proposed here are:
1. Each job has one core {{StateBackend}}. In the future, operators may have
different {{KeyedStateBackends}} and {{OperatorStateBackends}} to mix and match
for example RocksDB storabe and in-memory storage.
2. The JobManager needs to be aware of the {{StateBackend}}.
3. Storing checkpoint metadata becomes responsibility of the state backend,
not the "completed checkpoint store". The later only stores the pointers to the
available latest checkpoints (either in process or in ZooKeeper).
4. The StateBackend may optionally have a hook to drop all checkpointed state
that belongs to only one specific checkpoint (shared state comes as part of
incremental checkpointing).
5. The StateBackend needs to have a hook to drop all checkpointed state up
to a specific checkpoint (for all previously discarded checkpoints).
6. In the future, this must support periodic cleanup hooks that track
orphaned shared state from incremental checkpoints.
For the {{FsStateBackend}}, which stores most of the checkpointes state
currently (transitively for RocksDB as well), this means a re-structuring of
the storage directories as follows:
{code}
../<flink-checkpoints>/job1-id/
/shared/ <-- shared checkpoint data
/chk-1/... <-- data exclusive to checkpoint 1
/chk-2/... <-- data exclusive to checkpoint 2
/chk-3/... <-- data exclusive to checkpoint 3
../<flink-checkpoints>/job2-id/
/shared/...
/chk-1/...
/chk-2/...
/chk-3/...
../<flink-savepoints>/savepoint-1/savepoint-root
/file-1-uid
/file-2-uid
/file-3-uid
/savepoint-2/savepoint-root
/file-1-uid
/file-2-uid
/file-3-uid
{code}
This is the umbrella issue for the individual steps needed to address this.
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)