[ https://issues.apache.org/jira/browse/FLINK-2354?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14943152#comment-14943152 ]

ASF GitHub Bot commented on FLINK-2354:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1153#discussion_r41126979
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphs.java ---
    @@ -0,0 +1,356 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.jobmanager;
    +
    +import org.apache.curator.framework.CuratorFramework;
    +import org.apache.curator.framework.recipes.cache.PathChildrenCache;
    +import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
    +import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
    +import org.apache.curator.utils.ZKPaths;
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.runtime.state.StateHandle;
    +import org.apache.flink.runtime.state.StateHandleProvider;
    +import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandle;
    +import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.ConcurrentModificationException;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Set;
    +
    +import static com.google.common.base.Preconditions.checkNotNull;
    +import static com.google.common.base.Preconditions.checkState;
    +
    +/**
    + * {@link SubmittedJobGraph} instances for JobManagers running in {@link RecoveryMode#ZOOKEEPER}.
    + *
    + * <p>Each job graph creates ZNode:
    + * <pre>
    + * +----O /flink/jobgraphs/&lt;job-id&gt; 1 [persistent]
    + * .
    + * .
    + * .
    + * +----O /flink/jobgraphs/&lt;job-id&gt; N [persistent]
    + * </pre>
    + *
    + * <p>The root path is watched to detect concurrent modifications in corner situations where
    + * multiple instances operate concurrently. The job manager acts as a {@link SubmittedJobGraphListener}
    + * to react to such situations.
    + */
    +public class ZooKeeperSubmittedJobGraphs implements SubmittedJobGraphs {
    +
    +   private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperSubmittedJobGraphs.class);
    +
    +   /** Lock to synchronize with the {@link SubmittedJobGraphListener}. */
    +   private final Object cacheLock = new Object();
    +
    +   /** Client (not a namespace facade) */
    +   private final CuratorFramework client;
    +
    +   /** The set of IDs of all added job graphs. */
    +   private final Set<JobID> addedJobGraphs = new HashSet<>();
    +
    +   /** Completed checkpoints in ZooKeeper */
    +   private final ZooKeeperStateHandleStore<SubmittedJobGraph> jobGraphsInZooKeeper;
    +
    +   /**
    +    * Cache to monitor all children. This is used to detect races with other instances working
    +    * on the same state.
    +    */
    +   private final PathChildrenCache pathCache;
    +
    +   /** The external listener to be notified on races. */
    +   private SubmittedJobGraphListener jobGraphListener;
    +
    +   /** Flag indicating whether this instance is running. */
    +   private boolean isRunning;
    +
    +   public ZooKeeperSubmittedJobGraphs(
    +                   CuratorFramework client,
    +                   String currentJobsPath,
    +                   StateHandleProvider<SubmittedJobGraph> stateHandleProvider) throws Exception {
    +
    +           checkNotNull(client, "Curator client");
    +           checkNotNull(currentJobsPath, "Current jobs path");
    +           checkNotNull(stateHandleProvider, "State handle provider");
    +
    +           // Keep a reference to the original client and not the namespace facade. The namespace
    +           // facade cannot be closed.
    +           this.client = client;
    +
    +           // Ensure that the job graphs path exists
    +           client.newNamespaceAwareEnsurePath(currentJobsPath)
    +                           .ensure(client.getZookeeperClient());
    +
    +           // All operations will have the path as root
    +           client = client.usingNamespace(client.getNamespace() + currentJobsPath);
    +
    +           this.jobGraphsInZooKeeper = new ZooKeeperStateHandleStore<>(client, stateHandleProvider);
    +
    +           this.pathCache = new PathChildrenCache(client, "/", false);
    +           pathCache.getListenable().addListener(new SubmittedJobGraphsPathCacheListener());
    +   }
    +
    +   @Override
    +   public void start(SubmittedJobGraphListener jobGraphListener) throws Exception {
    +           synchronized (cacheLock) {
    +                   if (!isRunning) {
    +                           this.jobGraphListener = jobGraphListener;
    +
    +                           pathCache.start();
    +
    +                           isRunning = true;
    +                   }
    +           }
    +   }
    +
    +   @Override
    +   public void stop() throws Exception {
    +           synchronized (cacheLock) {
    +                   if (isRunning) {
    +                           jobGraphListener = null;
    +
    +                           pathCache.close();
    +
    +                           client.close();
    +
    +                           isRunning = false;
    +                   }
    +           }
    +   }
    +
    +   @Override
    +   public List<SubmittedJobGraph> recoverJobGraphs() throws Exception {
    +           synchronized (cacheLock) {
    +                   verifyIsRunning();
    +
    +                   List<ZooKeeperStateHandle<SubmittedJobGraph>> submitted;
    +
    +                   while (true) {
    +                           try {
    +                                   submitted = jobGraphsInZooKeeper.getAll();
    +                                   break;
    +                           }
    +                           catch (ConcurrentModificationException e) {
    +                                   LOG.warn("Concurrent modification while reading from ZooKeeper. Retrying.");
    +                           }
    +                   }
    +
    +                   if (submitted.size() != 0) {
    +                           List<SubmittedJobGraph> jobGraphs = new ArrayList<>(submitted.size());
    +
    +                           for (ZooKeeperStateHandle<SubmittedJobGraph> jobStateHandle : submitted) {
    +                                   SubmittedJobGraph jobGraph = jobStateHandle
    +                                                   .getStateHandle()
    +                                                   .getState(ClassLoader.getSystemClassLoader());
    +
    +                                   addedJobGraphs.add(jobGraph.getJobId());
    +
    +                                   jobGraphs.add(jobGraph);
    +                           }
    +
    +                           LOG.info("Recovered {} job graphs: {}.", jobGraphs.size(), jobGraphs);
    +                           return jobGraphs;
    +                   }
    +                   else {
    +                           LOG.info("No job graph to recover.");
    +                           return Collections.emptyList();
    +                   }
    +           }
    +   }
    +
    +   @Override
    +   public SubmittedJobGraph recoverJobGraph(JobID jobId) throws Exception {
    +           checkNotNull(jobId, "Job ID");
    +           String path = getPathForJob(jobId);
    +
    +           synchronized (cacheLock) {
    +                   verifyIsRunning();
    +
    +                   StateHandle<SubmittedJobGraph> jobStateHandle = jobGraphsInZooKeeper.get(path);
    +
    +                   SubmittedJobGraph jobGraph = jobStateHandle
    +                                   .getState(ClassLoader.getSystemClassLoader());
    +
    +                   addedJobGraphs.add(jobGraph.getJobId());
    +
    +                   LOG.info("Recovered {}.", jobGraph);
    +                   return jobGraph;
    +           }
    +   }
    +
    +   @Override
    +   public void putJobGraph(SubmittedJobGraph jobGraph) throws Exception {
    +           checkNotNull(jobGraph, "Job graph");
    +           String path = getPathForJob(jobGraph.getJobId());
    +
    +           synchronized (cacheLock) {
    +                   verifyIsRunning();
    +
    +                   int currentVersion = jobGraphsInZooKeeper.exists(path);
    +
    +                   if (currentVersion == -1) {
    +                           jobGraphsInZooKeeper.add(path, jobGraph);
    +                           LOG.info("Added {} to ZooKeeper.", jobGraph);
    +                   }
    +                   else if (addedJobGraphs.contains(jobGraph.getJobId())) {
    +                           jobGraphsInZooKeeper.replace(path, currentVersion, jobGraph);
    +                           LOG.info("Updated {} in ZooKeeper.", jobGraph);
    +                   }
    +                   else {
    +                           throw new IllegalStateException("Oh, no. Trying to update a graph you didn't " +
    +                                           "#getAllSubmittedJobGraphs() or #putJobGraph() yourself before.");
    +                   }
    +
    +                   addedJobGraphs.add(jobGraph.getJobId());
    +           }
    +   }
    +
    +   @Override
    +   public void removeJobGraph(JobID jobId) throws Exception {
    +           checkNotNull(jobId, "Job ID");
    +           String path = getPathForJob(jobId);
    +
    +           synchronized (cacheLock) {
    +                   if (addedJobGraphs.contains(jobId)) {
    +                           jobGraphsInZooKeeper.discard(path);
    +
    +                           addedJobGraphs.remove(jobId);
    +                           LOG.info("Removed job graph {} from ZooKeeper.", jobId);
    +                   }
    +           }
    +   }
    +
    +   /**
    +    * Monitors ZooKeeper for changes.
    +    *
    +    * <p>Detects modifications from other job managers in corner situations. The event
    +    * notifications fire for changes from this job manager as well.
    +    */
    +   private final class SubmittedJobGraphsPathCacheListener implements PathChildrenCacheListener {
    +
    +           @Override
    +           public void childEvent(CuratorFramework client, PathChildrenCacheEvent event)
    +                           throws Exception {
    +
    +                   if (LOG.isDebugEnabled()) {
    +                           if (event.getData() != null) {
    +                                   LOG.debug("Received {} event (path: {})", event.getType(), event.getData().getPath());
    +                           }
    +                           else {
    +                                   LOG.debug("Received {} event", event.getType());
    +                           }
    +                   }
    +
    +                   synchronized (cacheLock) {
    +                           switch (event.getType()) {
    +                                   case CHILD_ADDED:
    +                                           try {
    +                                                   JobID jobId = fromEvent(event);
    +                                                   if (jobGraphListener != null && !addedJobGraphs.contains(jobId)) {
    +                                                           try {
    +                                                                   // Whoa! This has been added by someone else. Or we were fast
    +                                                                   // to remove it (false positive).
    +                                                                   jobGraphListener.onAddedJobGraph(jobId);
    +                                                           }
    +                                                           catch (Throwable t) {
    +                                                                   LOG.error("Error in callback", t);
    +                                                           }
    +                                                   }
    +                                           }
    +                                           catch (Exception e) {
    +                                                   LOG.error("Error in SubmittedJobGraphsPathCacheListener", e);
    +                                           }
    +
    +                                           break;
    +
    +                                   case CHILD_UPDATED:
    +                                           // TODO What to do if someone overwrites one of our job graphs? And how
    +                                           // to tell whose it is? Is the update-time check enough to prevent these
    +                                           // problematic situations.
    --- End diff --
    
    Hmm, this should at least be logged.
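
A minimal sketch of what such logging in the CHILD_UPDATED branch could look like. It is not the PR's code: `ChildEventType` is a hypothetical stand-in for Curator's `PathChildrenCacheEvent.Type`, job IDs are plain strings, and warnings are collected in a list where the real listener would presumably call `LOG.warn(...)`. Because the notifications also fire for changes made by this job manager, the message can only report whether this instance tracks the job graph, not who wrote the update.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical stand-in for Curator's PathChildrenCacheEvent.Type (assumption, not the real enum). */
enum ChildEventType { CHILD_ADDED, CHILD_UPDATED, CHILD_REMOVED }

/** Sketch of a listener that at least records a warning when a job graph node is updated. */
final class UpdateLoggingListenerSketch {

    /** IDs of job graphs added by this instance (mirrors addedJobGraphs in the diff). */
    private final Set<String> addedJobGraphs = new HashSet<>();

    /** Collected warnings; a real listener would log them instead of storing them. */
    private final List<String> warnings = new ArrayList<>();

    void markAdded(String jobId) {
        addedJobGraphs.add(jobId);
    }

    List<String> getWarnings() {
        return warnings;
    }

    void childEvent(ChildEventType type, String jobId) {
        switch (type) {
            case CHILD_UPDATED:
                // The event alone does not tell us who wrote the update, so only
                // report whether this instance knows the job graph.
                if (addedJobGraphs.contains(jobId)) {
                    warnings.add("Job graph " + jobId + " tracked by this instance was updated in ZooKeeper.");
                }
                else {
                    warnings.add("Job graph " + jobId + " not tracked by this instance was updated in ZooKeeper.");
                }
                break;
            default:
                // Other event types are handled elsewhere in the listener.
                break;
        }
    }
}
```

This only makes the situation visible in the logs; it does not answer the TODO's question of how to react to an overwrite.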


> Recover running jobs on JobManager failure
> ------------------------------------------
>
>                 Key: FLINK-2354
>                 URL: https://issues.apache.org/jira/browse/FLINK-2354
>             Project: Flink
>          Issue Type: Sub-task
>          Components: JobManager
>    Affects Versions: master
>            Reporter: Ufuk Celebi
>            Assignee: Ufuk Celebi
>             Fix For: 0.10
>
>
> tl;dr Persist JobGraphs in state backend and coordinate reference to state 
> handle via ZooKeeper.
> Problem: When running multiple JobManagers in high availability mode, the 
> leading job manager loses all running jobs when it fails. After a new 
> leading job manager is elected, it is not possible to recover any previously 
> running jobs.
> Solution: The leading job manager, which receives the job graph, writes 1) the 
> job graph to a state backend, and 2) a reference to the respective state 
> handle to ZooKeeper. In general, job graphs can become large (multiple MBs, 
> because they include closures etc.). ZooKeeper is not designed for data of 
> this size. The level of indirection via the reference to the state backend 
> keeps the data in ZooKeeper small.
> Proposed ZooKeeper layout:
> /flink (default)
>   +- currentJobs
>        +- job id i
>             +- state handle reference of job graph i
> The 'currentJobs' node needs to be persistent to allow recovery of jobs 
> between job managers. The currentJobs node needs to satisfy the following 
> invariant: There is a reference to a job graph with id i IFF the respective 
> job graph needs to be recovered by a newly elected job manager leader.
> With this in place, jobs will be recovered from their initial state (as if 
> resubmitted). The next step is to back up the runtime state handles of 
> checkpoints in a similar manner.
> ---
> This work will be based on [~trohrm...@apache.org]'s implementation of 
> FLINK-2291. The leader election service notifies the job manager about 
> granted/revoked leadership. This notification happens via Akka and thus 
> serially *per* job manager, but results in eventually consistent state 
> between job managers. For short periods of time, a new leader can be granted 
> leadership before the old leader's leadership has been revoked.
> [~trohrm...@apache.org], can you confirm that leadership does not guarantee 
> mutually exclusive access to the shared 'currentJobs' state?
> For example, the following can happen:
> - JM 1 is leader, JM 2 is standby
> - JOB i is running (and hence /flink/currentJobs/i exists)
> - ZK notifies leader election service (LES) of JM 1 and JM 2
> - LES 2 immediately notifies JM 2 about granted leadership, but LES 1 
> notification revoking leadership takes longer
> - JOB i finishes (TMs don't notice leadership change yet) and JM 1 receives 
> final JobStatusChange
> - JM 2 resubmits the job /flink/currentJobs/i
> - JM 1 removes /flink/currentJobs/i, because it is now finished
> => inconsistent state (wrt the specified invariant above)
> If it is indeed a problem, we can circumvent this with a Curator recipe for 
> [shared locks|http://curator.apache.org/curator-recipes/shared-lock.html] to 
> coordinate the access to currentJobs. The lock needs to be acquired on 
> leadership.
> ---
> Minimum required tests:
> - Unit tests for job graph serialization and writing to state backend and 
> ZooKeeper with expected nodes
> - Unit tests for job submission to job manager in leader/non-leader state
> - Unit tests for leadership granting/revoking and job submission/restarting 
> interleavings
> - Process failure integration tests with single and multiple running jobs
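
The interleaving listed in the issue description above can be replayed deterministically. The following is only a sketch under stated assumptions: the children of /flink/currentJobs are modelled as an in-memory set, `LockStandIn` is a hypothetical single-holder lock and not the Curator recipe, and all class and method names are invented for illustration. It checks the stated invariant (a node exists for job i iff job i must be recovered by a new leader) once without coordination and once with a lock that is acquired on leadership.

```java
import java.util.HashSet;
import java.util.Set;

final class CurrentJobsRaceSketch {

    /** Hypothetical stand-in for a shared lock with at most one holder (not Curator's API). */
    static final class LockStandIn {
        private String holder;

        boolean tryAcquire(String who) {
            if (holder == null) {
                holder = who;
                return true;
            }
            return holder.equals(who);
        }

        void release(String who) {
            if (who.equals(holder)) {
                holder = null;
            }
        }
    }

    /** Invariant from the issue: a node exists for job i iff job i must be recovered. */
    static boolean invariantHolds(Set<String> nodes, Set<String> jobsToRecover) {
        return nodes.equals(jobsToRecover);
    }

    /** Replays the problematic interleaving without any coordination. */
    static boolean replayWithoutLock() {
        Set<String> nodes = new HashSet<>();
        nodes.add("i");                                   // JOB i runs under JM 1
        // JM 2 is granted leadership before JM 1 is revoked; JOB i finishes on JM 1.
        Set<String> runningOnJm2 = new HashSet<>(nodes);  // JM 2 resubmits what it finds
        nodes.remove("i");                                // JM 1 removes the finished job
        return invariantHolds(nodes, runningOnJm2);
    }

    /** Replays the same events when the lock is acquired on leadership. */
    static boolean replayWithLock() {
        LockStandIn lock = new LockStandIn();
        Set<String> nodes = new HashSet<>();
        nodes.add("i");
        lock.tryAcquire("JM1");                           // JM 1 took the lock as leader
        Set<String> runningOnJm2 = new HashSet<>();
        if (lock.tryAcquire("JM2")) {                     // fails: JM 1 still holds the lock
            runningOnJm2.addAll(nodes);
        }
        nodes.remove("i");                                // JM 1 removes the finished job
        lock.release("JM1");                              // JM 1's leadership is revoked
        if (lock.tryAcquire("JM2")) {                     // succeeds now
            runningOnJm2.addAll(nodes);                   // nothing left to recover
        }
        return invariantHolds(nodes, runningOnJm2);
    }
}
```

In this model `replayWithoutLock()` ends with JM 2 running job i although its node is gone, while `replayWithLock()` leaves both sides empty. A real implementation would block on the lock rather than poll it and would have to handle lost ZooKeeper sessions, which the sketch ignores.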



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
