Repository: flink
Updated Branches:
  refs/heads/release-1.1 59f61bf6c -> 9c058871f


[FLINK-5193] [jm] Harden job recovery in case of recovery failures

When recovering multiple jobs, a single recovery failure caused none of the
jobs to be recovered. This PR makes the recovery of each job independent, so
that a single failure no longer stalls the complete recovery. Furthermore, this
PR improves the error reporting for failures originating in the
ZooKeeperSubmittedJobGraphStore.

Add test case

Fix failing JobManagerHACheckpointRecoveryITCase


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d314bc52
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d314bc52
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d314bc52

Branch: refs/heads/release-1.1
Commit: d314bc5235e2573ff77f45d327bc62f521063b71
Parents: 59f61bf
Author: Till Rohrmann <trohrm...@apache.org>
Authored: Tue Nov 29 17:31:08 2016 +0100
Committer: Till Rohrmann <trohrm...@apache.org>
Committed: Thu Dec 1 17:53:34 2016 +0100

----------------------------------------------------------------------
 .../StandaloneSubmittedJobGraphStore.java       |  11 +-
 .../jobmanager/SubmittedJobGraphStore.java      |  19 ++-
 .../ZooKeeperSubmittedJobGraphStore.java        | 113 +++++++------
 .../zookeeper/ZooKeeperStateHandleStore.java    |  44 ++++-
 .../flink/runtime/jobmanager/JobManager.scala   |  45 ++---
 .../jobmanager/JobManagerHARecoveryTest.java    | 165 ++++++++++++++++++-
 .../StandaloneSubmittedJobGraphStoreTest.java   |  11 +-
 .../ZooKeeperSubmittedJobGraphsStoreITCase.java |  29 ++--
 .../JobManagerHACheckpointRecoveryITCase.java   |   4 +-
 9 files changed, 315 insertions(+), 126 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d314bc52/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStore.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStore.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStore.java
index db36f92..8267b9b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStore.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStore.java
@@ -20,10 +20,9 @@ package org.apache.flink.runtime.jobmanager;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobgraph.JobGraph;
-import scala.Option;
 
+import java.util.Collection;
 import java.util.Collections;
-import java.util.List;
 
 /**
  * {@link SubmittedJobGraph} instances for JobManagers running in {@link 
RecoveryMode#STANDALONE}.
@@ -54,12 +53,12 @@ public class StandaloneSubmittedJobGraphStore implements 
SubmittedJobGraphStore
        }
 
        @Override
-       public Option<SubmittedJobGraph> recoverJobGraph(JobID jobId) throws 
Exception {
-               return Option.empty();
+       public Collection<JobID> getJobIds() throws Exception {
+               return Collections.emptyList();
        }
 
        @Override
-       public List<SubmittedJobGraph> recoverJobGraphs() throws Exception {
-               return Collections.emptyList();
+       public SubmittedJobGraph recoverJobGraph(JobID jobId) throws Exception {
+               return null;
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d314bc52/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraphStore.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraphStore.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraphStore.java
index bd628cd..55c2e79 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraphStore.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraphStore.java
@@ -19,10 +19,8 @@
 package org.apache.flink.runtime.jobmanager;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import scala.Option;
 
-import java.util.List;
+import java.util.Collection;
 
 /**
  * {@link SubmittedJobGraph} instances for recovery.
@@ -40,16 +38,11 @@ public interface SubmittedJobGraphStore {
        void stop() throws Exception;
 
        /**
-        * Returns a list of all submitted {@link JobGraph} instances.
-        */
-       List<SubmittedJobGraph> recoverJobGraphs() throws Exception;
-
-       /**
         * Returns the {@link SubmittedJobGraph} with the given {@link JobID}.
         *
         * <p>An Exception is thrown, if no job graph with the given ID exists.
         */
