[ 
https://issues.apache.org/jira/browse/FLINK-2354?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14943173#comment-14943173
 ] 

ASF GitHub Bot commented on FLINK-2354:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1153#discussion_r41128614
  
    --- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java 
---
    @@ -153,6 +176,143 @@ public static ZooKeeperLeaderElectionService 
createLeaderElectionService(
        }
     
        /**
    +    * Creates a {@link ZooKeeperSubmittedJobGraphs} instance.
    +    *
    +    * @param client        The {@link CuratorFramework} ZooKeeper client 
to use
    +    * @param configuration {@link Configuration} object
    +    * @return {@link ZooKeeperSubmittedJobGraphs} instance
    +    */
    +   public static ZooKeeperSubmittedJobGraphs createSubmittedJobGraphs(
    +                   CuratorFramework client,
    +                   Configuration configuration) throws Exception {
    +
    +           checkNotNull(configuration, "Configuration");
    +
    +           // State backend
    +           String stateBackend = configuration.getString(
    +                           ConfigConstants.STATE_BACKEND,
    +                           ConfigConstants.DEFAULT_STATE_BACKEND);
    +
    +           if (!stateBackend.toLowerCase().equals("filesystem")) {
    --- End diff --
    
    Could we use the enum `StateBackend` here rather than string equality? 
Maybe it would be a good idea to use a factory method for the state backend 
depending on the `StateBackend` enum value.


> Recover running jobs on JobManager failure
> ------------------------------------------
>
>                 Key: FLINK-2354
>                 URL: https://issues.apache.org/jira/browse/FLINK-2354
>             Project: Flink
>          Issue Type: Sub-task
>          Components: JobManager
>    Affects Versions: master
>            Reporter: Ufuk Celebi
>            Assignee: Ufuk Celebi
>             Fix For: 0.10
>
>
> tl;dr Persist JobGraphs in state backend and coordinate reference to state 
> handle via ZooKeeper.
> Problem: When running multiple JobManagers in high availability mode, the 
> leading job manager looses all running jobs when it fails. After a new 
> leading job manager is elected, it is not possible to recover any previously 
> running jobs.
> Solution: The leading job manager, which receives the job graph writes 1) the 
> job graph to a state backend, and 2) a reference to the respective state 
> handle to ZooKeeper. In general, job graphs can become large (multiple MBs, 
> because they include closures etc.). ZooKeeper is not designed for data of 
> this size. The level of indirection via the reference to the state backend 
> keeps the data in ZooKeeper small.
> Proposed ZooKeeper layout:
> /flink (default)
>   +- currentJobs
>        +- job id i
>             +- state handle reference of job graph i
> The 'currentJobs' node needs to be persistent to allow recovery of jobs 
> between job managers. The currentJobs node needs to satisfy the following 
> invariant: There is a reference to a job graph with id i IFF the respective 
> job graph needs to be recovered by a newly elected job manager leader.
> With this in place, jobs will be recovered from their initial state (as if 
> resubmitted). The next step is to backup the runtime state handles of 
> checkpoints in a similar manner.
> ---
> This work will be based on [~trohrm...@apache.org]'s implementation of 
> FLINK-2291. The leader election service notifies the job manager about 
> granted/revoked leadership. This notification happens via Akka and thus 
> serially *per* job manager, but results in eventually consistent state 
> between job managers. For some snapshots of time it is possible to have a new 
> leader granted leadership, before the old one has been revoked its leadership.
> [~trohrm...@apache.org], can you confirm that leadership does not guarantee 
> mutually exclusive access to the shared 'currentJobs' state?
> For example, the following can happen:
> - JM 1 is leader, JM 2 is standby
> - JOB i is running (and hence /flink/currentJobs/i exists)
> - ZK notifies leader election service (LES) of JM 1 and JM 2
> - LES 2 immediately notifies JM 2 about granted leadership, but LES 1 
> notification revoking leadership takes longer
> - JOB i finishes (TMs don't notice leadership change yet) and JM 1 receives 
> final JobStatusChange
> - JM 2 resubmits the job /flink/currentJobs/i
> - JM 1 removes /flink/currentJobs/i, because it is now finished
> => inconsistent state (wrt the specified invariant above)
> If it is indeed a problem, we can circumvent this with a Curator recipe for 
> [shared locks|http://curator.apache.org/curator-recipes/shared-lock.html] to 
> coordinate the access to currentJobs. The lock needs to be acquired on 
> leadership.
> ---
> Minimum required tests:
> - Unit tests for job graph serialization and writing to state backend and 
> ZooKeeper with expected nodes
> - Unit tests for job submission to job manager in leader/non-leader state
> - Unit tests for leadership granting/revoking and job submission/restarting 
> interleavings
> - Process failure integration tests with single and multiple running jobs



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to