[ https://issues.apache.org/jira/browse/FLINK-5820?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15871885#comment-15871885 ]
ASF GitHub Bot commented on FLINK-5820:
---------------------------------------
GitHub user uce opened a pull request:
https://github.com/apache/flink/pull/3346
Selfcontained
This is based on #3345 and only the last two commits are relevant here.
I've separated the test changes (last commit) and the main changes (second-to-last
commit) for better reviewability. I would squash them before merging this. The
change looks more involved than it actually is. It mostly routes new
information with the checkpoint barriers, which touches a lot of places.
The main change is to add `CheckpointOptions` to the triggered checkpoint
messages (coordinator to barrier injecting tasks) and barriers (flowing inline
with the data):
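```java
import java.util.Objects;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

public class CheckpointOptions {

    // Type of checkpoint
    public enum CheckpointType { FULL_CHECKPOINT, SAVEPOINT }

    private final CheckpointType checkpointType;

    // Custom target location. This is a String, because for future
    // backends it can be a logical location like a DB table.
    private final String targetLocation;

    public CheckpointOptions(@Nonnull CheckpointType checkpointType, @Nullable String targetLocation) {
        this.checkpointType = Objects.requireNonNull(checkpointType);
        this.targetLocation = targetLocation;
    }

    @Nonnull
    public CheckpointType getCheckpointType() { return checkpointType; }

    @Nullable
    public String getTargetLocation() { return targetLocation; }
}
```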
This class would be the place to define more options for performing the
checkpoints (for example for incremental checkpoints). @StephanEwen was
involved with the design of incremental checkpoints and could probably comment
best on whether this is in line with that design.
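A minimal sketch of how such options could travel with a barrier, assuming simplified stand-in types. `BarrierSketch`, `Options`, `Barrier`, `triggerCheckpoint` and `forward` are hypothetical names, not Flink classes: the source task copies the options from the trigger message into the barrier, and each downstream task forwards them unchanged, so every operator sees the same checkpoint type and target location.

```java
import java.util.Objects;

final class BarrierSketch {

    // Hypothetical mirror of the checkpoint types listed above.
    enum CheckpointType { FULL_CHECKPOINT, SAVEPOINT }

    // Hypothetical, immutable stand-in for CheckpointOptions.
    static final class Options {
        final CheckpointType type;
        final String targetLocation; // null means "use the configured default"

        Options(CheckpointType type, String targetLocation) {
            this.type = Objects.requireNonNull(type);
            this.targetLocation = targetLocation;
        }
    }

    // Hypothetical barrier: checkpoint ID, trigger timestamp and the options.
    static final class Barrier {
        final long checkpointId;
        final long timestamp;
        final Options options;

        Barrier(long checkpointId, long timestamp, Options options) {
            this.checkpointId = checkpointId;
            this.timestamp = timestamp;
            this.options = Objects.requireNonNull(options);
        }
    }

    // A source task turns the coordinator's trigger message into a barrier.
    static Barrier triggerCheckpoint(long checkpointId, long timestamp, Options options) {
        return new Barrier(checkpointId, timestamp, options);
    }

    // A downstream task emits a barrier carrying the same ID, timestamp and options.
    static Barrier forward(Barrier received) {
        return new Barrier(received.checkpointId, received.timestamp, received.options);
    }
}
```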
These options are forwarded via the `StreamTask` to the `StreamOperator`s
and `Snapshotable` backends. The `AbstractStreamOperator` checks the options
and either
i) forwards the shared per operator `CheckpointStreamFactory` (as of
#3312), or
ii) creates a custom savepoint stream factory (one per savepoint).
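The choice between i) and ii) can be sketched as follows. This is an illustration under simplifying assumptions, not the actual `AbstractStreamOperator` code: `OperatorFactorySketch`, `StreamFactory`, `Backend`, `Operator` and `factoryFor` are hypothetical, and job and operator IDs are plain strings.

```java
final class OperatorFactorySketch {

    // Hypothetical mirror of the two checkpoint types.
    enum CheckpointType { FULL_CHECKPOINT, SAVEPOINT }

    // Hypothetical stand-in for a CheckpointStreamFactory; it only records
    // the location its streams would be written to.
    static final class StreamFactory {
        final String location;

        StreamFactory(String location) {
            this.location = location;
        }
    }

    // Hypothetical backend with a regular and a savepoint factory method.
    interface Backend {
        StreamFactory createStreamFactory(String jobId, String operatorId);

        StreamFactory createSavepointStreamFactory(String jobId, String operatorId, String targetLocation);
    }

    // Hypothetical operator showing options i) and ii) from the text above.
    static final class Operator {
        private final Backend backend;
        private final String jobId;
        private final String operatorId;
        // i) one factory per operator, shared by all regular checkpoints
        private final StreamFactory sharedCheckpointFactory;

        Operator(Backend backend, String jobId, String operatorId) {
            this.backend = backend;
            this.jobId = jobId;
            this.operatorId = operatorId;
            this.sharedCheckpointFactory = backend.createStreamFactory(jobId, operatorId);
        }

        StreamFactory factoryFor(CheckpointType type, String targetLocation) {
            switch (type) {
                case FULL_CHECKPOINT:
                    return sharedCheckpointFactory;
                case SAVEPOINT:
                    // ii) a fresh factory for every savepoint
                    return backend.createSavepointStreamFactory(jobId, operatorId, targetLocation);
                default:
                    throw new IllegalArgumentException("Unknown checkpoint type: " + type);
            }
        }
    }
}
```

Reusing one factory for regular checkpoints avoids per-checkpoint setup, while a new factory per savepoint lets each savepoint honour its own target location.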
For this, the state backends provide the following new method:
```java
CheckpointStreamFactory createSavepointStreamFactory(JobID, String, String);
```
The `MemoryStateBackend` returns the regular stream factory and the
`FsStateBackend` returns a `FsSavepointStreamFactory`, which writes all
checkpoint streams to a single directory (instead of the regular per-checkpoint
subdirectories).
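A sketch of the two backend behaviours, assuming a hypothetical `StreamFactory` that only reports the directory a new stream would be written to (empty for in-memory state). `BackendSketch`, `MemoryBackend`, `FsBackend` and `directoryFor` are illustrative names. The `chk-<id>` subdirectory name is borrowed from the layout proposed in FLINK-5820 and is an assumption here, as is falling back to a configured default directory when the target location is `null`.

```java
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Optional;

final class BackendSketch {

    // Hypothetical factory abstraction: where would a new state stream go?
    interface StreamFactory {
        // Empty if the state is kept in memory instead of in files.
        Optional<Path> directoryFor(long checkpointId);
    }

    interface Backend {
        StreamFactory createStreamFactory(String jobId, String operatorId);

        StreamFactory createSavepointStreamFactory(String jobId, String operatorId, String targetLocation);
    }

    // Memory backend: no files are involved, so savepoints reuse the regular factory.
    static final class MemoryBackend implements Backend {
        private final StreamFactory regular = checkpointId -> Optional.empty();

        public StreamFactory createStreamFactory(String jobId, String operatorId) {
            return regular;
        }

        public StreamFactory createSavepointStreamFactory(String jobId, String operatorId, String targetLocation) {
            return createStreamFactory(jobId, operatorId);
        }
    }

    // File system backend: a subdirectory per checkpoint for regular checkpoints,
    // but one directory for all streams of a savepoint. The operator ID is unused here.
    static final class FsBackend implements Backend {
        private final Path checkpointRoot;
        private final Path defaultSavepointDir;

        FsBackend(Path checkpointRoot, Path defaultSavepointDir) {
            this.checkpointRoot = checkpointRoot;
            this.defaultSavepointDir = defaultSavepointDir;
        }

        public StreamFactory createStreamFactory(String jobId, String operatorId) {
            return checkpointId -> Optional.of(checkpointRoot.resolve(jobId).resolve("chk-" + checkpointId));
        }

        public StreamFactory createSavepointStreamFactory(String jobId, String operatorId, String targetLocation) {
            Path savepointDir = targetLocation != null ? Paths.get(targetLocation) : defaultSavepointDir;
            // The checkpoint ID is ignored: every stream lands in the same directory.
            return checkpointId -> Optional.of(savepointDir);
        }
    }
}
```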
We end up with the following directory layout for savepoints:
```
+---------------------------+
| :root_savepoint_directory | (custom per savepoint or configured default via `state.savepoints.dir`)
+---------------------------+
  | +---------------------------------------+
  +-| savepoint-:jobId(0, 6)-:random_suffix | (one directory per savepoint)
    +---------------------------------------+
       |
       +- _metadata (one per savepoint)
       +- :uuid (one data file per StreamTask)
       +- ...
       +- :uuid
```
I decided to prefix the savepoint directory name with the first six characters
of the job ID, because I think this could help map savepoints to jobs, which
otherwise has to be done manually.
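A sketch of the naming scheme and of the reverse lookup it enables. The six-character prefix follows the `:jobId(0, 6)` notation above; the hex alphabet, the suffix length and the helper names (`SavepointNaming`, `prefixFor`, `savepointDirName`, `mayBelongToJob`) are assumptions for illustration.

```java
import java.security.SecureRandom;

final class SavepointNaming {

    private static final SecureRandom RANDOM = new SecureRandom();
    private static final char[] HEX = "0123456789abcdef".toCharArray();

    // Prefix shared by all savepoints of a job: "savepoint-<first 6 chars of job ID>-".
    static String prefixFor(String jobId) {
        return "savepoint-" + jobId.substring(0, Math.min(6, jobId.length())) + "-";
    }

    // Builds "savepoint-<first 6 chars of job ID>-<random hex suffix>".
    static String savepointDirName(String jobId, int suffixLength) {
        StringBuilder name = new StringBuilder(prefixFor(jobId));
        for (int i = 0; i < suffixLength; i++) {
            name.append(HEX[RANDOM.nextInt(HEX.length)]);
        }
        return name.toString();
    }

    // Cheap filter when mapping savepoint directories back to a job.
    static boolean mayBelongToJob(String dirName, String jobId) {
        return dirName.startsWith(prefixFor(jobId));
    }
}
```

Because six characters of a job ID can collide across jobs, a prefix match only narrows the search; it does not prove which job wrote the savepoint.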
It's important to make sure that this is in line with the upcoming changes for
incremental checkpoints (under discussion on the mailing list) and FLINK-5820.
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/uce/flink selfcontained
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/3346.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #3346
----
commit 64e1f8892597dd26489774fe494fd7b211d789a9
Author: Ufuk Celebi
Date: 2017-02-15T16:52:40Z
[FLINK-5763] [checkpoints] Acknowledge with explicit ID and CheckpointMetrics
Instead of acknowledging checkpoints with the CheckpointMetaData, make
the acknowledgement explicit by ID and CheckpointMetrics. The rest is
not needed.
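A minimal sketch of an acknowledgement that carries only the checkpoint ID and the metrics, assuming hypothetical types (`AckSketch`, `CheckpointMetrics` with two illustrative duration fields, `Acknowledge`, `Coordinator`). The coordinator matches acknowledgements to pending checkpoints by ID alone.

```java
import java.util.HashMap;
import java.util.Map;

final class AckSketch {

    // Hypothetical metrics bundle; the field choice is illustrative only.
    static final class CheckpointMetrics {
        final long syncDurationMillis;
        final long asyncDurationMillis;

        CheckpointMetrics(long syncDurationMillis, long asyncDurationMillis) {
            this.syncDurationMillis = syncDurationMillis;
            this.asyncDurationMillis = asyncDurationMillis;
        }
    }

    // Carries only what the coordinator needs: the ID and the metrics.
    static final class Acknowledge {
        final long checkpointId;
        final CheckpointMetrics metrics;

        Acknowledge(long checkpointId, CheckpointMetrics metrics) {
            this.checkpointId = checkpointId;
            this.metrics = metrics;
        }
    }

    // Hypothetical coordinator counting outstanding acknowledgements per checkpoint ID.
    static final class Coordinator {
        private final Map<Long, Integer> pendingAcks = new HashMap<>();

        void trigger(long checkpointId, int numTasks) {
            pendingAcks.put(checkpointId, numTasks);
        }

        // Returns true when the last expected acknowledgement arrives.
        boolean receive(Acknowledge ack) {
            Integer remaining = pendingAcks.get(ack.checkpointId);
            if (remaining == null) {
                return false; // unknown or already completed checkpoint
            }
            if (remaining == 1) {
                pendingAcks.remove(ack.checkpointId);
                return true;
            }
            pendingAcks.put(ack.checkpointId, remaining - 1);
            return false;
        }
    }
}
```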
commit 0b8fa02a150ae6d5891e04d1fce6de748a283aaf
Author: Ufuk Celebi
Date: 2017-02-15T17:16:44Z
[FLINK-5763] [checkpoints] Move CheckpointMetrics out of CheckpointMetaData
commit d023159d0fef1da24373d7880e11d0a36afbd7ef
Author: Ufuk Celebi
Date: 2017-02-16T15:52:32Z
[FLINK-5763] [checkpoints] Add isSavepoint() to CheckpointProperties
commit 2496068371bf1aac9e2ce6223002c1d9a043930f
Author: Ufuk Celebi
Date: 2017-02-16T16:56:23Z
[FLINK-5763] [checkpoints] Add CheckpointOptions
commit ca80d68ab7faba663f4c384156a90d58de0ebf82
Author: Ufuk Celebi
Date: 2017-02-16T16:56:37Z
[FLINK-5763] [checkpoints] Adjust tests
----
> Extend State Backend Abstraction to support Global Cleanup Hooks
> ----------------------------------------------------------------
>
> Key: FLINK-5820
> URL: https://issues.apache.org/jira/browse/FLINK-5820
> Project: Flink
> Issue Type: Improvement
> Components: State Backends, Checkpointing
> Affects Versions: 1.2.0
> Reporter: Stephan Ewen
> Assignee: Stephan Ewen
> Fix For: 1.3.0
>
>
> The current state backend abstraction has the limitation that each piece of
> state is only meaningful in the context of its state handle. There is no
> possibility of a view onto "all state associated with checkpoint X".
> That causes several issues:
> - State might not be cleaned up in the event of failures. When a
> TaskManager hands over a state handle to the JobManager and either of them
> has a failure, the state handle may be lost and state lingers.
> - State might also linger if a cleanup operation failed temporarily and
> the checkpoint metadata was already disposed.
> - State cleanup is more expensive than necessary in many cases. Each state
> handle is individually released. For large jobs, this means 1000s of release
> operations (typically file deletes) per checkpoint, which can be expensive on
> some file systems.
> - It is hard to guarantee cleanup of parent directories with the current
> architecture.
> The core changes proposed here are:
> 1. Each job has one core {{StateBackend}}. In the future, operators may
> have different {{KeyedStateBackends}} and {{OperatorStateBackends}} to mix
> and match, for example, RocksDB storage and in-memory storage.
> 2. The JobManager needs to be aware of the {{StateBackend}}.
> 3. Storing checkpoint metadata becomes the responsibility of the state backend,
> not the "completed checkpoint store". The latter only stores the pointers to
> the latest available checkpoints (either in process or in ZooKeeper).
> 4. The StateBackend may optionally have a hook to drop all checkpointed
> state that belongs to only one specific checkpoint (shared state comes as
> part of incremental checkpointing).
> 5. The StateBackend needs to have a hook to drop all checkpointed state up
> to a specific checkpoint (for all previously discarded checkpoints).
> 6. In the future, this must support periodic cleanup hooks that track
> orphaned shared state from incremental checkpoints.
> For the {{FsStateBackend}}, which currently stores most of the checkpointed
> state (transitively for RocksDB as well), this means a restructuring of
> the storage directories as follows:
> {code}
> ../<flink-checkpoints>/job1-id/
>                               /shared/     <-- shared checkpoint data
>                               /chk-1/...   <-- data exclusive to checkpoint 1
>                               /chk-2/...   <-- data exclusive to checkpoint 2
>                               /chk-3/...   <-- data exclusive to checkpoint 3
> ../<flink-checkpoints>/job2-id/
>                               /shared/...
>                               /chk-1/...
>                               /chk-2/...
>                               /chk-3/...
> ../<flink-savepoints>/savepoint-1/savepoint-root
>                                  /file-1-uid
>                                  /file-2-uid
>                                  /file-3-uid
>                      /savepoint-2/savepoint-root
>                                  /file-1-uid
>                                  /file-2-uid
>                                  /file-3-uid
> {code}
> This is the umbrella issue for the individual steps needed to address this.
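Points 4 and 5 of the proposal map naturally onto the directory layout above: dropping a checkpoint becomes deleting its `chk-<id>` directory, without touching `shared/` and without needing the individual state handles. The sketch below assumes hypothetical names (`FsCleanupSketch`, `CheckpointCleanup`, `FsCheckpointCleanup`, `dropCheckpoint`, `dropCheckpointsBefore`), reads "up to a specific checkpoint" as exclusive of that checkpoint, and uses `java.nio.file` on a local file system.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class FsCleanupSketch {

    // Hypothetical hooks for points 4 and 5 of the proposal.
    interface CheckpointCleanup {
        // Drops state exclusive to one checkpoint; shared/ is left alone.
        void dropCheckpoint(long checkpointId) throws IOException;

        // Drops exclusive state of every checkpoint with a smaller ID.
        void dropCheckpointsBefore(long checkpointId) throws IOException;
    }

    // Works on the proposed layout: <root>/<jobId>/{shared, chk-1, chk-2, ...}
    static final class FsCheckpointCleanup implements CheckpointCleanup {
        private final Path jobDir;

        FsCheckpointCleanup(Path checkpointRoot, String jobId) {
            this.jobDir = checkpointRoot.resolve(jobId);
        }

        Path exclusiveDir(long checkpointId) {
            return jobDir.resolve("chk-" + checkpointId);
        }

        Path sharedDir() {
            return jobDir.resolve("shared");
        }

        @Override
        public void dropCheckpoint(long checkpointId) throws IOException {
            deleteRecursively(exclusiveDir(checkpointId));
        }

        @Override
        public void dropCheckpointsBefore(long checkpointId) throws IOException {
            if (!Files.isDirectory(jobDir)) {
                return;
            }
            List<Path> entries;
            try (Stream<Path> children = Files.list(jobDir)) {
                entries = children.collect(Collectors.toList());
            }
            List<Long> older = new ArrayList<>();
            for (Path child : entries) {
                String name = child.getFileName().toString();
                if (name.matches("chk-\\d+") && Long.parseLong(name.substring(4)) < checkpointId) {
                    older.add(Long.parseLong(name.substring(4)));
                }
            }
            for (long id : older) {
                dropCheckpoint(id);
            }
        }

        // java.nio has no recursive delete, so walk the tree deepest-first;
        // file systems with a native recursive delete can do this in one call.
        private static void deleteRecursively(Path dir) throws IOException {
            if (!Files.exists(dir)) {
                return;
            }
            List<Path> paths;
            try (Stream<Path> walk = Files.walk(dir)) {
                paths = walk.sorted(Comparator.reverseOrder()).collect(Collectors.toList());
            }
            for (Path p : paths) {
                Files.delete(p);
            }
        }
    }
}
```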
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)