-       Option<SubmittedJobGraph> recoverJobGraph(JobID jobId) throws Exception;
+       SubmittedJobGraph recoverJobGraph(JobID jobId) throws Exception;
 
        /**
         * Adds the {@link SubmittedJobGraph} instance.
@@ -64,6 +57,14 @@ public interface SubmittedJobGraphStore {
        void removeJobGraph(JobID jobId) throws Exception;
 
        /**
+        * Get all job ids of submitted job graphs to the submitted job graph 
store.
+        *
+        * @return Collection of submitted job ids
+        * @throws Exception if the operation fails
+        */
+       Collection<JobID> getJobIds() throws Exception;
+
+       /**
         * A listener for {@link SubmittedJobGraph} instances. This is used to 
react to races between
         * multiple running {@link SubmittedJobGraphStore} instances (on 
multiple job managers).
         */

http://git-wip-us.apache.org/repos/asf/flink/blob/d314bc52/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
index 7324c07..859d319 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
@@ -24,18 +24,15 @@ import 
org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
 import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
 import org.apache.curator.utils.ZKPaths;
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.state.StateHandle;
 import org.apache.flink.runtime.zookeeper.StateStorageHelper;
 import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
 import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import scala.Option;
 
 import java.util.ArrayList;
-import java.util.Collections;
-import java.util.ConcurrentModificationException;
+import java.util.Collection;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
@@ -156,73 +153,41 @@ public class ZooKeeperSubmittedJobGraphStore implements 
SubmittedJobGraphStore {
        }
 
        @Override
