GitHub user uce opened a pull request:
https://github.com/apache/flink/pull/1153
[FLINK-2354] Add job graph and checkpoint recovery
## tl;dr
This PR introduces `JobGraph` and `SuccessfulCheckpoint` recovery for
submitted programs in case of JobManager failures.
## General Idea
The general idea is to persist job graphs and successful checkpoints in
ZooKeeper.
We have introduced JobManager high availability via ZooKeeper in #1016. My
PR builds on top of this and adds initial support for program recovery. We can
recover both programs and successful checkpoints in case of a JobManager
failure as soon as a standby job manager is granted leadership.
ZooKeeper's sweet spot is rather small data (in the KB range), but job graph
and checkpoint state can grow larger. Therefore, we don't persist the actual
metadata directly, but use the state backend as a layer of indirection: we
create state handles for the job graph and completed checkpoints and persist
those. The state handle acts as a pointer to the actual data.
At the moment, only the file system state backend is supported for this.
The state handles need to be accessible from both task and job managers (e.g. a
DFS).
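To illustrate the indirection, here is a minimal sketch (not the actual Flink classes): the large payload goes to the shared file system, and only a small pointer is written to ZooKeeper. The Curator client setup, the `writeToSharedFileSystem` helper, and the concrete paths are assumptions made for the example:

```java
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;

import java.nio.charset.StandardCharsets;

public class StateHandleIndirectionSketch {

    // Hypothetical helper: writes the (potentially large) object to the shared
    // file system (e.g. under state.backend.fs.dir.recovery) and returns the
    // path it was written to.
    static String writeToSharedFileSystem(Object stateObject) {
        return "/path/to/recovery/jobgraph-" + System.nanoTime();
    }

    public static void main(String[] args) throws Exception {
        CuratorFramework client = CuratorFrameworkFactory.newClient(
                "localhost:2181", new ExponentialBackoffRetry(1000, 3));
        client.start();

        Object jobGraph = "placeholder for a serialized job graph";

        // 1) Persist the actual data on the shared file system.
        String handlePath = writeToSharedFileSystem(jobGraph);

        // 2) Persist only the small pointer (the state handle) in ZooKeeper.
        client.create().creatingParentsIfNeeded().forPath(
                "/flink/jobgraphs/my-job-id",
                handlePath.getBytes(StandardCharsets.UTF_8));

        client.close();
    }
}
```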
## Configuration
The minimal required configuration:
```bash
recovery.mode: ZOOKEEPER
ha.zookeeper.quorum: <ZooKeeper quorum peers>
state.backend: FILESYSTEM
state.backend.fs.dir.recovery: /path/to/recovery
```
I don't like the current configuration keys. Before the next release, I
would like to move to more consistent naming, e.g. prefixing everything with
`recovery.zookeeper`.
## ZooKeeper Nodes Overview
Overview of ZNodes and components managing them:
```bash
O- /flink
|
+----O /flink/jobgraphs (SubmittedJobGraphs)
|    |
|    +----O /flink/jobgraphs/<job-id>
|
+----O /flink/checkpoints (CompletedCheckpoints)
|    |
|    +----O /flink/checkpoints/<job-id>
|    .    |
|    .    +----O /flink/checkpoints/<job-id>/1
|    .    |
|    .    +----O /flink/checkpoints/<job-id>/N
|
+----O /flink/checkpoint-counter (CheckpointIDCounter)
     |
     +----O /flink/checkpoint-counter/<job-id>
```
## Implementation
### Submission vs. Recovery (JobManager and SubmittedJobGraphs)
- `ZooKeeperSubmittedJobGraphs` manages `SubmittedJobGraph` state handles
in ZooKeeper
- Submission and recovery follow mostly the same code paths (see
`JobManager#submitJob()`).
- On (initial) submission:
  - After writing to ZooKeeper, the JobManager synchronously checks whether it is still the leader (see the sketch after this list).
  - If not, the job is not scheduled for execution but is kept in ZooKeeper; future leading JobManagers need to recover it. The client currently sees this as a successful submission. The job is not removed in this case, because another JobManager might recover it between the write and the removal. In that case, a job would be running without being in ZooKeeper and without having been acknowledged to the client.
- On recovery:
  - Recovery is triggered when leadership is granted, after a configured (fixed) delay.
  - All available jobs are scheduled for execution.
  - The job graphs ZNode is monitored for modifications during operation. This way, a JobManager can (eventually) detect when another JobManager adds or removes a job and react to it.
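Roughly, the two paths look like the following sketch. The `LeaderElection` and `Scheduler` interfaces and the handle-as-path representation are placeholders for this example; the actual code lives in `JobManager#submitJob()` and the recovery handling:

```java
import org.apache.curator.framework.CuratorFramework;

import java.nio.charset.StandardCharsets;

public class SubmissionAndRecoverySketch {

    // Placeholders for the parts of the JobManager that are not shown here.
    interface LeaderElection { boolean hasLeadership(); }
    interface Scheduler { void schedule(String jobGraphHandlePath); }

    // Initial submission: write the handle first, then re-check leadership
    // before actually scheduling the job.
    static void submitJob(CuratorFramework client, LeaderElection leader,
                          Scheduler scheduler, String jobId, String handlePath)
            throws Exception {

        client.create().creatingParentsIfNeeded().forPath(
                "/flink/jobgraphs/" + jobId,
                handlePath.getBytes(StandardCharsets.UTF_8));

        if (leader.hasLeadership()) {
            scheduler.schedule(handlePath);
        }
        // If leadership was lost between the write and this check, the job is
        // deliberately *not* removed: a new leader may already have recovered it.
    }

    // Recovery: once leadership is granted, schedule every job graph that is
    // still registered in ZooKeeper.
    static void recoverAllJobs(CuratorFramework client, Scheduler scheduler)
            throws Exception {

        for (String jobId : client.getChildren().forPath("/flink/jobgraphs")) {
            byte[] pointer = client.getData().forPath("/flink/jobgraphs/" + jobId);
            scheduler.schedule(new String(pointer, StandardCharsets.UTF_8));
        }
    }
}
```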
### CompletedCheckpoints
- `ZooKeeperCompletedCheckpoints` manages `SuccessfulCheckpoint` state
handles in ZooKeeper (per job). Note that a `SuccessfulCheckpoint` usually has
pointers to further state handles, so this adds another layer of indirection.
- Every completed checkpoint is added to ZooKeeper and identified by its
checkpoint ID.
- On recovery, the latest checkpoint is recovered (see the sketch after this list). If more than one checkpoint is available, we still recover only the latest one, in order to keep the checkpoint history consistent in corner cases where multiple JobManagers run the same job with checkpointing for some time. (Currently we retain only one checkpoint anyway, but this matters if we ever choose to retain more.)
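A minimal sketch of picking the latest checkpoint from the per-job ZNode; the handle-as-path representation is again an assumption made for the example:

```java
import org.apache.curator.framework.CuratorFramework;

import java.nio.charset.StandardCharsets;
import java.util.List;

public class LatestCheckpointSketch {

    // Returns the state handle (here: a path) of the checkpoint with the
    // highest ID, or null if the job has no completed checkpoints.
    static String latestCheckpointHandle(CuratorFramework client, String jobId)
            throws Exception {

        String basePath = "/flink/checkpoints/" + jobId;
        List<String> checkpointIds = client.getChildren().forPath(basePath);
        if (checkpointIds.isEmpty()) {
            return null;
        }

        // ZNode names are checkpoint IDs; only the latest one is recovered.
        long latestId = -1;
        for (String name : checkpointIds) {
            latestId = Math.max(latestId, Long.parseLong(name));
        }

        byte[] pointer = client.getData().forPath(basePath + "/" + latestId);
        return new String(pointer, StandardCharsets.UTF_8);
    }
}
```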
### CheckpointIDCounter
- `ZooKeeperCheckpointIDCounter` manages a shared counter in ZooKeeper (per
job).
- The `Checkpointed` interface requires ascending checkpoint IDs for each
checkpoint.
- We use a shared counter (per job) via a Curator recipe for this.
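As a sketch of the shared-counter idea, the following uses Curator's `DistributedAtomicLong` recipe; the actual class in this PR is `ZooKeeperCheckpointIDCounter`, which may use a different recipe:

```java
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.atomic.AtomicValue;
import org.apache.curator.framework.recipes.atomic.DistributedAtomicLong;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class CheckpointIdCounterSketch {

    private final DistributedAtomicLong counter;

    CheckpointIdCounterSketch(CuratorFramework client, String jobId) {
        // One counter ZNode per job, matching the layout shown above.
        this.counter = new DistributedAtomicLong(
                client,
                "/flink/checkpoint-counter/" + jobId,
                new ExponentialBackoffRetry(1000, 3));
    }

    // Returns the next, strictly ascending checkpoint ID for this job.
    long nextCheckpointId() throws Exception {
        AtomicValue<Long> result = counter.increment();
        if (!result.succeeded()) {
            throw new IllegalStateException("Could not increment the checkpoint ID counter");
        }
        return result.postValue();
    }
}
```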
### Akka messages
- This PR introduces two new JobManager message types:
  - `RecoverAllJobs`
  - `RecoverJob(JobID)`
- The ZooKeeper operations are blocking, and all JobManager actor calls need
to make sure *not* to block the JobManager. I've tried to cover all cases
where a ZooKeeper operation is triggered (see the sketch after this list).
- For the tests, I didn't manage to stop the JobManager actor without its
`postStop` method running. Because this method has some cleanup logic (removing
job graphs and checkpoints), all JobManager recovery tests run the JobManager
as a separate `JobManagerProcess`. This is quite heavyweight. If someone knows
a way to stop the actor without `postStop` being called, it would be great to
refactor this.
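The following sketch shows the general pattern for keeping blocking ZooKeeper access off the JobManager actor: the store access runs on a separate executor and only the result is sent back to the actor as a message. `JobGraphStore` and `JobManagerRef` are placeholders invented for this example:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

public class NonBlockingRecoverySketch {

    // Placeholders: something that performs the blocking ZooKeeper reads, and
    // the JobManager actor that receives the result as a message.
    interface JobGraphStore { Object recoverAllJobGraphs() throws Exception; }
    interface JobManagerRef { void tell(Object message); }

    // Handles a RecoverAllJobs trigger without blocking the JobManager actor:
    // the ZooKeeper access runs on the given executor, and only the result is
    // delivered back to the actor as a message.
    static void triggerRecoverAllJobs(JobGraphStore store, JobManagerRef jobManager,
                                      Executor executor) {
        CompletableFuture
                .supplyAsync(() -> {
                    try {
                        return store.recoverAllJobGraphs();
                    } catch (Exception e) {
                        throw new RuntimeException(e);
                    }
                }, executor)
                .thenAccept(jobManager::tell);
    }
}
```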
## Next Steps
- The recovery behaviour (fixed delay) is too simplistic.
- The client is not fully integrated yet and submits jobs in detached mode if
the recovery mode is set to ZooKeeper.
## Tests
There was a Travis/AWS outage yesterday, so I couldn't run as many builds as
I would have liked yet. I would like to do a couple more runs before we merge
this.
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/uce/flink recovery-2354
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/1153.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #1153
----
commit aa0be0a27b7077fcdb303d99db40a4fb85acf82a
Author: Ufuk Celebi <[email protected]>
Date: 2015-09-03T13:13:28Z
[runtime] Add type parameter to ByteStreamStateHandle
commit f37041bd705e71a3d7b2897e498fbbe625b38217
Author: Ufuk Celebi <[email protected]>
Date: 2015-09-19T17:53:18Z
[clients] Submit job detached if recovery enabled
commit 83523771621eb8446a365e769f7b525d6430bcbb
Author: Ufuk Celebi <[email protected]>
Date: 2015-09-20T11:08:24Z
[FLINK-2652] [tests] Temporary ignore flakey
PartitionRequestClientFactoryTest
commit ad9b6572b73229ed92a6b3a0eee08d36a8e8bc6e
Author: Ufuk Celebi <[email protected]>
Date: 2015-09-01T15:25:46Z
[FLINK-2354] [runtime] Add job graph and checkpoint recovery
----