[ 
https://issues.apache.org/jira/browse/FLINK-2354?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14900296#comment-14900296
 ] 

ASF GitHub Bot commented on FLINK-2354:
---------------------------------------

GitHub user uce opened a pull request:

    https://github.com/apache/flink/pull/1153

    [FLINK-2354] Add job graph and checkpoint recovery

    ## tl;dr
    
    This PR introduces `JobGraph` and `SuccessfulCheckpoint` recovery for 
submitted programs in case of JobManager failures.
    
    ## General Idea
    
    The general idea is to persist job graphs and successful checkpoints in 
ZooKeeper.
    
    We have introduced JobManager high availability via ZooKeeper in #1016. My 
PR builds on top of this and adds initial support for program recovery. We can 
recover both programs and successful checkpoints in case of a JobManager 
failure as soon as a standby job manager is granted leadership.
    
    ZooKeeper's sweet spot is rather small data (in KB range), but job graph 
and checkpoint state can grow larger. Therefore we don't directly persist the 
actual metadata, but use the state backend as a layer of indirection. We create 
state handles for the job graph and completed checkpoints and persist those. 
The state handle acts as a pointer to the actual data.
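    
    As a rough illustration of this indirection (not the code in this PR),
    the sketch below writes the payload to a directory and keeps only the
    file path in a map that stands in for ZooKeeper. `StateHandleSketch`,
    `newRecoveryDir`, `persist`, `recover` and `coordinationStore` are
    hypothetical names, and a local temp directory plays the role of the DFS.
    
    ```java
    import java.io.IOException;
    import java.io.UncheckedIOException;
    import java.nio.charset.StandardCharsets;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.Paths;
    import java.util.HashMap;
    import java.util.Map;
    
    // Hypothetical illustration; none of these names are Flink classes.
    class StateHandleSketch {
    
        // Stand-in for ZooKeeper: maps a ZNode path to a small payload.
        static final Map<String, byte[]> coordinationStore = new HashMap<>();
    
        // Creates a temporary directory that plays the role of the shared DFS path.
        static Path newRecoveryDir() {
            try {
                return Files.createTempDirectory("recovery");
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
    
        // Writes the (possibly large) payload to the recovery directory and
        // stores only the file location under the given ZNode path.
        static void persist(Path recoveryDir, String znode, byte[] payload) {
            try {
                Path file = Files.createTempFile(recoveryDir, "state-", ".bin");
                Files.write(file, payload);
                coordinationStore.put(znode, file.toString().getBytes(StandardCharsets.UTF_8));
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
    
        // Follows the pointer stored under the ZNode path back to the data.
        static byte[] recover(String znode) {
            byte[] pointer = coordinationStore.get(znode);
            if (pointer == null) {
                throw new IllegalStateException("No state handle stored under " + znode);
            }
            try {
                return Files.readAllBytes(Paths.get(new String(pointer, StandardCharsets.UTF_8)));
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
    
        public static void main(String[] args) {
            Path dir = newRecoveryDir();
            byte[] jobGraph = new byte[4 * 1024 * 1024]; // pretend: a 4 MB serialized job graph
            persist(dir, "/flink/jobgraphs/job-1", jobGraph);
            System.out.println("pointer bytes:   " + coordinationStore.get("/flink/jobgraphs/job-1").length);
            System.out.println("recovered bytes: " + recover("/flink/jobgraphs/job-1").length);
        }
    }
    ```
    
    The point of the design is that the entry in the coordination store stays
    small no matter how large the payload gets.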
    
    At the moment, only the file system state backend is supported for this. 
The state handles need to be accessible from both task and job managers (e.g. a 
DFS).
    
    ## Configuration
    
    The minimal required configuration:
    
    ```bash
    recovery.mode: ZOOKEEPER
    ha.zookeeper.quorum: <ZooKeeper quorum peers>
    state.backend: FILESYSTEM
    state.backend.fs.dir.recovery: /path/to/recovery
    ```
    
    I don't like the current configuration keys. Before the next release, I 
would like more consistent naming, e.g. prefixing everything with 
`recovery.zookeeper`.
    
    ## ZooKeeper Nodes Overview
    
    Overview of ZNodes and components managing them:
    
    ```bash
    O- /flink
    |
    +----O /flink/jobgraphs (SubmittedJobGraphs)
    |    |
    |    +----O /flink/jobgraphs/<job-id>
    |
    +----O /flink/checkpoints  (CompletedCheckpoints)
    |    |
    |    +----O /flink/checkpoints/<job-id>
    |    .    |
    |    .    +----O /flink/checkpoints/<job-id>/1
    |    .    |
    |    .    +----O /flink/checkpoints/<job-id>/N
    |
    +----O /flink/checkpoint-counter (CheckpointIDCounter)
         |
         +----O /flink/checkpoint-counter/<job-id>
    ```
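    
    For orientation, the layout can be expressed as a small path helper.
    This is only a sketch: `ZNodePathsSketch` and its methods are
    hypothetical names, and nesting the per-job counter node under
    `/flink/checkpoint-counter` is an assumption based on the tree structure.
    
    ```java
    // Hypothetical helper; not part of this PR.
    class ZNodePathsSketch {
    
        static final String ROOT = "/flink";
    
        // Node holding the state handle of a submitted job graph.
        static String jobGraph(String jobId) {
            return ROOT + "/jobgraphs/" + jobId;
        }
    
        // Node holding the state handle of one completed checkpoint.
        static String checkpoint(String jobId, long checkpointId) {
            return ROOT + "/checkpoints/" + jobId + "/" + checkpointId;
        }
    
        // Node holding the shared checkpoint ID counter of a job
        // (assumed location, see the note above).
        static String checkpointCounter(String jobId) {
            return ROOT + "/checkpoint-counter/" + jobId;
        }
    }
    ```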
    
    ## Implementation
    
    ### Submission vs. Recovery (JobManager and SubmittedJobGraphs)
    
    - `ZooKeeperSubmittedJobGraphs` manages `SubmittedJobGraph` state handles 
in ZooKeeper
    - Submission and recovery follow mostly the same code paths (see 
`JobManager#submitJob()`).
    - On (initial) submission:
      - After writing to ZooKeeper, the JM checks synchronously whether it is 
still the leader.
      - If not, the job is not scheduled for execution, but kept in ZooKeeper. 
Future leading JobManagers need to recover it. The client currently sees this 
as a successful submission. The job is not removed in this case, because another 
job manager might recover it between the write and the remove. In such a case, a 
job would be running without being in ZooKeeper and without being acked to the 
client.
    - On recovery:
      - Recovery is triggered when leadership is granted, after the configured 
delay between execution retries.
      - All available jobs are scheduled for execution.
    - The ZNode for job graphs is monitored for modifications during 
operations. This way, a job manager can (eventually) detect if another job 
manager adds/removes a job and react to it.
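    
    The submission and recovery rules above can be sketched roughly as
    follows. This is a simplified model, not the `JobManager` code: a shared
    map stands in for ZooKeeper, leadership is a plain boolean supplier, and
    `SubmissionSketch` with all its members is a hypothetical name.
    
    ```java
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    import java.util.function.BooleanSupplier;
    
    // Hypothetical model of one job manager; not a Flink class.
    class SubmissionSketch {
    
        // Stand-in for the job graph handles in ZooKeeper, shared by all job managers.
        final Map<String, String> persistedJobGraphs;
        // Jobs this job manager has scheduled for execution.
        final List<String> scheduledJobs = new ArrayList<>();
        // Answers whether this job manager currently believes it is the leader.
        final BooleanSupplier isLeader;
    
        SubmissionSketch(Map<String, String> persistedJobGraphs, BooleanSupplier isLeader) {
            this.persistedJobGraphs = persistedJobGraphs;
            this.isLeader = isLeader;
        }
    
        // Returns true if the job was scheduled here, false if it was only
        // persisted and left for a future leader to recover.
        boolean submit(String jobId, String stateHandle) {
            persistedJobGraphs.put(jobId, stateHandle); // 1) write first
            if (!isLeader.getAsBoolean()) {             // 2) then re-check leadership
                // Deliberately no remove: another job manager may already
                // have recovered the job from the shared store.
                return false;
            }
            scheduledJobs.add(jobId);
            return true;
        }
    
        // Called on granted leadership: schedule every persisted job that
        // is not running on this job manager yet.
        void recoverAll() {
            if (!isLeader.getAsBoolean()) {
                return;
            }
            for (String jobId : persistedJobGraphs.keySet()) {
                if (!scheduledJobs.contains(jobId)) {
                    scheduledJobs.add(jobId);
                }
            }
        }
    }
    ```
    
    Two instances sharing one map show the hand-over: a submission to a job
    manager that has lost leadership stays in the map, and a later
    `recoverAll()` on the new leader picks it up.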
    
    ### CompletedCheckpoints
    
    - `ZooKeeperCompletedCheckpoints` manages `SuccessfulCheckpoint` state 
handles in ZooKeeper (per job). Note that a `SuccessfulCheckpoint` has pointers 
to further state handles in most cases. In this case, we add another layer of 
indirection.
    - Every completed checkpoint is added to ZooKeeper and identified by its 
checkpoint ID.
    - On recovery, the latest checkpoint is recovered. If more than one 
checkpoint is available, we still recover only one to make sure that the 
history of checkpoints stays consistent. This matters in corner cases where 
multiple job managers run the same job with checkpointing for some time 
(currently we retain only 1 checkpoint anyway, but we might choose to retain 
more in the future).
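    
    Picking the checkpoint to recover then boils down to taking the entry
    with the highest checkpoint ID. A minimal sketch, assuming the completed
    checkpoints are available as a map from checkpoint ID to state handle
    (`LatestCheckpointSketch` and `latest` are hypothetical names):
    
    ```java
    import java.util.Map;
    import java.util.Optional;
    import java.util.TreeMap;
    
    // Hypothetical helper; not a Flink class.
    class LatestCheckpointSketch {
    
        // Returns the entry with the highest checkpoint ID, or an empty
        // Optional if no checkpoint has completed yet.
        static Optional<Map.Entry<Long, String>> latest(Map<Long, String> completedCheckpoints) {
            // TreeMap sorts by checkpoint ID, so the last entry is the latest one.
            TreeMap<Long, String> sorted = new TreeMap<>(completedCheckpoints);
            return Optional.ofNullable(sorted.lastEntry());
        }
    }
    ```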
    
    ### CheckpointIDCounter
    
    - `ZooKeeperCheckpointIDCounter` manages a shared counter in ZooKeeper (per 
job).
    - The `Checkpointed` interface requires ascending checkpoint IDs for each 
checkpoint.
    - We use a shared counter (per job) via a Curator recipe for this.
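    
    The sketch below does not use Curator. It only emulates, in a single
    JVM, the optimistic "read value and version, write if the version is
    unchanged, otherwise retry" pattern that a shared counter of this kind
    relies on. `CheckpointIdCounterSketch` and its methods are hypothetical
    names.
    
    ```java
    // Hypothetical local stand-in for a shared counter; not the Curator recipe.
    class CheckpointIdCounterSketch {
    
        private long value;
        private long version;
    
        CheckpointIdCounterSketch(long start) {
            this.value = start;
        }
    
        // Returns a consistent snapshot as {value, version}.
        synchronized long[] read() {
            return new long[] {value, version};
        }
    
        // Sets the value only if nobody else has written since the snapshot.
        synchronized boolean trySet(long expectedVersion, long newValue) {
            if (version != expectedVersion) {
                return false;
            }
            value = newValue;
            version++;
            return true;
        }
    
        // Hands out the current ID and advances the counter, retrying
        // whenever a concurrent caller wins the race.
        long getAndIncrement() {
            while (true) {
                long[] snapshot = read();
                if (trySet(snapshot[1], snapshot[0] + 1)) {
                    return snapshot[0];
                }
            }
        }
    }
    ```
    
    Because every successful write bumps the version, no ID is handed out
    twice, even when several callers race for the next one.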
    
    ### Akka messages
    
    - This PR introduces two new JobManager message types:
      - RecoverAllJobs
      - RecoverJob(JobID)
    - The ZooKeeper operations are blocking and all JobManager actor calls 
need to make sure to *not* block the JobManager. I've tried to cover all 
cases where a ZooKeeper operation is triggered.
    - For tests, I didn't manage to stop the JobManager actor w/o running the 
`postStop` method. Because this method has some cleanup logic (removing job 
graphs and checkpoints), all JobManager recovery tests run the JobManager as a 
separate `JobManagerProcess`. This is quite heavyweight. If someone knows a 
way to stop the actor w/o `postStop` being called, it would be great to 
refactor this.
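    
    Regarding the non-blocking requirement for ZooKeeper operations: one
    common way to keep a message handler responsive is to hand the blocking
    call to a separate executor and continue with the result
    asynchronously. The sketch below shows that pattern with plain Java
    futures. It is not how the JobManager actor is implemented, and
    `NonBlockingRecoverySketch`, `offload` and the `zk-io` thread name are
    hypothetical.
    
    ```java
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.function.Supplier;
    
    // Hypothetical illustration of offloading blocking calls; not a Flink class.
    class NonBlockingRecoverySketch {
    
        // Dedicated daemon thread for blocking I/O such as ZooKeeper reads.
        private final ExecutorService ioExecutor = Executors.newSingleThreadExecutor(runnable -> {
            Thread thread = new Thread(runnable, "zk-io");
            thread.setDaemon(true);
            return thread;
        });
    
        // Runs the blocking call on the I/O thread and returns immediately
        // with a future for its result.
        <T> CompletableFuture<T> offload(Supplier<T> blockingCall) {
            return CompletableFuture.supplyAsync(blockingCall, ioExecutor);
        }
    
        // Stops accepting new work; already submitted calls still complete.
        void shutdown() {
            ioExecutor.shutdown();
        }
    }
    ```
    
    A caller would attach a continuation such as `thenAccept(...)` to the
    returned future instead of waiting for it.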
    
    ## Next Steps
    
    - Behaviour on recovery via fixed delay is too simplistic.
    - Client is not fully integrated and submits jobs in detached mode if 
recovery mode is set to ZooKeeper.
    
    ## Tests
    
    There was a Travis/AWS outage yesterday and I haven't yet been able to run 
as many builds as we should. I would like to do a couple of runs before we merge this.


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/uce/flink recovery-2354

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/1153.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1153
    
----
commit aa0be0a27b7077fcdb303d99db40a4fb85acf82a
Author: Ufuk Celebi <[email protected]>
Date:   2015-09-03T13:13:28Z

    [runtime] Add type parameter to ByteStreamStateHandle

commit f37041bd705e71a3d7b2897e498fbbe625b38217
Author: Ufuk Celebi <[email protected]>
Date:   2015-09-19T17:53:18Z

    [clients] Submit job detached if recovery enabled

commit 83523771621eb8446a365e769f7b525d6430bcbb
Author: Ufuk Celebi <[email protected]>
Date:   2015-09-20T11:08:24Z

    [FLINK-2652] [tests] Temporary ignore flakey 
PartitionRequestClientFactoryTest

commit ad9b6572b73229ed92a6b3a0eee08d36a8e8bc6e
Author: Ufuk Celebi <[email protected]>
Date:   2015-09-01T15:25:46Z

    [FLINK-2354] [runtime] Add job graph and checkpoint recovery

----


> Recover running jobs on JobManager failure
> ------------------------------------------
>
>                 Key: FLINK-2354
>                 URL: https://issues.apache.org/jira/browse/FLINK-2354
>             Project: Flink
>          Issue Type: Sub-task
>          Components: JobManager
>    Affects Versions: master
>            Reporter: Ufuk Celebi
>            Assignee: Ufuk Celebi
>             Fix For: 0.10
>
>
> tl;dr Persist JobGraphs in state backend and coordinate reference to state 
> handle via ZooKeeper.
> Problem: When running multiple JobManagers in high availability mode, the 
> leading job manager loses all running jobs when it fails. After a new 
> leading job manager is elected, it is not possible to recover any previously 
> running jobs.
> Solution: The leading job manager, which receives the job graph, writes 1) the 
> job graph to a state backend, and 2) a reference to the respective state 
> handle to ZooKeeper. In general, job graphs can become large (multiple MBs, 
> because they include closures etc.). ZooKeeper is not designed for data of 
> this size. The level of indirection via the reference to the state backend 
> keeps the data in ZooKeeper small.
> Proposed ZooKeeper layout:
> /flink (default)
>   +- currentJobs
>        +- job id i
>             +- state handle reference of job graph i
> The 'currentJobs' node needs to be persistent to allow recovery of jobs 
> between job managers. The currentJobs node needs to satisfy the following 
> invariant: There is a reference to a job graph with id i IFF the respective 
> job graph needs to be recovered by a newly elected job manager leader.
> With this in place, jobs will be recovered from their initial state (as if 
> resubmitted). The next step is to back up the runtime state handles of 
> checkpoints in a similar manner.
> ---
> This work will be based on [[email protected]]'s implementation of 
> FLINK-2291. The leader election service notifies the job manager about 
> granted/revoked leadership. This notification happens via Akka and thus 
> serially *per* job manager, but results in eventually consistent state 
> between job managers. For short periods of time, it is possible for a new 
> leader to be granted leadership before the old one's leadership has been revoked.
> [[email protected]], can you confirm that leadership does not guarantee 
> mutually exclusive access to the shared 'currentJobs' state?
> For example, the following can happen:
> - JM 1 is leader, JM 2 is standby
> - JOB i is running (and hence /flink/currentJobs/i exists)
> - ZK notifies leader election service (LES) of JM 1 and JM 2
> - LES 2 immediately notifies JM 2 about granted leadership, but LES 1 
> notification revoking leadership takes longer
> - JOB i finishes (TMs don't notice leadership change yet) and JM 1 receives 
> final JobStatusChange
> - JM 2 resubmits the job /flink/currentJobs/i
> - JM 1 removes /flink/currentJobs/i, because it is now finished
> => inconsistent state (wrt the specified invariant above)
> If it is indeed a problem, we can circumvent this with a Curator recipe for 
> [shared locks|http://curator.apache.org/curator-recipes/shared-lock.html] to 
> coordinate the access to currentJobs. The lock needs to be acquired on 
> leadership.
> ---
> Minimum required tests:
> - Unit tests for job graph serialization and writing to state backend and 
> ZooKeeper with expected nodes
> - Unit tests for job submission to job manager in leader/non-leader state
> - Unit tests for leadership granting/revoking and job submission/restarting 
> interleavings
> - Process failure integration tests with single and multiple running jobs



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