-       public List<SubmittedJobGraph> recoverJobGraphs() throws Exception {
+       public SubmittedJobGraph recoverJobGraph(JobID jobId) throws Exception {
+               checkNotNull(jobId, "Job ID");
+               String path = getPathForJob(jobId);
+
                synchronized (cacheLock) {
                        verifyIsRunning();
 
-                       LOG.debug("Recovering all job graphs from ZooKeeper at 
{}.", zooKeeperFullBasePath);
-
-                       List<Tuple2<StateHandle<SubmittedJobGraph>, String>> 
submitted;
-
-                       while (true) {
-                               try {
-                                       submitted = 
jobGraphsInZooKeeper.getAll();
-                                       break;
-                               }
-                               catch (ConcurrentModificationException e) {
-                                       LOG.warn("Concurrent modification while 
reading from ZooKeeper. Retrying.");
-                               }
-                       }
-
-                       LOG.info("Found {} job graphs.", submitted.size());
-
-                       if (submitted.size() != 0) {
-                               List<SubmittedJobGraph> jobGraphs = new 
ArrayList<>(submitted.size());
-
-                               for (Tuple2<StateHandle<SubmittedJobGraph>, 
String> jobStateHandle : submitted) {
-                                       SubmittedJobGraph jobGraph = 
jobStateHandle
-                                                       
.f0.getState(ClassLoader.getSystemClassLoader());
+                       StateHandle<SubmittedJobGraph> submittedJobStateHandle;
 
-                                       addedJobGraphs.add(jobGraph.getJobId());
 
-                                       jobGraphs.add(jobGraph);
-                               }
-
-                               LOG.info("Recovered {} job graphs: {}.", 
jobGraphs.size(), jobGraphs);
-                               return jobGraphs;
-                       }
-                       else {
-                               LOG.info("No job graph to recover.");
-                               return Collections.emptyList();
+                       try {
+                               submittedJobStateHandle = 
jobGraphsInZooKeeper.get(path);
+                       } catch (KeeperException.NoNodeException ignored) {
+                               return null;
+                       } catch (Exception e) {
+                               throw new Exception("Could not retrieve the 
submitted job graph state handle " +
+                                       "for " + path + "from the submitted job 
graph store.", e);
                        }
-               }
-       }
-
-       @Override
-       public Option<SubmittedJobGraph> recoverJobGraph(JobID jobId) throws 
Exception {
-               checkNotNull(jobId, "Job ID");
-               String path = getPathForJob(jobId);
+                       
+                       SubmittedJobGraph jobGraph;
 
-               LOG.debug("Recovering job graph {} from {}{}.", jobId, 
zooKeeperFullBasePath, path);
+                       LOG.debug("Recovering job graph {} from {}{}.", jobId, 
zooKeeperFullBasePath, path);
 
-               synchronized (cacheLock) {
-                       verifyIsRunning();
 
                        try {
-                               StateHandle<SubmittedJobGraph> jobStateHandle = 
jobGraphsInZooKeeper.get(path);
-
-                               SubmittedJobGraph jobGraph = jobStateHandle
-                                               
.getState(ClassLoader.getSystemClassLoader());
+                               jobGraph = 
submittedJobStateHandle.getState(getClass().getClassLoader());
+                       } catch (Exception e) {
+                               throw new Exception("Failed to retrieve the 
submitted job graph from state handle.", e);
+                       }
 
-                               addedJobGraphs.add(jobGraph.getJobId());
+                       addedJobGraphs.add(jobGraph.getJobId());
 
-                               LOG.info("Recovered {}.", jobGraph);
+                       LOG.info("Recovered {}.", jobGraph);
 
-                               return Option.apply(jobGraph);
-                       }
-                       catch (KeeperException.NoNodeException ignored) {
-                               return Option.empty();
-                       }
+                       return jobGraph;
                }
        }
 
@@ -290,6 +255,29 @@ public class ZooKeeperSubmittedJobGraphStore implements 
SubmittedJobGraphStore {
                LOG.info("Removed job graph {} from ZooKeeper.", jobId);
        }
 
+       @Override
+       public Collection<JobID> getJobIds() throws Exception {
+               Collection<String> paths;
+
+               try {
+                       paths = jobGraphsInZooKeeper.getAllPaths();
+               } catch (Exception e) {
+                       throw new Exception("Failed to retrieve entry paths 
from ZooKeeperStateHandleStore.", e);
+               }
+
+               List<JobID> jobIds = new ArrayList<>(paths.size());
+
+               for (String path : paths) {
+                       try {
+                               jobIds.add(jobIdfromPath(path));
+                       } catch (Exception exception) {
+                               LOG.warn("Could not parse job id from {}.", 
path, exception);
+                       }
+               }
+
+               return jobIds;
+       }
+
        /**
         * Monitors ZooKeeper for changes.
         *
@@ -412,4 +400,13 @@ public class ZooKeeperSubmittedJobGraphStore implements 
SubmittedJobGraphStore {
                return String.format("/%s", jobId);
        }
 
+       /**
+        * Returns the JobID from the given path in ZooKeeper.
+        *
+        * @param path in ZooKeeper
+        * @return JobID associated with the given path
+        */
+       public static JobID jobIdfromPath(final String path) {
+               return JobID.fromHexString(path);
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d314bc52/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
index 6576ff8..0d63a15 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
@@ -30,8 +30,11 @@ import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.Executor;
 
@@ -226,10 +229,45 @@ public class ZooKeeperStateHandleStore<T extends 
Serializable> {
        public StateHandle<T> get(String pathInZooKeeper) throws Exception {
                checkNotNull(pathInZooKeeper, "Path in ZooKeeper");
 
-               byte[] data = client.getData().forPath(pathInZooKeeper);
+               byte[] data;
 
-               return (StateHandle<T>) InstantiationUtil
-                               .deserializeObject(data, 
ClassLoader.getSystemClassLoader());
+               try {
+                       data = client.getData().forPath(pathInZooKeeper);
+               } catch (Exception e) {
+                       throw new Exception("Failed to retrieve state handle 
data under " + pathInZooKeeper +
+                               " from ZooKeeper.", e);
+               }
+
+               try {
+                       return InstantiationUtil.deserializeObject(data, 
Thread.currentThread().getContextClassLoader());
+               } catch (IOException | ClassNotFoundException e) {
+                       throw new Exception("Failed to deserialize state handle 
from ZooKeeper data from " +
+                               pathInZooKeeper + '.', e);
+               }
+       }
+
+       /**
+        * Return a list of all valid paths for state handles.
+        *
+        * @return List of valid state handle paths in ZooKeeper
+        * @throws Exception if a ZooKeeper operation fails
+        */
+       public Collection<String> getAllPaths() throws Exception {
+               final String path = "/";
+
+               while(true) {
+                       Stat stat = client.checkExists().forPath(path);
+
+                       if (stat == null) {
+                               return Collections.emptyList();
+                       } else {
+                               try {
+                                       return 
client.getChildren().forPath(path);
+                               } catch (KeeperException.NoNodeException 
ignored) {
+                                       // Concurrent deletion, retry
+                               }
+                       }
+               }
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/d314bc52/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 9061db4..9f6e2db 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -471,6 +471,8 @@ class JobManager(
 
     case RecoverSubmittedJob(submittedJobGraph) =>
       if (!currentJobs.contains(submittedJobGraph.getJobId)) {
+        log.info(s"Submitting recovered job ${submittedJobGraph.getJobId}.")
+
         submitJob(
           submittedJobGraph.getJobGraph(),
           submittedJobGraph.getJobInfo(),
@@ -492,7 +494,7 @@ class JobManager(
             log.info(s"Attempting to recover job $jobId.")
             val submittedJobGraphOption = 
submittedJobGraphs.recoverJobGraph(jobId)
 
-            submittedJobGraphOption match {
+            Option(submittedJobGraphOption) match {
               case Some(submittedJobGraph) =>
                 if (!leaderElectionService.hasLeadership()) {
                   // we've lost leadership. mission: abort.
@@ -505,37 +507,31 @@ class JobManager(
             }
           }
         } catch {
-          case t: Throwable => log.error(s"Failed to recover job $jobId.", t)
+          case t: Throwable => log.warn(s"Failed to recover job $jobId.", t)
         }
       }(context.dispatcher)
 
     case RecoverAllJobs =>
       future {
-        try {
-          // The ActorRef, which is part of the submitted job graph can only be
-          // de-serialized in the scope of an actor system.
-          akka.serialization.JavaSerializer.currentSystem.withValue(
-            context.system.asInstanceOf[ExtendedActorSystem]) {
+        log.info("Attempting to recover all jobs.")
 
-            log.info(s"Attempting to recover all jobs.")
-
-            val jobGraphs = submittedJobGraphs.recoverJobGraphs().asScala
+        try {
+          val jobIdsToRecover = submittedJobGraphs.getJobIds().asScala
 
-            if (!leaderElectionService.hasLeadership()) {
-              // we've lost leadership. mission: abort.
-              log.warn(s"Lost leadership during recovery. Aborting recovery of 
${jobGraphs.size} " +
-                s"jobs.")
-            } else {
-              log.info(s"Re-submitting ${jobGraphs.size} job graphs.")
+          if (jobIdsToRecover.isEmpty) {
+            log.info("There are no jobs to recover.")
+          } else {
+            log.info(s"There are ${jobIdsToRecover.size} jobs to recover. 
Starting the job " +
+                       s"recovery.")
 
-              jobGraphs.foreach{
-                submittedJobGraph =>
-                  self ! 
decorateMessage(RecoverSubmittedJob(submittedJobGraph))
-              }
+            jobIdsToRecover foreach {
+              jobId => self ! decorateMessage(RecoverJob(jobId))
             }
           }
         } catch {
-          case t: Throwable => log.error("Fatal error: Failed to recover 
jobs.", t)
+          case e: Exception =>
+            log.warn("Failed to recover job ids from submitted job graph 
store. Aborting " +
+                       "recovery.", e)
         }
       }(context.dispatcher)
 
@@ -1039,7 +1035,12 @@ class JobManager(
    * @param jobInfo the job info
    * @param isRecovery Flag indicating whether this is a recovery or initial 
submission
    */
-  private def submitJob(jobGraph: JobGraph, jobInfo: JobInfo, isRecovery: 
Boolean = false): Unit = {
+  private def submitJob(
+      jobGraph: JobGraph,
+      jobInfo: JobInfo,
+      isRecovery: Boolean = false)
+    : Unit = {
+
     if (jobGraph == null) {
       jobInfo.client ! decorateMessage(JobResultFailure(
         new SerializedThrowable(

http://git-wip-us.apache.org/repos/asf/flink/blob/d314bc52/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
index b98f338..b78f1fc 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
@@ -20,8 +20,13 @@ package org.apache.flink.runtime.jobmanager;
 
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
+import akka.actor.Identify;
 import akka.actor.PoisonPill;
 import akka.actor.Props;
+import akka.japi.pf.FI;
+import akka.japi.pf.ReceiveBuilder;
+import akka.pattern.Patterns;
+import akka.testkit.CallingThreadDispatcher;
 import akka.testkit.JavaTestKit;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.ConfigConstants;
@@ -29,15 +34,18 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.akka.ListeningBehaviour;
 import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.blob.BlobService;
 import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
 import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
 import org.apache.flink.runtime.checkpoint.savepoint.HeapSavepointStore;
+import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
 import 
org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy;
+import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.instance.AkkaActorGateway;
 import org.apache.flink.runtime.instance.InstanceManager;
@@ -49,10 +57,12 @@ import 
org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
 import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
 import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
 import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.state.LocalStateHandle;
 import org.apache.flink.runtime.state.StateHandle;
 import org.apache.flink.runtime.taskmanager.TaskManager;
@@ -62,6 +72,7 @@ import org.apache.flink.runtime.testingUtils.TestingMessages;
 import org.apache.flink.runtime.testingUtils.TestingTaskManager;
 import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.util.TestExecutors;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Rule;
@@ -69,25 +80,35 @@ import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 import scala.Int;
 import scala.Option;
+import scala.PartialFunction;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.Deadline;
 import scala.concurrent.duration.FiniteDuration;
+import scala.runtime.BoxedUnit;
 
 import java.util.ArrayDeque;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ForkJoinPool;
 import java.util.concurrent.TimeUnit;
 
+import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 public class JobManagerHARecoveryTest {
 
@@ -288,6 +309,134 @@ public class JobManagerHARecoveryTest {
        }
 
        /**
+        * Tests that a failing job recovery won't cause other job recoveries 
to fail.
+        */
+       @Test
+       public void testFailingJobRecovery() throws Exception {
+               final FiniteDuration timeout = new FiniteDuration(10, 
TimeUnit.SECONDS);
+               final FiniteDuration jobRecoveryTimeout = new FiniteDuration(0, 
TimeUnit.SECONDS);
+               Deadline deadline = new FiniteDuration(1, 
TimeUnit.MINUTES).fromNow();
+               final Configuration flinkConfiguration = new Configuration();
+               UUID leaderSessionID = UUID.randomUUID();
+               ActorRef jobManager = null;
+               JobID jobId1 = new JobID();
+               JobID jobId2 = new JobID();
+
+               // set HA mode to zookeeper so that we try to recover jobs
+               flinkConfiguration.setString(ConfigConstants.RECOVERY_MODE, 
"zookeeper");
+
+               try {
+                       final SubmittedJobGraphStore submittedJobGraphStore = 
mock(SubmittedJobGraphStore.class);
+
+                       SubmittedJobGraph submittedJobGraph = 
mock(SubmittedJobGraph.class);
+                       when(submittedJobGraph.getJobId()).thenReturn(jobId2);
+
+                       
when(submittedJobGraphStore.getJobIds()).thenReturn(Arrays.asList(jobId1, 
jobId2));
+
+                       // fail the first job recovery
+                       
when(submittedJobGraphStore.recoverJobGraph(eq(jobId1))).thenThrow(new 
Exception("Test exception"));
+                       // succeed the second job recovery
+                       
when(submittedJobGraphStore.recoverJobGraph(eq(jobId2))).thenReturn(submittedJobGraph);
+
+                       final TestingLeaderElectionService 
myLeaderElectionService = new TestingLeaderElectionService();
+
+                       final Collection<JobID> recoveredJobs = new 
ArrayList<>(2);
+
+                       Props jobManagerProps = Props.create(
+                               TestingFailingHAJobManager.class,
+                               flinkConfiguration,
+                               TestExecutors.directExecutor(),
+                               TestExecutors.directExecutor(),
+                               mock(InstanceManager.class),
+                               mock(Scheduler.class),
+                               new 
BlobLibraryCacheManager(mock(BlobService.class), 1 << 20),
+                               ActorRef.noSender(),
+                               new 
FixedDelayRestartStrategy.FixedDelayRestartStrategyFactory(Int.MaxValue(), 100),
+                               timeout,
+                               myLeaderElectionService,
+                               submittedJobGraphStore,
+                               mock(CheckpointRecoveryFactory.class),
+                               mock(SavepointStore.class),
+                               jobRecoveryTimeout,
+                               Option.<MetricRegistry>apply(null),
+                               
recoveredJobs).withDispatcher(CallingThreadDispatcher.Id());
+
+                       jobManager = system.actorOf(jobManagerProps, 
"jobmanager");
+
+                       Future<Object> started = Patterns.ask(jobManager, new 
Identify(42), deadline.timeLeft().toMillis());
+
+                       Await.ready(started, deadline.timeLeft());
+
+                       // make the job manager the leader --> this triggers 
the recovery of all jobs
+                       myLeaderElectionService.isLeader(leaderSessionID);
+
+                       // check that we have successfully recovered the second 
job
+                       assertThat(recoveredJobs, containsInAnyOrder(jobId2));
+               } finally {
+                       TestingUtils.stopActor(jobManager);
+               }
+       }
+
+       static class TestingFailingHAJobManager extends JobManager {
+
+               private final Collection<JobID> recoveredJobs;
+
+               public TestingFailingHAJobManager(
+                       Configuration flinkConfiguration,
+                       Executor futureExecutor,
+                       Executor ioExecutor,
+                       InstanceManager instanceManager,
+                       Scheduler scheduler,
+                       BlobLibraryCacheManager libraryCacheManager,
+                       ActorRef archive,
+                       RestartStrategyFactory restartStrategyFactory,
+                       FiniteDuration timeout,
+                       LeaderElectionService leaderElectionService,
+                       SubmittedJobGraphStore submittedJobGraphs,
+                       CheckpointRecoveryFactory checkpointRecoveryFactory,
+                       SavepointStore savepointStore,
+                       FiniteDuration jobRecoveryTimeout,
+                       Option<MetricRegistry> metricRegistry,
+                       Collection<JobID> recoveredJobs) {
+                       super(
+                               flinkConfiguration,
+                               futureExecutor,
+                               ioExecutor,
+                               instanceManager,
+                               scheduler,
+                               libraryCacheManager,
+                               archive,
+                               restartStrategyFactory,
+                               timeout,
+                               leaderElectionService,
+                               submittedJobGraphs,
+                               checkpointRecoveryFactory,
+                               savepointStore,
+                               jobRecoveryTimeout,
+                               metricRegistry);
+
+                       this.recoveredJobs = recoveredJobs;
+               }
+
+               @Override
+               public PartialFunction<Object, BoxedUnit> handleMessage() {
+                       return ReceiveBuilder.match(
+                               JobManagerMessages.RecoverSubmittedJob.class,
+                               new 
FI.UnitApply<JobManagerMessages.RecoverSubmittedJob>() {
+                                       @Override
+                                       public void 
apply(JobManagerMessages.RecoverSubmittedJob submitJob) throws Exception {
+                                               
recoveredJobs.add(submitJob.submittedJobGraph().getJobId());
+                                       }
+                               }).matchAny(new FI.UnitApply<Object>() {
+                               @Override
+                               public void apply(Object o) throws Exception {
+                                       
TestingFailingHAJobManager.super.handleMessage().apply(o);
+                               }
+                       }).build();
+               }
+       }
+
+       /**
         * A checkpoint store, which supports shutdown and suspend. You can use 
this to test HA
         * as long as the factory always returns the same store instance.
         */
@@ -383,16 +532,11 @@ public class JobManagerHARecoveryTest {
                }
 
                @Override
-               public List<SubmittedJobGraph> recoverJobGraphs() throws 
Exception {
-                       return new ArrayList<>(storedJobs.values());
-               }
-
-               @Override
-               public Option<SubmittedJobGraph> recoverJobGraph(JobID jobId) 
throws Exception {
+               public SubmittedJobGraph recoverJobGraph(JobID jobId) throws 
Exception {
                        if (storedJobs.containsKey(jobId)) {
-                               return Option.apply(storedJobs.get(jobId));
+                               return storedJobs.get(jobId);
                        } else {
-                               return Option.apply(null);
+                               return null;
                        }
                }
 
@@ -406,6 +550,11 @@ public class JobManagerHARecoveryTest {
                        storedJobs.remove(jobId);
                }
 
+               @Override
+               public Collection<JobID> getJobIds() throws Exception {
+                       return storedJobs.keySet();
+               }
+
                boolean contains(JobID jobId) {
                        return storedJobs.containsKey(jobId);
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/d314bc52/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStoreTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStoreTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStoreTest.java
index 8ebb7f8..079a10e 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStoreTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStoreTest.java
@@ -19,14 +19,13 @@
 package org.apache.flink.runtime.jobmanager;
 
 import akka.actor.ActorRef;
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.akka.ListeningBehaviour;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertNull;
 
 public class StandaloneSubmittedJobGraphStoreTest {
 
@@ -41,14 +40,14 @@ public class StandaloneSubmittedJobGraphStoreTest {
                                new JobGraph("testNoOps"),
                                new JobInfo(ActorRef.noSender(), 
ListeningBehaviour.DETACHED, 0, Integer.MAX_VALUE));
 
-               assertEquals(0, jobGraphs.recoverJobGraphs().size());
+               assertEquals(0, jobGraphs.getJobIds().size());
 
                jobGraphs.putJobGraph(jobGraph);
-               assertEquals(0, jobGraphs.recoverJobGraphs().size());
+               assertEquals(0, jobGraphs.getJobIds().size());
 
                jobGraphs.removeJobGraph(jobGraph.getJobGraph().getJobID());
-               assertEquals(0, jobGraphs.recoverJobGraphs().size());
+               assertEquals(0, jobGraphs.getJobIds().size());
 
-               assertTrue(jobGraphs.recoverJobGraph(new JobID()).isEmpty());
+               assertNull(jobGraphs.recoverJobGraph(new JobID()));
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d314bc52/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java
index 8eaecd0..cc9e815 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java
@@ -36,8 +36,8 @@ import org.junit.Test;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
+import java.util.Collection;
 import java.util.HashMap;
-import java.util.List;
 import java.util.concurrent.CountDownLatch;
 
 import static org.junit.Assert.assertEquals;
@@ -93,32 +93,36 @@ public class ZooKeeperSubmittedJobGraphsStoreITCase extends 
TestLogger {
                        SubmittedJobGraph jobGraph = 
createSubmittedJobGraph(new JobID(), 0);
 
                        // Empty state
-                       assertEquals(0, jobGraphs.recoverJobGraphs().size());
+                       assertEquals(0, jobGraphs.getJobIds().size());
 
                        // Add initial
                        jobGraphs.putJobGraph(jobGraph);
 
                        // Verify initial job graph
-                       List<SubmittedJobGraph> actual = 
jobGraphs.recoverJobGraphs();
-                       assertEquals(1, actual.size());
+                       Collection<JobID> jobIds = jobGraphs.getJobIds();
+                       assertEquals(1, jobIds.size());
 
-                       verifyJobGraphs(jobGraph, actual.get(0));
+                       JobID jobId = jobIds.iterator().next();
+
+                       verifyJobGraphs(jobGraph, 
jobGraphs.recoverJobGraph(jobId));
 
                        // Update (same ID)
                        jobGraph = createSubmittedJobGraph(jobGraph.getJobId(), 
1);
                        jobGraphs.putJobGraph(jobGraph);
 
                        // Verify updated
-                       actual = jobGraphs.recoverJobGraphs();
-                       assertEquals(1, actual.size());
+                       jobIds = jobGraphs.getJobIds();
+                       assertEquals(1, jobIds.size());
+
+                       jobId = jobIds.iterator().next();
 
-                       verifyJobGraphs(jobGraph, actual.get(0));
+                       verifyJobGraphs(jobGraph, 
jobGraphs.recoverJobGraph(jobId));
 
                        // Remove
                        jobGraphs.removeJobGraph(jobGraph.getJobId());
 
                        // Empty state
-                       assertEquals(0, jobGraphs.recoverJobGraphs().size());
+                       assertEquals(0, jobGraphs.getJobIds().size());
 
                        // Nothing should have been notified
                        verify(listener, 
atMost(1)).onAddedJobGraph(any(JobID.class));
@@ -154,11 +158,12 @@ public class ZooKeeperSubmittedJobGraphsStoreITCase 
extends TestLogger {
                                jobGraphs.putJobGraph(jobGraph);
                        }
 
-                       List<SubmittedJobGraph> actual = 
jobGraphs.recoverJobGraphs();
+                       Collection<JobID> actual = jobGraphs.getJobIds();
 
                        assertEquals(expected.size(), actual.size());
 
-                       for (SubmittedJobGraph jobGraph : actual) {
+                       for (JobID jobId : actual) {
+                               SubmittedJobGraph jobGraph = 
jobGraphs.recoverJobGraph(jobId);
                                
assertTrue(expected.containsKey(jobGraph.getJobId()));
 
                                
verifyJobGraphs(expected.get(jobGraph.getJobId()), jobGraph);
@@ -167,7 +172,7 @@ public class ZooKeeperSubmittedJobGraphsStoreITCase extends 
TestLogger {
                        }
 
                        // Empty state
-                       assertEquals(0, jobGraphs.recoverJobGraphs().size());
+                       assertEquals(0, jobGraphs.getJobIds().size());
 
                        // Nothing should have been notified
                        verify(listener, 
atMost(expected.size())).onAddedJobGraph(any(JobID.class));

http://git-wip-us.apache.org/repos/asf/flink/blob/d314bc52/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
index 262f78a..e598ac5 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
@@ -364,7 +364,7 @@ public class JobManagerHACheckpointRecoveryITCase extends 
TestLogger {
                                nonLeadingJobManagerProcess = 
jobManagerProcess[0];
                        }
 
-                       // BLocking JobGraph
+                       // Blocking JobGraph
                        JobVertex blockingVertex = new JobVertex("Blocking 
vertex");
                        
blockingVertex.setInvokableClass(Tasks.BlockingNoOpInvokable.class);
                        JobGraph jobGraph = new JobGraph(blockingVertex);
@@ -393,7 +393,7 @@ public class JobManagerHACheckpointRecoveryITCase extends 
TestLogger {
                                String output = 
nonLeadingJobManagerProcess.getProcessOutput();
 
                                if (output != null) {
-                                       if (output.contains("Fatal error: 
Failed to recover jobs") &&
+                                       if (output.contains("Failed to recover 
job") &&
                                                        
output.contains("java.io.FileNotFoundException")) {
 
                                                success = true;

Reply via email to