http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
new file mode 100644
index 0000000..660f8bc
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
@@ -0,0 +1,379 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.cache.PathChildrenCache;
+import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
+import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
+import org.apache.curator.utils.ZKPaths;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.state.StateHandleProvider;
+import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
+import org.apache.zookeeper.KeeperException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Option;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.ConcurrentModificationException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+
+/**
+ * {@link SubmittedJobGraph} instances for JobManagers running in {@link 
RecoveryMode#ZOOKEEPER}.
+ *
+ * <p>Each job graph creates ZNode:
+ * <pre>
+ * +----O /flink/jobgraphs/&lt;job-id&gt; 1 [persistent]
+ * .
+ * .
+ * .
+ * +----O /flink/jobgraphs/&lt;job-id&gt; N [persistent]
+ * </pre>
+ *
+ * <p>The root path is watched to detect concurrent modifications in corner 
situations where
+ * multiple instances operate concurrently. The job manager acts as a {@link 
SubmittedJobGraphListener}
+ * to react to such situations.
+ */
+public class ZooKeeperSubmittedJobGraphStore implements SubmittedJobGraphStore 
{
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(ZooKeeperSubmittedJobGraphStore.class);
+
+       /** Lock to synchronize with the {@link SubmittedJobGraphListener}. */
+       private final Object cacheLock = new Object();
+
+       /** Client (not a namespace facade) */
+       private final CuratorFramework client;
+
+       /** The set of IDs of all added job graphs. */
+       private final Set<JobID> addedJobGraphs = new HashSet<>();
+
+       /** Completed checkpoints in ZooKeeper */
+       private final ZooKeeperStateHandleStore<SubmittedJobGraph> 
jobGraphsInZooKeeper;
+
+       /**
+        * Cache to monitor all children. This is used to detect races with 
other instances working
+        * on the same state.
+        */
+       private final PathChildrenCache pathCache;
+
+       /** The external listener to be notified on races. */
+       private SubmittedJobGraphListener jobGraphListener;
+
+       /** Flag indicating whether this instance is running. */
+       private boolean isRunning;
+
+       public ZooKeeperSubmittedJobGraphStore(
+                       CuratorFramework client,
+                       String currentJobsPath,
+                       StateHandleProvider<SubmittedJobGraph> 
stateHandleProvider) throws Exception {
+
+               checkNotNull(currentJobsPath, "Current jobs path");
+               checkNotNull(stateHandleProvider, "State handle provider");
+
+               // Keep a reference to the original client and not the 
namespace facade. The namespace
+               // facade cannot be closed.
+               this.client = checkNotNull(client, "Curator client");
+
+               // Ensure that the job graphs path exists
+               client.newNamespaceAwareEnsurePath(currentJobsPath)
+                               .ensure(client.getZookeeperClient());
+
+               // All operations will have the path as root
+               client = client.usingNamespace(client.getNamespace() + 
currentJobsPath);
+
+               this.jobGraphsInZooKeeper = new 
ZooKeeperStateHandleStore<>(client, stateHandleProvider);
+
+               this.pathCache = new PathChildrenCache(client, "/", false);
+               pathCache.getListenable().addListener(new 
SubmittedJobGraphsPathCacheListener());
+       }
+
+       @Override
+       public void start(SubmittedJobGraphListener jobGraphListener) throws 
Exception {
+               synchronized (cacheLock) {
+                       if (!isRunning) {
+                               this.jobGraphListener = jobGraphListener;
+
+                               pathCache.start();
+
+                               isRunning = true;
+                       }
+               }
+       }
+
+       @Override
+       public void stop() throws Exception {
+               synchronized (cacheLock) {
+                       if (isRunning) {
+                               jobGraphListener = null;
+
+                               pathCache.close();
+
+                               client.close();
+
+                               isRunning = false;
+                       }
+               }
+       }
+
+       @Override
+       public List<SubmittedJobGraph> recoverJobGraphs() throws Exception {
+               synchronized (cacheLock) {
+                       verifyIsRunning();
+
+                       List<Tuple2<StateHandle<SubmittedJobGraph>, String>> 
submitted;
+
+                       while (true) {
+                               try {
+                                       submitted = 
jobGraphsInZooKeeper.getAll();
+                                       break;
+                               }
+                               catch (ConcurrentModificationException e) {
+                                       LOG.warn("Concurrent modification while 
reading from ZooKeeper. Retrying.");
+                               }
+                       }
+
+                       if (submitted.size() != 0) {
+                               List<SubmittedJobGraph> jobGraphs = new 
ArrayList<>(submitted.size());
+
+                               for (Tuple2<StateHandle<SubmittedJobGraph>, 
String> jobStateHandle : submitted) {
+                                       SubmittedJobGraph jobGraph = 
jobStateHandle
+                                                       
.f0.getState(ClassLoader.getSystemClassLoader());
+
+                                       addedJobGraphs.add(jobGraph.getJobId());
+
+                                       jobGraphs.add(jobGraph);
+                               }
+
+                               LOG.info("Recovered {} job graphs: {}.", 
jobGraphs.size(), jobGraphs);
+                               return jobGraphs;
+                       }
+                       else {
+                               LOG.info("No job graph to recover.");
+                               return Collections.emptyList();
+                       }
+               }
+       }
+
+       @Override
+       public Option<SubmittedJobGraph> recoverJobGraph(JobID jobId) throws 
Exception {
+               checkNotNull(jobId, "Job ID");
+               String path = getPathForJob(jobId);
+
+               synchronized (cacheLock) {
+                       verifyIsRunning();
+
+                       try {
+                               StateHandle<SubmittedJobGraph> jobStateHandle = 
jobGraphsInZooKeeper.get(path);
+
+                               SubmittedJobGraph jobGraph = jobStateHandle
+                                               
.getState(ClassLoader.getSystemClassLoader());
+
+                               addedJobGraphs.add(jobGraph.getJobId());
+
+                               LOG.info("Recovered {}.", jobGraph);
+
+                               return Option.apply(jobGraph);
+                       }
+                       catch (KeeperException.NoNodeException ignored) {
+                               return Option.empty();
+                       }
+               }
+       }
+
+       @Override
+       public void putJobGraph(SubmittedJobGraph jobGraph) throws Exception {
+               checkNotNull(jobGraph, "Job graph");
+               String path = getPathForJob(jobGraph.getJobId());
+
+               boolean success = false;
+
+               while (!success) {
+                       synchronized (cacheLock) {
+                               verifyIsRunning();
+
+                               int currentVersion = 
jobGraphsInZooKeeper.exists(path);
+
+                               if (currentVersion == -1) {
+                                       try {
+                                               jobGraphsInZooKeeper.add(path, 
jobGraph);
+
+                                               
addedJobGraphs.add(jobGraph.getJobId());
+
+                                               LOG.info("Added {} to 
ZooKeeper.", jobGraph);
+
+                                               success = true;
+                                       }
+                                       catch 
(KeeperException.NodeExistsException ignored) {
+                                       }
+                               }
+                               else if 
(addedJobGraphs.contains(jobGraph.getJobId())) {
+                                       try {
+                                               
jobGraphsInZooKeeper.replace(path, currentVersion, jobGraph);
+                                               LOG.info("Updated {} in 
ZooKeeper.", jobGraph);
+
+                                               success = true;
+                                       }
+                                       catch (KeeperException.NoNodeException 
ignored) {
+                                       }
+                               }
+                               else {
+                                       throw new IllegalStateException("Oh, 
no. Trying to update a graph you didn't " +
+                                                       
"#getAllSubmittedJobGraphs() or #putJobGraph() yourself before.");
+                               }
+                       }
+               }
+       }
+
+       @Override
+       public void removeJobGraph(JobID jobId) throws Exception {
+               checkNotNull(jobId, "Job ID");
+               String path = getPathForJob(jobId);
+
+               synchronized (cacheLock) {
+                       if (addedJobGraphs.contains(jobId)) {
+                               
jobGraphsInZooKeeper.removeAndDiscardState(path);
+
+                               addedJobGraphs.remove(jobId);
+                               LOG.info("Removed job graph {} from 
ZooKeeper.", jobId);
+                       }
+               }
+       }
+
+       /**
+        * Monitors ZooKeeper for changes.
+        *
+        * <p>Detects modifications from other job managers in corner 
situations. The event
+        * notifications fire for changes from this job manager as well.
+        */
+       private final class SubmittedJobGraphsPathCacheListener implements 
PathChildrenCacheListener {
+
+               @Override
+               public void childEvent(CuratorFramework client, 
PathChildrenCacheEvent event)
+                               throws Exception {
+
+                       if (LOG.isDebugEnabled()) {
+                               if (event.getData() != null) {
+                                       LOG.debug("Received {} event (path: 
{})", event.getType(), event.getData().getPath());
+                               }
+                               else {
+                                       LOG.debug("Received {} event", 
event.getType());
+                               }
+                       }
+
+                       switch (event.getType()) {
+                               case CHILD_ADDED:
+                                       synchronized (cacheLock) {
+                                               try {
+                                                       JobID jobId = 
fromEvent(event);
+                                                       if (jobGraphListener != 
null && !addedJobGraphs.contains(jobId)) {
+                                                               try {
+                                                                       // 
Whoa! This has been added by someone else. Or we were fast
+                                                                       // to 
remove it (false positive).
+                                                                       
jobGraphListener.onAddedJobGraph(jobId);
+                                                               }
+                                                               catch 
(Throwable t) {
+                                                                       
LOG.error("Error in callback", t);
+                                                               }
+                                                       }
+                                               }
+                                               catch (Exception e) {
+                                                       LOG.error("Error in 
SubmittedJobGraphsPathCacheListener", e);
+                                               }
+                                       }
+
+                                       break;
+
+                               case CHILD_UPDATED:
+                                       // Nothing to do
+                                       break;
+
+                               case CHILD_REMOVED:
+                                       synchronized (cacheLock) {
+                                               try {
+                                                       JobID jobId = 
fromEvent(event);
+                                                       if (jobGraphListener != 
null && addedJobGraphs.contains(jobId)) {
+                                                               try {
+                                                                       // Oh 
oh. Someone else removed one of our job graphs. Mean!
+                                                                       
jobGraphListener.onRemovedJobGraph(jobId);
+                                                               }
+                                                               catch 
(Throwable t) {
+                                                                       
LOG.error("Error in callback", t);
+                                                               }
+                                                       }
+
+                                                       break;
+                                               }
+                                               catch (Exception e) {
+                                                       LOG.error("Error in 
SubmittedJobGraphsPathCacheListener", e);
+                                               }
+                                       }
+                                       break;
+
+                               case CONNECTION_SUSPENDED:
+                                       LOG.warn("ZooKeeper connection 
SUSPENDED. Changes to the submitted job " +
+                                                       "graphs are not 
monitored (temporarily).");
+
+                               case CONNECTION_LOST:
+                                       LOG.warn("ZooKeeper connection LOST. 
Changes to the submitted job " +
+                                                       "graphs are not 
monitored (permanently).");
+                                       break;
+
+                               case CONNECTION_RECONNECTED:
+                                       LOG.info("ZooKeeper connection 
RECONNECTED. Changes to the submitted job " +
+                                                       "graphs are monitored 
again.");
+
+                               case INITIALIZED:
+                                       
LOG.info("SubmittedJobGraphsPathCacheListener initialized");
+                                       break;
+                       }
+               }
+
+               /**
+                * Returns a JobID for the event's path.
+                */
+               private JobID fromEvent(PathChildrenCacheEvent event) {
+                       return 
JobID.fromHexString(ZKPaths.getNodeFromPath(event.getData().getPath()));
+               }
+       }
+
+       /**
+        * Verifies that the state is running.
+        */
+       private void verifyIsRunning() {
+               checkState(isRunning, "Not running. Forgot to call start()?");
+       }
+
+       /**
+        * Returns the JobID as a String (with leading slash).
+        */
+       public static String getPathForJob(JobID jobId) {
+               checkNotNull(jobId, "Job ID");
+               return String.format("/%s", jobId);
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderElectionService.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderElectionService.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderElectionService.java
index b6223ee..6cba141 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderElectionService.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderElectionService.java
@@ -67,4 +67,5 @@ public interface LeaderElectionService {
         * @return true if the associated {@link LeaderContender} is the 
leader, otherwise false
         */
        boolean hasLeadership();
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionService.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionService.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionService.java
index ae3f0e6..811037c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionService.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionService.java
@@ -43,6 +43,7 @@ import java.util.UUID;
  * ZooKeeper as well.
  */
 public class ZooKeeperLeaderElectionService implements LeaderElectionService, 
LeaderLatchListener, NodeCacheListener {
+
        private static final Logger LOG = 
LoggerFactory.getLogger(ZooKeeperLeaderElectionService.class);
 
        /** Client to the ZooKeeper quorum */

http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java
new file mode 100644
index 0000000..7aa1ccf
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+
+public enum StateBackend {
+       JOBMANAGER, FILESYSTEM;
+
+       /**
+        * Returns the configured {@link StateBackend}.
+        *
+        * @param config The config to parse
+        * @return Configured state backend or {@link 
ConfigConstants#DEFAULT_RECOVERY_MODE} if not
+        * configured.
+        */
+       public static StateBackend fromConfig(Configuration config) {
+               return StateBackend.valueOf(config.getString(
+                               ConfigConstants.STATE_BACKEND,
+                               
ConfigConstants.DEFAULT_STATE_BACKEND).toUpperCase());
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateHandleProviderFactory.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateHandleProviderFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateHandleProviderFactory.java
new file mode 100644
index 0000000..0086ac6
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateHandleProviderFactory.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+
+import java.io.Serializable;
+
+/**
+ * State handler provider factory.
+ *
+ * <p>This is going to be superseded soon.
+ */
+public class StateHandleProviderFactory {
+
+       /**
+        * Creates a {@link 
org.apache.flink.runtime.state.FileStateHandle.FileStateHandleProvider} at
+        * the configured recovery path.
+        */
+       public static <T extends Serializable> StateHandleProvider<T> 
createRecoveryFileStateHandleProvider(
+                       Configuration config) {
+
+               StateBackend stateBackend = StateBackend.fromConfig(config);
+
+               if (stateBackend == StateBackend.FILESYSTEM) {
+                       String recoveryPath = config.getString(
+                                       
ConfigConstants.STATE_BACKEND_FS_RECOVERY_PATH, "");
+
+                       if (recoveryPath.equals("")) {
+                               throw new 
IllegalConfigurationException("Missing recovery path. Specify via " +
+                                               "configuration key '" + 
ConfigConstants.STATE_BACKEND_FS_RECOVERY_PATH + "'.");
+                       }
+                       else {
+                               return 
FileStateHandle.createProvider(recoveryPath);
+                       }
+               }
+               else {
+                       throw new IllegalConfigurationException("Unexpected 
state backend configuration " +
+                                       stateBackend);
+               }
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderElectionUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderElectionUtils.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderElectionUtils.java
deleted file mode 100644
index 5f867a5..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderElectionUtils.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.util;
-
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.jobmanager.RecoveryMode;
-import org.apache.flink.runtime.leaderelection.LeaderElectionService;
-import org.apache.flink.runtime.leaderelection.StandaloneLeaderElectionService;
-
-/**
- * Utility class to help working with {@link LeaderElectionService} class.
- */
-public final class LeaderElectionUtils {
-
-       /**
-        * Creates a {@link LeaderElectionService} based on the provided {@link 
Configuration} object.
-        *
-        * @param configuration Configuration object
-        * @return {@link LeaderElectionService} which was created based on the 
provided Configuration
-        * @throws Exception
-        */
-       public static LeaderElectionService 
createLeaderElectionService(Configuration configuration) throws Exception {
-               RecoveryMode recoveryMode = 
RecoveryMode.valueOf(configuration.getString(
-                               ConfigConstants.RECOVERY_MODE,
-                               
ConfigConstants.DEFAULT_RECOVERY_MODE).toUpperCase()
-               );
-
-               LeaderElectionService leaderElectionService;
-
-               switch(recoveryMode) {
-                       case STANDALONE:
-                               leaderElectionService = new 
StandaloneLeaderElectionService();
-                               break;
-                       case ZOOKEEPER:
-                               leaderElectionService = 
ZooKeeperUtils.createLeaderElectionService(configuration);
-                               break;
-                       default:
-                               throw new Exception("Unknown RecoveryMode " + 
recoveryMode);
-               }
-
-               return leaderElectionService;
-       }
-
-       /**
-        * Private constructor to prevent instantiation.
-        */
-       private LeaderElectionUtils() {
-               throw new RuntimeException();
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
index d2d3945..79b9b7e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
@@ -21,19 +21,27 @@ package org.apache.flink.runtime.util;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
+import org.apache.flink.runtime.checkpoint.ZooKeeperCheckpointIDCounter;
+import org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore;
 import org.apache.flink.runtime.jobmanager.RecoveryMode;
+import org.apache.flink.runtime.jobmanager.SubmittedJobGraph;
+import org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore;
 import org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionService;
 import 
org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService;
+import org.apache.flink.runtime.state.StateHandleProvider;
+import org.apache.flink.runtime.state.StateHandleProviderFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-/**
- * Utility class to work with Apache Zookeeper for Flink runtime.
- */
-public final class ZooKeeperUtils {
+import static com.google.common.base.Preconditions.checkNotNull;
+
+public class ZooKeeperUtils {
 
        private static final Logger LOG = 
LoggerFactory.getLogger(ZooKeeperUtils.class);
 
@@ -47,8 +55,10 @@ public final class ZooKeeperUtils {
        public static CuratorFramework startCuratorFramework(Configuration 
configuration) {
                String zkQuorum = 
configuration.getString(ConfigConstants.ZOOKEEPER_QUORUM_KEY, "");
 
-               if(zkQuorum == null || zkQuorum.equals("")) {
-                       throw new RuntimeException("No valid ZooKeeper quorum 
has been specified.");
+               if (zkQuorum == null || zkQuorum.equals("")) {
+                       throw new RuntimeException("No valid ZooKeeper quorum 
has been specified. " +
+                                       "You can specify the quorum via the 
configuration key '" +
+                                       ConfigConstants.ZOOKEEPER_QUORUM_KEY + 
"'.");
                }
 
                int sessionTimeout = configuration.getInteger(
@@ -59,7 +69,7 @@ public final class ZooKeeperUtils {
                                ConfigConstants.ZOOKEEPER_CONNECTION_TIMEOUT,
                                
ConfigConstants.DEFAULT_ZOOKEEPER_CONNECTION_TIMEOUT);
 
-               int retryWait = configuration.getInteger (
+               int retryWait = configuration.getInteger(
                                ConfigConstants.ZOOKEEPER_RETRY_WAIT,
                                ConfigConstants.DEFAULT_ZOOKEEPER_RETRY_WAIT);
 
@@ -88,14 +98,10 @@ public final class ZooKeeperUtils {
        }
 
        /**
-        * Returns whether high availability is enabled (<=> ZooKeeper quorum 
configured).
+        * Returns whether {@link RecoveryMode#ZOOKEEPER} is configured.
         */
-       public static boolean isZooKeeperHighAvailabilityEnabled(Configuration 
flinkConf) {
-               String recoveryMode = flinkConf.getString(
-                               ConfigConstants.RECOVERY_MODE,
-                               
ConfigConstants.DEFAULT_RECOVERY_MODE).toUpperCase();
-
-               return recoveryMode.equals(RecoveryMode.ZOOKEEPER.name());
+       public static boolean isZooKeeperRecoveryMode(Configuration flinkConf) {
+               // Resolve the configured mode first, then compare it against ZOOKEEPER.
+               final RecoveryMode configuredMode = RecoveryMode.fromConfig(flinkConf);
+
+               return configuredMode.equals(RecoveryMode.ZOOKEEPER);
         }
 
        /**
@@ -125,7 +131,7 @@ public final class ZooKeeperUtils {
         * @throws Exception
         */
        public static ZooKeeperLeaderRetrievalService 
createLeaderRetrievalService(
-                       Configuration configuration) throws Exception{
+                       Configuration configuration) throws Exception {
                CuratorFramework client = startCuratorFramework(configuration);
                String leaderPath = 
configuration.getString(ConfigConstants.ZOOKEEPER_LEADER_PATH,
                                ConfigConstants.DEFAULT_ZOOKEEPER_LEADER_PATH);
@@ -134,7 +140,8 @@ public final class ZooKeeperUtils {
        }
 
        /**
-        * Creates a {@link ZooKeeperLeaderElectionService} instance.
+        * Creates a {@link ZooKeeperLeaderElectionService} instance and a new 
{@link
+        * CuratorFramework} client.
         *
         * @param configuration {@link Configuration} object containing the 
configuration values
         * @return {@link ZooKeeperLeaderElectionService} instance.
@@ -142,8 +149,24 @@ public final class ZooKeeperUtils {
         */
        public static ZooKeeperLeaderElectionService 
createLeaderElectionService(
                        Configuration configuration) throws Exception {
+
                CuratorFramework client = startCuratorFramework(configuration);
 
+               return createLeaderElectionService(client, configuration);
+       }
+
+       /**
+        * Creates a {@link ZooKeeperLeaderElectionService} instance.
+        *
+        * @param client        The {@link CuratorFramework} ZooKeeper client 
to use
+        * @param configuration {@link Configuration} object containing the 
configuration values
+        * @return {@link ZooKeeperLeaderElectionService} instance.
+        * @throws Exception
+        */
+       public static ZooKeeperLeaderElectionService 
createLeaderElectionService(
+                       CuratorFramework client,
+                       Configuration configuration) throws Exception {
+
                String latchPath = 
configuration.getString(ConfigConstants.ZOOKEEPER_LATCH_PATH,
                                ConfigConstants.DEFAULT_ZOOKEEPER_LATCH_PATH);
                String leaderPath = 
configuration.getString(ConfigConstants.ZOOKEEPER_LEADER_PATH,
@@ -153,6 +176,89 @@ public final class ZooKeeperUtils {
        }
 
        /**
+        * Creates a {@link ZooKeeperSubmittedJobGraphStore} rooted at the ZooKeeper path configured
+        * via {@link ConfigConstants#ZOOKEEPER_JOBGRAPHS_PATH} (or its default).
+        *
+        * @param client        The {@link CuratorFramework} ZooKeeper client to use
+        * @param configuration {@link Configuration} object (must not be <code>null</code>)
+        * @return {@link ZooKeeperSubmittedJobGraphStore} instance
+        * @throws Exception If creating the state handle provider or the store fails
+        */
+       public static ZooKeeperSubmittedJobGraphStore createSubmittedJobGraphs(
+                       CuratorFramework client,
+                       Configuration configuration) throws Exception {
+
+               checkNotNull(configuration, "Configuration");
+
+               final StateHandleProvider<SubmittedJobGraph> jobGraphHandleProvider =
+                               StateHandleProviderFactory.createRecoveryFileStateHandleProvider(configuration);
+
+               // Root node below which all submitted job graphs are kept
+               final String jobGraphsRootPath = configuration.getString(
+                               ConfigConstants.ZOOKEEPER_JOBGRAPHS_PATH,
+                               ConfigConstants.DEFAULT_ZOOKEEPER_JOBGRAPHS_PATH);
+
+               return new ZooKeeperSubmittedJobGraphStore(client, jobGraphsRootPath, jobGraphHandleProvider);
+       }
+
+       /**
+        * Creates a {@link ZooKeeperCompletedCheckpointStore} for a single job.
+        *
+        * <p>The store's path is the configured {@link ConfigConstants#ZOOKEEPER_CHECKPOINTS_PATH}
+        * (or its default) with the path returned by
+        * {@link ZooKeeperSubmittedJobGraphStore#getPathForJob} for the job appended.
+        *
+        * @param client                         The {@link CuratorFramework} ZooKeeper client to use
+        * @param configuration                  {@link Configuration} object (must not be <code>null</code>)
+        * @param jobId                          ID of job to create the instance for
+        * @param maxNumberOfCheckpointsToRetain The maximum number of checkpoints to retain
+        * @param userClassLoader                User code class loader
+        * @return {@link ZooKeeperCompletedCheckpointStore} instance
+        * @throws Exception If creating the state handle provider or the store fails
+        */
+       public static CompletedCheckpointStore createCompletedCheckpoints(
+                       CuratorFramework client,
+                       Configuration configuration,
+                       JobID jobId,
+                       int maxNumberOfCheckpointsToRetain,
+                       ClassLoader userClassLoader) throws Exception {
+
+               checkNotNull(configuration, "Configuration");
+
+               final StateHandleProvider<CompletedCheckpoint> checkpointHandleProvider =
+                               StateHandleProviderFactory.createRecoveryFileStateHandleProvider(configuration);
+
+               final String checkpointsRootPath = configuration.getString(
+                               ConfigConstants.ZOOKEEPER_CHECKPOINTS_PATH,
+                               ConfigConstants.DEFAULT_ZOOKEEPER_CHECKPOINTS_PATH);
+
+               // Scope the store to this job by appending the job specific sub path
+               final String jobCheckpointsPath =
+                               checkpointsRootPath + ZooKeeperSubmittedJobGraphStore.getPathForJob(jobId);
+
+               return new ZooKeeperCompletedCheckpointStore(
+                               maxNumberOfCheckpointsToRetain,
+                               userClassLoader,
+                               client,
+                               jobCheckpointsPath,
+                               checkpointHandleProvider);
+       }
+
+       /**
+        * Creates a {@link ZooKeeperCheckpointIDCounter} instance for a single job.
+        *
+        * <p>The counter's path is the configured
+        * {@link ConfigConstants#ZOOKEEPER_CHECKPOINT_COUNTER_PATH} (or its default) with the path
+        * returned by {@link ZooKeeperSubmittedJobGraphStore#getPathForJob} for the job appended.
+        *
+        * @param client        The {@link CuratorFramework} ZooKeeper client to use
+        * @param configuration {@link Configuration} object (must not be <code>null</code>)
+        * @param jobId         ID of job to create the instance for
+        * @return {@link ZooKeeperCheckpointIDCounter} instance
+        * @throws Exception If creating the counter fails
+        */
+       public static ZooKeeperCheckpointIDCounter createCheckpointIDCounter(
+                       CuratorFramework client,
+                       Configuration configuration,
+                       JobID jobId) throws Exception {
+
+               // Validate like the sibling factory methods do, so that a missing configuration
+               // fails with a descriptive message instead of a bare NullPointerException.
+               checkNotNull(configuration, "Configuration");
+
+               String checkpointIdCounterPath = configuration.getString(
+                               ConfigConstants.ZOOKEEPER_CHECKPOINT_COUNTER_PATH,
+                               ConfigConstants.DEFAULT_ZOOKEEPER_CHECKPOINT_COUNTER_PATH);
+
+               checkpointIdCounterPath += ZooKeeperSubmittedJobGraphStore.getPathForJob(jobId);
+
+               return new ZooKeeperCheckpointIDCounter(client, checkpointIdCounterPath);
+       }
+
+       /**
         * Private constructor to prevent instantiation.
         */
        private ZooKeeperUtils() {

http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
new file mode 100644
index 0000000..936fe1b
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
@@ -0,0 +1,384 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.zookeeper;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.api.BackgroundCallback;
+import org.apache.curator.utils.ZKPaths;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.state.StateHandleProvider;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.data.Stat;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * State handles backed by ZooKeeper.
+ *
+ * <p>Added state is persisted via {@link StateHandle}s, which in turn are written to
+ * ZooKeeper. This level of indirection is necessary to keep the amount of data in ZooKeeper
+ * small. ZooKeeper is built for data in the KB range whereas state can grow to multiple MBs.
+ *
+ * <p>State modifications require some care, because it is possible that certain failures bring
+ * the state handle backend and ZooKeeper out of sync.
+ *
+ * <p>ZooKeeper holds the ground truth about state handles, i.e. the following holds:
+ *
+ * <pre>
+ * State handle in ZooKeeper => State handle exists
+ * </pre>
+ *
+ * But not:
+ *
+ * <pre>
+ * State handle exists => State handle in ZooKeeper
+ * </pre>
+ *
+ * There can be lingering state handles when failures happen during operation. They
+ * need to be cleaned up manually (see <a href="https://issues.apache.org/jira/browse/FLINK-2513">
+ * FLINK-2513</a> about a possible way to overcome this).
+ *
+ * @param <T> Type of state
+ */
+public class ZooKeeperStateHandleStore<T extends Serializable> {
+
+       /** Curator ZooKeeper client */
+       private final CuratorFramework client;
+
+       /** State handle provider */
+       private final StateHandleProvider<T> stateHandleProvider;
+
+       /**
+        * Creates a {@link ZooKeeperStateHandleStore}.
+        *
+        * @param client              The Curator ZooKeeper client. <strong>Important:</strong> It is
+        *                            expected that the client's namespace ensures that the root
+        *                            path is exclusive for all state handles managed by this
+        *                            instance, e.g. <code>client.usingNamespace("/stateHandles")</code>
+        * @param stateHandleProvider The state handle provider for the state
+        */
+       public ZooKeeperStateHandleStore(
+                       CuratorFramework client,
+                       StateHandleProvider<T> stateHandleProvider) {
+
+               this.client = checkNotNull(client, "Curator client");
+               this.stateHandleProvider = checkNotNull(stateHandleProvider, "State handle provider");
+       }
+
+       /**
+        * Creates a state handle and stores it in ZooKeeper with create mode {@link
+        * CreateMode#PERSISTENT}.
+        *
+        * @see #add(String, Serializable, CreateMode)
+        */
+       public StateHandle<T> add(String pathInZooKeeper, T state) throws Exception {
+               return add(pathInZooKeeper, state, CreateMode.PERSISTENT);
+       }
+
+       /**
+        * Creates a state handle and stores it in ZooKeeper.
+        *
+        * <p><strong>Important</strong>: This will <em>not</em> store the actual state in
+        * ZooKeeper, but create a state handle and store it in ZooKeeper. This level of indirection
+        * makes sure that data in ZooKeeper is small.
+        *
+        * @param pathInZooKeeper Destination path in ZooKeeper (expected to <em>not</em> exist yet and
+        *                        start with a '/')
+        * @param state           State to be added
+        * @param createMode      The create mode for the new path in ZooKeeper
+        * @return Created {@link StateHandle}
+        * @throws Exception If a ZooKeeper or state handle operation fails
+        */
+       public StateHandle<T> add(String pathInZooKeeper, T state, CreateMode createMode) throws Exception {
+               checkNotNull(pathInZooKeeper, "Path in ZooKeeper");
+               checkNotNull(state, "State");
+
+               // Create the state handle. Nothing persisted yet.
+               StateHandle<T> stateHandle = stateHandleProvider.createStateHandle(state);
+
+               boolean success = false;
+
+               try {
+                       // Serialize the state handle. This writes the state to the backend.
+                       byte[] serializedStateHandle = InstantiationUtil.serializeObject(stateHandle);
+
+                       // Write state handle (not the actual state) to ZooKeeper. This is expected to be
+                       // smaller than the state itself. This level of indirection makes sure that data in
+                       // ZooKeeper is small, because ZooKeeper is designed for data in the KB range, but
+                       // the state can be larger.
+                       client.create().withMode(createMode).forPath(pathInZooKeeper, serializedStateHandle);
+
+                       success = true;
+
+                       return stateHandle;
+               }
+               finally {
+                       if (!success) {
+                               // Cleanup the state handle if it was not written to ZooKeeper.
+                               if (stateHandle != null) {
+                                       stateHandle.discardState();
+                               }
+                       }
+               }
+       }
+
+       /**
+        * Replaces a state handle in ZooKeeper and discards the old state handle.
+        *
+        * <p>The old state handle is only discarded after the new one has been written to
+        * ZooKeeper. If writing fails, the new state handle is discarded instead.
+        *
+        * @param pathInZooKeeper Destination path in ZooKeeper (expected to exist and start with a '/')
+        * @param expectedVersion Expected version of the node to replace
+        * @param state           The new state to replace the old one
+        * @throws Exception If a ZooKeeper or state handle operation fails
+        */
+       public void replace(String pathInZooKeeper, int expectedVersion, T state) throws Exception {
+               checkNotNull(pathInZooKeeper, "Path in ZooKeeper");
+               checkNotNull(state, "State");
+
+               StateHandle<T> oldStateHandle = get(pathInZooKeeper);
+
+               StateHandle<T> stateHandle = stateHandleProvider.createStateHandle(state);
+
+               boolean success = false;
+
+               try {
+                       // Serialize the new state handle. This writes the state to the backend.
+                       byte[] serializedStateHandle = InstantiationUtil.serializeObject(stateHandle);
+
+                       // Replace state handle in ZooKeeper.
+                       client.setData()
+                                       .withVersion(expectedVersion)
+                                       .forPath(pathInZooKeeper, serializedStateHandle);
+
+                       success = true;
+               }
+               finally {
+                       if (success) {
+                               oldStateHandle.discardState();
+                       }
+                       else {
+                               stateHandle.discardState();
+                       }
+               }
+       }
+
+       /**
+        * Returns the version of the node if it exists or <code>-1</code> if it doesn't.
+        *
+        * @param pathInZooKeeper Path in ZooKeeper to check
+        * @return Version of the ZNode if the path exists, <code>-1</code> otherwise.
+        * @throws Exception If the ZooKeeper operation fails
+        */
+       public int exists(String pathInZooKeeper) throws Exception {
+               checkNotNull(pathInZooKeeper, "Path in ZooKeeper");
+
+               Stat stat = client.checkExists().forPath(pathInZooKeeper);
+
+               if (stat != null) {
+                       return stat.getVersion();
+               }
+
+               return -1;
+       }
+
+       /**
+        * Gets a state handle from ZooKeeper.
+        *
+        * @param pathInZooKeeper Path in ZooKeeper to get the state handle from (expected to
+        *                        exist and start with a '/').
+        * @return The state handle
+        * @throws Exception If a ZooKeeper or state handle operation fails
+        */
+       @SuppressWarnings("unchecked")
+       public StateHandle<T> get(String pathInZooKeeper) throws Exception {
+               checkNotNull(pathInZooKeeper, "Path in ZooKeeper");
+
+               byte[] data = client.getData().forPath(pathInZooKeeper);
+
+               // NOTE(review): deserialized with the system class loader, so the state handle
+               // classes must be resolvable from it.
+               return (StateHandle<T>) InstantiationUtil
+                               .deserializeObject(data, ClassLoader.getSystemClassLoader());
+       }
+
+       /**
+        * Gets all available state handles from ZooKeeper.
+        *
+        * <p>If there is a concurrent modification, the operation is retried until it succeeds.
+        *
+        * @return All state handles from ZooKeeper.
+        * @throws Exception If a ZooKeeper or state handle operation fails
+        */
+       public List<Tuple2<StateHandle<T>, String>> getAll() throws Exception {
+               return getAllConsistent(false);
+       }
+
+       /**
+        * Gets all available state handles from ZooKeeper sorted by name (ascending).
+        *
+        * <p>If there is a concurrent modification, the operation is retried until it succeeds.
+        *
+        * @return All state handles in ZooKeeper.
+        * @throws Exception If a ZooKeeper or state handle operation fails
+        */
+       public List<Tuple2<StateHandle<T>, String>> getAllSortedByName() throws Exception {
+               return getAllConsistent(true);
+       }
+
+       /**
+        * Reads all state handles below the root node and retries until the children of the root
+        * node did not change while they were read (same cVersion before and after).
+        *
+        * @param sortedByName Whether to read the children sorted by name (ascending)
+        * @return All state handles together with their paths in ZooKeeper
+        * @throws Exception If a ZooKeeper or state handle operation fails
+        */
+       private List<Tuple2<StateHandle<T>, String>> getAllConsistent(boolean sortedByName) throws Exception {
+               final List<Tuple2<StateHandle<T>, String>> stateHandles = new ArrayList<>();
+
+               boolean success = false;
+
+               retry:
+               while (!success) {
+                       // Start every attempt from scratch. Otherwise handles read during an attempt
+                       // that had to be retried would be returned twice.
+                       stateHandles.clear();
+
+                       // Initial cVersion (number of changes to the children of this node)
+                       int initialCVersion = client.checkExists().forPath("/").getCversion();
+
+                       List<String> children;
+                       if (sortedByName) {
+                               children = ZKPaths.getSortedChildren(
+                                               client.getZookeeperClient().getZooKeeper(),
+                                               ZKPaths.fixForNamespace(client.getNamespace(), "/"));
+                       }
+                       else {
+                               children = client.getChildren().forPath("/");
+                       }
+
+                       for (String child : children) {
+                               String childPath = "/" + child;
+
+                               try {
+                                       final StateHandle<T> stateHandle = get(childPath);
+                                       stateHandles.add(new Tuple2<>(stateHandle, childPath));
+                               }
+                               catch (KeeperException.NoNodeException ignored) {
+                                       // Concurrent deletion, retry
+                                       continue retry;
+                               }
+                       }
+
+                       int finalCVersion = client.checkExists().forPath("/").getCversion();
+
+                       // Check for concurrent modifications
+                       success = initialCVersion == finalCVersion;
+               }
+
+               return stateHandles;
+       }
+
+       /**
+        * Removes a state handle from ZooKeeper.
+        *
+        * <p><strong>Important</strong>: this does not discard the state handle. If you want to
+        * discard the state handle call {@link #removeAndDiscardState(String)}.
+        *
+        * @param pathInZooKeeper Path of state handle to remove (expected to start with a '/')
+        * @throws Exception If the ZooKeeper operation fails
+        */
+       public void remove(String pathInZooKeeper) throws Exception {
+               checkNotNull(pathInZooKeeper, "Path in ZooKeeper");
+
+               client.delete().deletingChildrenIfNeeded().forPath(pathInZooKeeper);
+       }
+
+       /**
+        * Removes a state handle from ZooKeeper asynchronously.
+        *
+        * <p><strong>Important</strong>: this does not discard the state handle. If you want to
+        * discard the state handle call {@link #removeAndDiscardState(String)}.
+        *
+        * @param pathInZooKeeper Path of state handle to remove (expected to start with a '/')
+        * @param callback        The callback after the operation finishes
+        * @throws Exception If the ZooKeeper operation fails
+        */
+       public void remove(String pathInZooKeeper, BackgroundCallback callback) throws Exception {
+               checkNotNull(pathInZooKeeper, "Path in ZooKeeper");
+               checkNotNull(callback, "Background callback");
+
+               client.delete().deletingChildrenIfNeeded().inBackground(callback).forPath(pathInZooKeeper);
+       }
+
+       /**
+        * Discards a state handle and removes it from ZooKeeper.
+        *
+        * <p>If you only want to remove the state handle in ZooKeeper call {@link #remove(String)}.
+        *
+        * @param pathInZooKeeper Path of state handle to discard (expected to start with a '/')
+        * @throws Exception If the ZooKeeper or state handle operation fails
+        */
+       public void removeAndDiscardState(String pathInZooKeeper) throws Exception {
+               checkNotNull(pathInZooKeeper, "Path in ZooKeeper");
+
+               StateHandle<T> stateHandle = get(pathInZooKeeper);
+
+               // Delete the state handle from ZooKeeper first
+               client.delete().deletingChildrenIfNeeded().forPath(pathInZooKeeper);
+
+               // Discard the state handle only after it has been successfully deleted from ZooKeeper.
+               // Otherwise we might enter an illegal state after failures (with a state handle in
+               // ZooKeeper, which has already been discarded).
+               stateHandle.discardState();
+       }
+
+       /**
+        * Discards all available state handles and removes them from ZooKeeper.
+        *
+        * @throws Exception If a ZooKeeper or state handle operation fails
+        */
+       public void removeAndDiscardAllState() throws Exception {
+               final List<Tuple2<StateHandle<T>, String>> allStateHandles = getAll();
+
+               ZKPaths.deleteChildren(
+                               client.getZookeeperClient().getZooKeeper(),
+                               ZKPaths.fixForNamespace(client.getNamespace(), "/"),
+                               false);
+
+               // Discard the state handles only after they have been successfully deleted from ZooKeeper.
+               for (Tuple2<StateHandle<T>, String> stateHandleAndPath : allStateHandles) {
+                       stateHandleAndPath.f0.discardState();
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobInfo.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobInfo.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobInfo.scala
index 75ad20f..67d7a06 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobInfo.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobInfo.scala
@@ -19,7 +19,7 @@
 package org.apache.flink.runtime.jobmanager
 
 import akka.actor.ActorRef
-
+import org.apache.flink.runtime.akka.ListeningBehaviour
 
 /**
  * Utility class to store job information on the [[JobManager]]. The JobInfo 
stores which actor
@@ -27,11 +27,20 @@ import akka.actor.ActorRef
  * Additionally, it stores whether the job was started in the detached mode. 
Detached means that
  * the submitting actor does not wait for the job result once the job has 
terminated.
  *
+ * Important: This class is serializable, but needs to be deserialized in the 
context of an actor
+ * system in order to resolve the client [[ActorRef]]. It is possible to 
serialize the Akka URL
+ * manually, but it is cumbersome and complicates testing in certain 
scenarios, where you need to
+ * make sure to resolve the correct [[ActorRef]]s when submitting jobs 
(RepointableActorRef vs.
+ * RemoteActorRef).
+ *
  * @param client Actor which submitted the job
  * @param start Starting time
  */
-class JobInfo(val client: ActorRef, val start: Long,
-              val sessionTimeout: Long) {
+class JobInfo(
+  val client: ActorRef,
+  val listeningBehaviour: ListeningBehaviour,
+  val start: Long,
+  val sessionTimeout: Long) extends Serializable {
 
   var sessionAlive = sessionTimeout > 0
 
@@ -49,12 +58,16 @@ class JobInfo(val client: ActorRef, val start: Long,
     }
   }
 
+  override def toString = s"JobInfo(client: $client ($listeningBehaviour), 
start: $start)"
+
   def setLastActive() =
     lastActive = System.currentTimeMillis()
 }
 
 object JobInfo{
-  def apply(client: ActorRef, start: Long,
-            sessionTimeout: Long) =
-    new JobInfo(client, start, sessionTimeout)
+
+  /**
+   * Creates a [[JobInfo]] for a job submitted by the given client actor.
+   *
+   * @param client Actor which submitted the job
+   * @param listeningBehaviour The client's [[ListeningBehaviour]]
+   * @param start Starting time
+   * @param sessionTimeout Session timeout; the session is only considered alive if it is > 0
+   * @return The newly created [[JobInfo]]
+   */
+  def apply(
+    client: ActorRef,
+    listeningBehaviour: ListeningBehaviour,
+    start: Long,
+    sessionTimeout: Long): JobInfo = {
+    new JobInfo(client, listeningBehaviour, start, sessionTimeout)
+  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 95637bb..f3e4054 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -19,31 +19,39 @@
 package org.apache.flink.runtime.jobmanager
 
 import java.io.{File, IOException}
-import java.lang.reflect.{InvocationTargetException, Constructor}
+import java.lang.reflect.{Constructor, InvocationTargetException}
 import java.net.InetSocketAddress
 import java.util.UUID
 
 import akka.actor.Status.Failure
-import akka.actor.{Props, Terminated, PoisonPill, ActorRef, ActorSystem}
+import akka.actor._
 import akka.pattern.ask
-
 import grizzled.slf4j.Logger
-
 import org.apache.flink.api.common.{ExecutionConfig, JobID}
 import org.apache.flink.configuration.{ConfigConstants, Configuration, 
GlobalConfiguration}
 import org.apache.flink.core.io.InputSplitAssigner
 import org.apache.flink.runtime.accumulators.AccumulatorSnapshot
+import org.apache.flink.runtime.akka.{AkkaUtils, ListeningBehaviour}
 import org.apache.flink.runtime.blob.BlobServer
+import org.apache.flink.runtime.checkpoint.{CheckpointRecoveryFactory, 
StandaloneCheckpointRecoveryFactory, ZooKeeperCheckpointRecoveryFactory}
 import org.apache.flink.runtime.client._
+import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
 import org.apache.flink.runtime.executiongraph.{ExecutionGraph, 
ExecutionJobVertex}
+import org.apache.flink.runtime.instance.{AkkaActorGateway, InstanceManager}
 import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator
-import org.apache.flink.runtime.leaderelection.{LeaderContender, 
LeaderElectionService}
+import org.apache.flink.runtime.jobgraph.{JobGraph, JobStatus, JobVertexID}
+import 
org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore.SubmittedJobGraphListener
+import org.apache.flink.runtime.jobmanager.scheduler.{Scheduler => 
FlinkScheduler}
+import org.apache.flink.runtime.leaderelection.{LeaderContender, 
LeaderElectionService, StandaloneLeaderElectionService}
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService
 import org.apache.flink.runtime.messages.ArchiveMessages.ArchiveExecutionGraph
 import 
org.apache.flink.runtime.messages.ExecutionGraphMessages.JobStatusChanged
+import org.apache.flink.runtime.messages.JobManagerMessages._
 import org.apache.flink.runtime.messages.Messages.{Acknowledge, Disconnect}
+import org.apache.flink.runtime.messages.RegistrationMessages._
+import org.apache.flink.runtime.messages.TaskManagerMessages.{Heartbeat, 
SendStackTrace}
 import org.apache.flink.runtime.messages.TaskMessages.{PartitionState, 
UpdateTaskExecutionState}
-import org.apache.flink.runtime.messages.accumulators._
+import org.apache.flink.runtime.messages.accumulators.{AccumulatorMessage, 
AccumulatorResultStringsFound, AccumulatorResultsErroneous, 
AccumulatorResultsFound, RequestAccumulatorResults, 
RequestAccumulatorResultsStringified}
 import 
org.apache.flink.runtime.messages.checkpoint.{AbstractCheckpointMessage, 
AcknowledgeCheckpoint}
 import org.apache.flink.runtime.messages.webmonitor._
 import org.apache.flink.runtime.process.ProcessReaper
@@ -51,25 +59,16 @@ import org.apache.flink.runtime.security.SecurityUtils
 import org.apache.flink.runtime.security.SecurityUtils.FlinkSecuredRunner
 import org.apache.flink.runtime.taskmanager.TaskManager
 import org.apache.flink.runtime.util._
-import org.apache.flink.runtime.webmonitor.{WebMonitorUtils, WebMonitor}
-import org.apache.flink.runtime.{FlinkActor, StreamingMode, 
LeaderSessionMessageFilter}
-import org.apache.flink.runtime.LogMessages
-import org.apache.flink.runtime.akka.{ListeningBehaviour, AkkaUtils}
-import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
-import org.apache.flink.runtime.instance.{AkkaActorGateway, InstanceManager}
-import org.apache.flink.runtime.jobgraph.{JobVertexID, JobGraph, JobStatus}
-import org.apache.flink.runtime.jobmanager.scheduler.{Scheduler => 
FlinkScheduler}
-import org.apache.flink.runtime.messages.JobManagerMessages._
-import org.apache.flink.runtime.messages.RegistrationMessages._
-import org.apache.flink.runtime.messages.TaskManagerMessages.{SendStackTrace, 
Heartbeat}
-import org.apache.flink.util.{NetUtils, SerializedValue, ExceptionUtils, 
InstantiationUtil}
+import org.apache.flink.runtime.webmonitor.{WebMonitor, WebMonitorUtils}
+import org.apache.flink.runtime.{FlinkActor, LeaderSessionMessageFilter, 
LogMessages, StreamingMode}
+import org.apache.flink.util.{ExceptionUtils, InstantiationUtil, NetUtils}
 
+import scala.collection.JavaConverters._
+import scala.concurrent.ExecutionContext.Implicits.global
 import scala.concurrent._
 import scala.concurrent.duration._
 import scala.concurrent.forkjoin.ForkJoinPool
 import scala.language.postfixOps
-import scala.collection.JavaConverters._
-import scala.concurrent.ExecutionContext.Implicits.global
 
 
 /**
@@ -110,17 +109,22 @@ class JobManager(
     protected val delayBetweenRetries: Long,
     protected val timeout: FiniteDuration,
     protected val mode: StreamingMode,
-    protected val leaderElectionService: LeaderElectionService)
+    protected val leaderElectionService: LeaderElectionService,
+    protected val submittedJobGraphs : SubmittedJobGraphStore,
+    protected val checkpointRecoveryFactory : CheckpointRecoveryFactory)
   extends FlinkActor 
   with LeaderSessionMessageFilter // mixin oder is important, we want 
filtering after logging
   with LogMessages // mixin order is important, we want first logging
-  with LeaderContender {
+  with LeaderContender
+  with SubmittedJobGraphListener {
 
   override val log = Logger(getClass)
 
   /** Either running or not yet archived jobs (session hasn't been ended). */
   protected val currentJobs = scala.collection.mutable.HashMap[JobID, 
(ExecutionGraph, JobInfo)]()
 
+  protected val recoveryMode = RecoveryMode.fromConfig(flinkConfiguration)
+
   var leaderSessionID: Option[UUID] = None
 
   /**
@@ -138,6 +142,22 @@ class JobManager(
           "start.", e)
         throw new RuntimeException("Could not start the leader election 
service.", e)
     }
+
+    try {
+      submittedJobGraphs.start(this)
+    } catch {
+      case e: Exception =>
+        log.error("Could not start the submitted job graphs service.", e)
+        throw new RuntimeException("Could not start the submitted job graphs 
service.", e)
+    }
+
+    try {
+      checkpointRecoveryFactory.start()
+    } catch {
+      case e: Exception =>
+        log.error("Could not start the checkpoint recovery service.", e)
+        throw new RuntimeException("Could not start the checkpoint recovery 
service.", e)
+    }
   }
 
   override def postStop(): Unit = {
@@ -159,6 +179,18 @@ class JobManager(
       case e: Exception => log.error("Could not properly shutdown the leader 
election service.")
     }
 
+    try {
+      submittedJobGraphs.stop()
+    } catch {
+      case e: Exception => log.error("Could not properly stop the submitted 
job graphs service.")
+    }
+
+    try {
+      checkpointRecoveryFactory.stop()
+    } catch {
+      case e: Exception => log.error("Could not properly stop the checkpoint 
recovery service.")
+    }
+
     if (archive != ActorRef.noSender) {
       archive ! decorateMessage(PoisonPill)
     }
@@ -191,12 +223,21 @@ class JobManager(
       // confirming the leader session ID might be blocking, thus do it in a 
future
       future{
         leaderElectionService.confirmLeaderSessionID(newLeaderSessionID.orNull)
+
+        // TODO (critical next step) This needs to be more flexible and robust 
(e.g. wait for task
+        // managers etc.)
+        if (recoveryMode != RecoveryMode.STANDALONE) {
+          context.system.scheduler.scheduleOnce(new 
FiniteDuration(delayBetweenRetries,
+            MILLISECONDS), self, 
decorateMessage(RecoverAllJobs))(context.dispatcher)
+        }
       }(context.dispatcher)
 
     case RevokeLeadership =>
       log.info(s"JobManager ${self.path.toSerializationFormat} was revoked 
leadership.")
 
-      cancelAndClearEverything(new Exception("JobManager is no longer the 
leader."))
+      future {
+        cancelAndClearEverything(new Exception("JobManager is no longer the 
leader."))
+      }(context.dispatcher)
 
       // disconnect the registered task managers
       instanceManager.getAllRegisteredInstances.asScala.foreach {
@@ -269,7 +310,62 @@ class JobManager(
       sender ! decorateMessage(instanceManager.getTotalNumberOfSlots)
 
     case SubmitJob(jobGraph, listeningBehaviour) =>
-      submitJob(jobGraph, listeningBehaviour)
+      val client = sender()
+
+      val jobInfo = new JobInfo(client, listeningBehaviour, 
System.currentTimeMillis(),
+        jobGraph.getSessionTimeout)
+
+      future {
+        submitJob(jobGraph, jobInfo)
+      }(context.dispatcher)
+
+    case RecoverJob(jobId) =>
+      future {
+        // The ActorRef, which is part of the submitted job graph can only be 
deserialized in the
+        // scope of an actor system.
+        akka.serialization.JavaSerializer.currentSystem.withValue(
+          context.system.asInstanceOf[ExtendedActorSystem]) {
+
+          log.info(s"Attempting to recover job $jobId.")
+
+          val jobGraph = submittedJobGraphs.recoverJobGraph(jobId)
+
+          if (jobGraph.isDefined) {
+            if (!leaderElectionService.hasLeadership()) {
+              // we've lost leadership. mission: abort.
+              log.warn(s"Lost leadership during recovery. Aborting recovery of 
$jobId.")
+            }
+            else {
+              recoverJobGraph(jobGraph.get)
+            }
+          }
+          else {
+            log.warn(s"Failed to recover job graph ${jobId}.")
+          }
+        }
+      }(context.dispatcher)
+
+    case RecoverAllJobs =>
+      future {
+        // The ActorRef, which is part of the submitted job graph can only be 
deserialized in the
+        // scope of an actor system.
+        akka.serialization.JavaSerializer.currentSystem.withValue(
+          context.system.asInstanceOf[ExtendedActorSystem]) {
+
+          val jobGraphs = submittedJobGraphs.recoverJobGraphs().asScala
+
+          if (!leaderElectionService.hasLeadership()) {
+            // we've lost leadership. mission: abort.
+            log.warn(s"Lost leadership during recovery. Aborting recovery of 
${jobGraphs.size} " +
+              s"jobs.")
+          }
+          else {
+            log.debug(s"Attempting to recover ${jobGraphs.size} job graphs.")
+
+            jobGraphs.foreach(recoverJobGraph(_))
+          }
+        }
+      }(context.dispatcher)
 
     case CancelJob(jobID) =>
       log.info(s"Trying to cancel job with ID $jobID.")
@@ -377,10 +473,27 @@ class JobManager(
           if (newJobStatus.isTerminalState()) {
             jobInfo.end = timeStamp
 
-            // is the client waiting for the job result?
-            if (jobInfo.client != ActorRef.noSender) {
-              newJobStatus match {
-                case JobStatus.FINISHED =>
+            future {
+              // TODO If removing the JobGraph from the SubmittedJobGraphStore fails, the job will
+              // linger around and potentially be recovered at a later time. There is nothing we
+              // can do about that, but it should be communicated to the client.
+              if (jobInfo.sessionAlive) {
+                jobInfo.setLastActive()
+                val lastActivity = jobInfo.lastActive
+                context.system.scheduler.scheduleOnce(jobInfo.sessionTimeout 
seconds) {
+                  // remove only if no activity occurred in the meantime
+                  if (lastActivity == jobInfo.lastActive) {
+                    removeJob(jobID)
+                  }
+                }
+              } else {
+                removeJob(jobID)
+              }
+
+              // is the client waiting for the job result?
+              if (jobInfo.listeningBehaviour != ListeningBehaviour.DETACHED) {
+                newJobStatus match {
+                  case JobStatus.FINISHED =>
                   try {
                     val accumulatorResults = 
executionGraph.getAccumulatorsSerialized()
                     val result = new SerializedJobExecutionResult(
@@ -398,47 +511,37 @@ class JobManager(
                       jobInfo.client ! decorateMessage(JobResultFailure(
                         new SerializedThrowable(exception)))
                   }
-                case JobStatus.CANCELED =>
-                  // the error may be packed as a serialized throwable
-                  val unpackedError = SerializedThrowable.get(
-                    error, executionGraph.getUserClassLoader())
-
-                  jobInfo.client ! decorateMessage(JobResultFailure(
-                    new SerializedThrowable(
-                      new JobCancellationException(jobID, "Job was 
cancelled.", unpackedError))))
-
-                case JobStatus.FAILED =>
-                  val unpackedError = SerializedThrowable.get(
-                    error, executionGraph.getUserClassLoader())
-
-                  jobInfo.client ! decorateMessage(JobResultFailure(
-                    new SerializedThrowable(
-                      new JobExecutionException(jobID, "Job execution 
failed.", unpackedError))))
-
-                case x =>
-                  val exception = new JobExecutionException(jobID, s"$x is not 
a terminal state.")
-                  jobInfo.client ! decorateMessage(JobResultFailure(
-                    new SerializedThrowable(exception)))
-                  throw exception
-              }
-            }
 
-            if (jobInfo.sessionAlive) {
-              jobInfo.setLastActive()
-              val lastActivity = jobInfo.lastActive
-              context.system.scheduler.scheduleOnce(jobInfo.sessionTimeout 
seconds) {
-                // remove only if no activity occurred in the meantime
-                if (lastActivity == jobInfo.lastActive) {
-                  removeJob(jobID)
+                  case JobStatus.CANCELED =>
+                    // the error may be packed as a serialized throwable
+                    val unpackedError = SerializedThrowable.get(
+                      error, executionGraph.getUserClassLoader())
+
+                    jobInfo.client ! decorateMessage(JobResultFailure(
+                      new SerializedThrowable(
+                        new JobCancellationException(jobID, "Job was 
cancelled.", unpackedError))))
+
+                  case JobStatus.FAILED =>
+                    val unpackedError = SerializedThrowable.get(
+                      error, executionGraph.getUserClassLoader())
+
+                    jobInfo.client ! decorateMessage(JobResultFailure(
+                      new SerializedThrowable(
+                        new JobExecutionException(jobID, "Job execution 
failed.", unpackedError))))
+
+                  case x =>
+                    val exception = new JobExecutionException(jobID, s"$x is 
not a terminal state.")
+                    jobInfo.client ! decorateMessage(JobResultFailure(
+                      new SerializedThrowable(exception)))
+                    throw exception
                 }
               }
-            } else {
-              removeJob(jobID)
-            }
-
+            }(context.dispatcher)
           }
         case None =>
-          removeJob(jobID)
+          future {
+            removeJob(jobID)
+          }(context.dispatcher)
       }
 
     case ScheduleOrUpdateConsumers(jobId, partitionId) =>
@@ -600,11 +703,12 @@ class JobManager(
    * graph and the execution vertices are queued for scheduling.
    *
    * @param jobGraph representing the Flink job
-   * @param listeningBehaviour specifies the listening behaviour of the sender.
+   * @param jobInfo the job info
+   * @param isRecovery Flag indicating whether this is a recovery or initial 
submission
    */
-  private def submitJob(jobGraph: JobGraph, listeningBehaviour: 
ListeningBehaviour): Unit = {
+  private def submitJob(jobGraph: JobGraph, jobInfo: JobInfo, isRecovery: 
Boolean = false): Unit = {
     if (jobGraph == null) {
-      sender() ! decorateMessage(JobResultFailure(
+      jobInfo.client ! decorateMessage(JobResultFailure(
         new SerializedThrowable(
           new JobSubmissionException(null, "JobGraph must not be null.")
         )
@@ -615,7 +719,7 @@ class JobManager(
       val jobName = jobGraph.getName
       var executionGraph: ExecutionGraph = null
 
-      log.info(s"Received job ${jobId} (${jobName}).")
+      log.info(s"Submitting job $jobId ($jobName)" + (if (isRecovery) " 
(Recovery)" else "") + ".")
 
       try {
         // Important: We need to make sure that the library registration is 
the first action,
@@ -628,7 +732,7 @@ class JobManager(
         catch {
           case t: Throwable =>
             throw new JobSubmissionException(jobId,
-            "Cannot set up the user code libraries: " + t.getMessage, t)
+              "Cannot set up the user code libraries: " + t.getMessage, t)
         }
 
         val userCodeLoader = 
libraryCacheManager.getClassLoader(jobGraph.getJobID)
@@ -641,18 +745,10 @@ class JobManager(
           throw new JobSubmissionException(jobId, "The given job is empty")
         }
 
-        val client = if(listeningBehaviour == ListeningBehaviour.DETACHED) {
-          // The client does not want to receive the 
SerializedJobExecutionResult
-          ActorRef.noSender
-        } else {
-          // Send the job execution result back to the sender
-          sender
-        }
-
         // see if there already exists an ExecutionGraph for the corresponding 
job ID
         executionGraph = currentJobs.get(jobGraph.getJobID) match {
-          case Some((graph, jobInfo)) =>
-            jobInfo.setLastActive()
+          case Some((graph, currentJobInfo)) =>
+            currentJobInfo.setLastActive()
             graph
           case None =>
             val graph = new ExecutionGraph(
@@ -664,11 +760,7 @@ class JobManager(
               jobGraph.getUserJarBlobKeys,
               jobGraph.getClasspaths,
               userCodeLoader)
-            val jobInfo = JobInfo(
-              client,
-              System.currentTimeMillis(),
-              jobGraph.getSessionTimeout)
-            currentJobs.put(jobGraph.getJobID, (graph, jobInfo))
+
             graph
         }
 
@@ -682,7 +774,7 @@ class JobManager(
         executionGraph.setDelayBeforeRetrying(delayBetweenRetries)
         executionGraph.setScheduleMode(jobGraph.getScheduleMode())
         
executionGraph.setQueuedSchedulingAllowed(jobGraph.getAllowQueuedScheduling())
-        
+
         try {
           executionGraph.setJsonPlan(JsonPlanGenerator.generatePlan(jobGraph))
         }
@@ -691,7 +783,7 @@ class JobManager(
             log.warn("Cannot create JSON plan for job", t)
             executionGraph.setJsonPlan("{}")
         }
-        
+
         // initialize the vertices that have a master initialization hook
         // file output formats create directories here, input formats create 
splits
         if (log.isDebugEnabled) {
@@ -701,62 +793,67 @@ class JobManager(
         val numSlots = scheduler.getTotalNumberOfSlots()
 
         for (vertex <- jobGraph.getVertices.asScala) {
-
           val executableClass = vertex.getInvokableClassName
           if (executableClass == null || executableClass.length == 0) {
             throw new JobSubmissionException(jobId,
               s"The vertex ${vertex.getID} (${vertex.getName}) has no 
invokable class.")
           }
 
-              if (vertex.getParallelism() == 
ExecutionConfig.PARALLELISM_AUTO_MAX) {
-                vertex.setParallelism(numSlots)
-              }
+          if (vertex.getParallelism() == ExecutionConfig.PARALLELISM_AUTO_MAX) 
{
+            vertex.setParallelism(numSlots)
+          }
 
-              try {
-                vertex.initializeOnMaster(userCodeLoader)
-              }
-              catch {
+          try {
+            vertex.initializeOnMaster(userCodeLoader)
+          }
+          catch {
             case t: Throwable =>
               throw new JobExecutionException(jobId,
                 "Cannot initialize task '" + vertex.getName() + "': " + 
t.getMessage, t)
-              }
-            }
+          }
+        }
 
-            // topologically sort the job vertices and attach the graph to the 
existing one
-            val sortedTopology = 
jobGraph.getVerticesSortedTopologicallyFromSources()
-            if (log.isDebugEnabled) {
-              log.debug(s"Adding ${sortedTopology.size()} vertices from " +
-                s"job graph ${jobId} (${jobName}).")
-            }
-            executionGraph.attachJobGraph(sortedTopology)
+        // topologically sort the job vertices and attach the graph to the 
existing one
+        val sortedTopology = 
jobGraph.getVerticesSortedTopologicallyFromSources()
+        if (log.isDebugEnabled) {
+          log.debug(s"Adding ${sortedTopology.size()} vertices from " +
+            s"job graph ${jobId} (${jobName}).")
+        }
+        executionGraph.attachJobGraph(sortedTopology)
 
-            if (log.isDebugEnabled) {
-              log.debug("Successfully created execution graph from job " +
-                s"graph ${jobId} (${jobName}).")
-            }
+        if (log.isDebugEnabled) {
+          log.debug("Successfully created execution graph from job " +
+            s"graph ${jobId} (${jobName}).")
+        }
 
-            // configure the state checkpointing
-            val snapshotSettings = jobGraph.getSnapshotSettings
-            if (snapshotSettings != null) {
+        // configure the state checkpointing
+        val snapshotSettings = jobGraph.getSnapshotSettings
+        if (snapshotSettings != null) {
+          val jobId = jobGraph.getJobID()
 
-              val idToVertex: JobVertexID => ExecutionJobVertex = id => {
-                val vertex = executionGraph.getJobVertex(id)
-                if (vertex == null) {
-                  throw new JobSubmissionException(jobId,
-                    "The snapshot checkpointing settings refer to non-existent 
vertex " + id)
-                }
-                vertex
-              }
+          val idToVertex: JobVertexID => ExecutionJobVertex = id => {
+            val vertex = executionGraph.getJobVertex(id)
+            if (vertex == null) {
+              throw new JobSubmissionException(jobId,
+                "The snapshot checkpointing settings refer to non-existent 
vertex " + id)
+            }
+            vertex
+          }
 
-              val triggerVertices: java.util.List[ExecutionJobVertex] =
+          val triggerVertices: java.util.List[ExecutionJobVertex] =
             
snapshotSettings.getVerticesToTrigger().asScala.map(idToVertex).asJava
 
-              val ackVertices: java.util.List[ExecutionJobVertex] =
+          val ackVertices: java.util.List[ExecutionJobVertex] =
             
snapshotSettings.getVerticesToAcknowledge().asScala.map(idToVertex).asJava
 
-              val confirmVertices: java.util.List[ExecutionJobVertex] =
+          val confirmVertices: java.util.List[ExecutionJobVertex] =
             
snapshotSettings.getVerticesToConfirm().asScala.map(idToVertex).asJava
 
+          val completedCheckpoints = checkpointRecoveryFactory
+            .createCompletedCheckpoints(jobId, userCodeLoader)
+
+          val checkpointIdCounter = 
checkpointRecoveryFactory.createCheckpointIDCounter(jobId)
+
           executionGraph.enableSnapshotCheckpointing(
             snapshotSettings.getCheckpointInterval,
             snapshotSettings.getCheckpointTimeout,
@@ -764,23 +861,39 @@ class JobManager(
             ackVertices,
             confirmVertices,
             context.system,
-            leaderSessionID.orNull)
+            leaderSessionID.orNull,
+            checkpointIdCounter,
+            completedCheckpoints,
+            recoveryMode)
         }
 
         // get notified about job status changes
         executionGraph.registerJobStatusListener(
           new AkkaActorGateway(self, leaderSessionID.orNull))
 
-        if (listeningBehaviour == 
ListeningBehaviour.EXECUTION_RESULT_AND_STATE_CHANGES) {
+        if (jobInfo.listeningBehaviour == 
ListeningBehaviour.EXECUTION_RESULT_AND_STATE_CHANGES) {
           // the sender wants to be notified about state changes
-          val gateway = new AkkaActorGateway(sender(), leaderSessionID.orNull)
+          val gateway = new AkkaActorGateway(jobInfo.client, 
leaderSessionID.orNull)
 
           executionGraph.registerExecutionListener(gateway)
           executionGraph.registerJobStatusListener(gateway)
         }
 
+        if (isRecovery) {
+          executionGraph.restoreLatestCheckpointedState()
+        }
+        else {
+          submittedJobGraphs.putJobGraph(new SubmittedJobGraph(jobGraph, 
jobInfo))
+        }
+
+        // Add the job graph only after everything is finished. Otherwise 
there can be races in
+        // tests, which check the currentJobs (for example before killing a 
JM).
+        if (!currentJobs.contains(jobId)) {
+          currentJobs.put(jobGraph.getJobID, (executionGraph, jobInfo))
+        }
+
         // done with submitting the job
-        sender() ! decorateMessage(JobSubmitSuccess(jobGraph.getJobID))
+        jobInfo.client ! decorateMessage(JobSubmitSuccess(jobGraph.getJobID))
       }
       catch {
         case t: Throwable =>
@@ -799,33 +912,61 @@ class JobManager(
             new JobExecutionException(jobId, s"Failed to submit job ${jobId} 
(${jobName})", t)
           }
 
-          sender() ! decorateMessage(JobResultFailure(new 
SerializedThrowable(rt)))
+          jobInfo.client ! decorateMessage(JobResultFailure(new 
SerializedThrowable(rt)))
           return
       }
 
-      // NOTE: Scheduling the job for execution is a separate action from the 
job submission.
-      // The success of submitting the job must be independent from the 
success of scheduling
-      // the job.
-      try {
-        log.info(s"Scheduling job ${executionGraph.getJobName}.")
-        executionGraph.scheduleForExecution(scheduler)
-      }
-      catch {
-        case t: Throwable => try {
-          executionGraph.fail(t)
+      if (leaderElectionService.hasLeadership) {
+        // There is a small chance that multiple job managers schedule the same job if they
+        // try to recover at the same time. This will eventually be noticed, but cannot be ruled
+        // out from the beginning.
+
+        // NOTE: Scheduling the job for execution is a separate action from 
the job submission.
+        // The success of submitting the job must be independent from the 
success of scheduling
+        // the job.
+        try {
+          log.info(s"Scheduling job $jobId ($jobName).")
+
+          executionGraph.scheduleForExecution(scheduler)
         }
         catch {
-          case tt: Throwable => {
-            log.error("Error while marking ExecutionGraph as failed.", tt)
+          case t: Throwable => try {
+            executionGraph.fail(t)
+          }
+          catch {
+            case tt: Throwable => {
+              log.error("Error while marking ExecutionGraph as failed.", tt)
+            }
           }
         }
       }
+      else {
+        // Remove the job from currentJobs. Otherwise it would linger around and possibly be
+        // removed from ZooKeeper by this JM.
+        currentJobs.remove(jobId)
+
+        log.warn(s"Submitted job $jobId, but not leader. The other leader 
needs to recover " +
+          "this. I am not scheduling the job for execution.")
+      }
+    }
+  }
+
+  /**
+   * Re-submits a recovered job graph, unless a job with the same ID is already one of the
+   * current jobs of this JobManager.
+   *
+   * The submission runs asynchronously on the actor's dispatcher and calls submitJob with
+   * `isRecovery = true`, i.e. the latest checkpointed state is restored instead of the job
+   * graph being written to the [[SubmittedJobGraphStore]] again.
+   *
+   * NOTE(review): this is called from inside futures (the RecoverJob / RecoverAllJobs
+   * handlers), so the `currentJobs` lookup here happens off the actor thread without
+   * synchronization. The check is also not atomic with the later asynchronous submission —
+   * confirm that a duplicate recovery of the same job is tolerable.
+   *
+   * @param jobGraph Recovered job graph, including the job info of the original submission
+   */
+  private def recoverJobGraph(jobGraph: SubmittedJobGraph): Unit = {
+    if (!currentJobs.contains(jobGraph.getJobId)) {
+      future {
+        submitJob(jobGraph.getJobGraph(), jobGraph.getJobInfo(), isRecovery = 
true)
+      }(context.dispatcher)
     }
   }
 
   /**
    * Dedicated handler for checkpoint messages.
-   * 
+   *
    * @param actorMessage The checkpoint actor message.
    */
   private def handleCheckpointMessage(actorMessage: 
AbstractCheckpointMessage): Unit = {
@@ -836,13 +977,15 @@ class JobManager(
           case Some((graph, _)) =>
             val coordinator = graph.getCheckpointCoordinator()
             if (coordinator != null) {
-              try {
-                coordinator.receiveAcknowledgeMessage(ackMessage)
-              }
-              catch {
-                case t: Throwable =>
-                  log.error(s"Error in CheckpointCoordinator while processing 
$ackMessage", t)
-              }
+              future {
+                try {
+                  coordinator.receiveAcknowledgeMessage(ackMessage)
+                }
+                catch {
+                  case t: Throwable =>
+                    log.error(s"Error in CheckpointCoordinator while 
processing $ackMessage", t)
+                }
+              }(context.dispatcher)
             }
             else {
               log.error(
@@ -1020,30 +1163,46 @@ class JobManager(
   }
 
   /**
-   * Removes the job and sends it to the MemoryArchivist
+   * Removes the job and sends it to the MemoryArchivist.
+   *
+   * If the job is one of the current jobs, its graph is first removed from the
+   * [[SubmittedJobGraphStore]], then its execution graph is prepared for archiving and sent to
+   * the archive, and only then is the job dropped from `currentJobs`. Failures in either of the
+   * first two steps are logged and do not prevent the remaining steps. Afterwards the job is
+   * unregistered from the library cache, also when it was not among the current jobs.
+   *
+   * This should be called asynchronously. Removing the job from the [[SubmittedJobGraphStore]]
+   * might block. Therefore be careful not to block the actor thread.
+   *
+   * NOTE(review): `currentJobs.synchronized` is only used here; other accesses to `currentJobs`
+   * (e.g. in submitJob and recoverJobGraph) are not synchronized, so this lock alone does not
+   * make access to the map thread-safe — confirm.
+   *
    * @param jobID ID of the job to remove and archive
    */
   private def removeJob(jobID: JobID): Unit = {
     currentJobs.synchronized {
-      currentJobs.remove(jobID) match {
+      // Don't remove the job yet...
+      currentJobs.get(jobID) match {
         case Some((eg, _)) =>
           try {
+            // ...otherwise, we can have lingering resources when there is a concurrent shutdown
+            // and the ZooKeeper client is closed. Not removing the job immediately allows the
+            // shutdown to release all resources.
+            submittedJobGraphs.removeJobGraph(jobID)
+          } catch {
+            case t: Throwable => log.error(s"Could not remove submitted job 
graph $jobID.", t)
+          }
+
+          try {
             eg.prepareForArchiving()
+
             archive ! decorateMessage(ArchiveExecutionGraph(jobID, eg))
           } catch {
             case t: Throwable => log.error(s"Could not prepare the execution 
graph $eg for " +
               "archiving.", t)
           }
 
+          // Store removal and archiving are done (or failed and were logged); now drop the job.
+          currentJobs.remove(jobID)
         case None =>
       }
+    }
 
-      try {
-        libraryCacheManager.unregisterJob(jobID)
-      } catch {
-        case t: Throwable =>
-          log.error(s"Could not properly unregister job $jobID form the 
library cache.", t)
-      }
+    // Runs outside the lock and also for jobs that were not found in currentJobs.
+    // NOTE(review): the log message below says "form" where "from" is presumably meant.
+    try {
+      libraryCacheManager.unregisterJob(jobID)
+    } catch {
+      case t: Throwable =>
+        log.error(s"Could not properly unregister job $jobID form the library 
cache.", t)
     }
   }
 
@@ -1053,17 +1212,21 @@ class JobManager(
     * @param cause Cause for the cancelling.
     */
   private def cancelAndClearEverything(cause: Throwable) {
-    for((jobID, (eg, jobInfo)) <- currentJobs) {
+    for ((jobID, (eg, jobInfo)) <- currentJobs) {
+      try {
+        submittedJobGraphs.removeJobGraph(jobID)
+      }
+      catch {
+        case t: Throwable => {
+          log.error("Error during submitted job graph clean up.", t)
+        }
+      }
+
       eg.fail(cause)
 
-      if(jobInfo.client != ActorRef.noSender) {
+      if (jobInfo.listeningBehaviour != ListeningBehaviour.DETACHED) {
         jobInfo.client ! decorateMessage(
-          Failure(
-            new JobExecutionException(
-              jobID,
-              "All jobs are cancelled and cleared.",
-              cause)
-          ))
+          Failure(new JobExecutionException(jobID, "All jobs are cancelled and 
cleared.", cause)))
       }
     }
 
@@ -1079,6 +1242,25 @@ class JobManager(
     self ! decorateMessage(RevokeLeadership)
   }
 
+  /**
+   * Callback of the mixed-in SubmittedJobGraphListener (this actor registers itself via
+   * `submittedJobGraphs.start(this)`) for a job graph that was added to the store.
+   *
+   * If this JobManager has a leader session ID and the job is not yet one of its current jobs,
+   * a RecoverJob message is sent to this actor, which then performs the actual recovery.
+   *
+   * NOTE(review): if the store invokes this callback from one of its own threads (e.g. a
+   * ZooKeeper cache listener), `leaderSessionID` and `currentJobs` are read here outside the
+   * actor and without synchronization. This is presumably tolerated because the RecoverJob
+   * handler re-checks leadership and recoverJobGraph re-checks `currentJobs` — confirm.
+   *
+   * @param jobId ID of the added job graph
+   */
+  override def onAddedJobGraph(jobId: JobID): Unit = {
+    if (leaderSessionID.isDefined && !currentJobs.contains(jobId)) {
+      self ! decorateMessage(RecoverJob(jobId))
+    }
+  }
+
+  /**
+   * Callback of the mixed-in SubmittedJobGraphListener (this actor registers itself via
+   * `submittedJobGraphs.start(this)`) for a job graph that was removed from the store.
+   *
+   * If this JobManager has a leader session ID and the job is one of its current jobs, the job's
+   * execution graph is failed asynchronously (on the actor's dispatcher) with an
+   * IllegalStateException.
+   *
+   * NOTE(review): removeJob and cancelAndClearEverything remove graphs from the store while the
+   * job is still in `currentJobs`. If the store also reports this JobManager's own removals here,
+   * the job would be failed with a misleading "Another JobManager removed ..." message — confirm
+   * the store filters out its own removals. If the callback runs on a store thread,
+   * `currentJobs` and `context.dispatcher` are also accessed here outside the actor.
+   *
+   * @param jobId ID of the removed job graph
+   */
+  override def onRemovedJobGraph(jobId: JobID): Unit = {
+    if (leaderSessionID.isDefined) {
+      currentJobs.get(jobId).foreach(
+        job =>
+          future {
+            // Fail the execution graph (`job._1` of the (ExecutionGraph, JobInfo) entry)
+            job._1.fail(new IllegalStateException("Another JobManager removed 
the job from " +
+              "ZooKeeper."))
+          }(context.dispatcher)
+      )
+    }
+  }
+
   override def getAddress: String = {
+    // Akka URL of this JobManager actor, built from the actor system and this actor's reference.
     AkkaUtils.getAkkaURL(context.system, self)
   }
@@ -1166,7 +1348,7 @@ object JobManager {
       System.exit(STARTUP_FAILURE_RETURN_CODE)
     }
 
-    if (ZooKeeperUtils.isZooKeeperHighAvailabilityEnabled(configuration)) {
+    if (ZooKeeperUtils.isZooKeeperRecoveryMode(configuration)) {
       // address and will not be reachable from anyone remote
       if (listeningPort != 0) {
         val message = "Config parameter '" + 
ConfigConstants.JOB_MANAGER_IPC_PORT_KEY +
@@ -1227,7 +1409,7 @@ object JobManager {
    *
    * @param configuration The configuration object for the JobManager.
    * @param executionMode The execution mode in which to run. Execution mode 
LOCAL will spawn an
-   *                      additional TaskManager in the same process.
+   *                      additional TaskManager in the same process.
    * @param streamingMode The streaming mode to run the system in (streaming 
vs. batch-only)
    * @param listeningAddress The hostname where the JobManager should listen 
for messages.
    * @param listeningPort The port where the JobManager should listen for 
messages.
@@ -1480,7 +1662,7 @@ object JobManager {
 
     // high availability mode
     val port: Int =
-      if (ZooKeeperUtils.isZooKeeperHighAvailabilityEnabled(configuration)) {
+      if (ZooKeeperUtils.isZooKeeperRecoveryMode(configuration)) {
         LOG.info("Starting JobManager in High-Availability Mode")
 
         configuration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 0)
@@ -1524,7 +1706,9 @@ object JobManager {
     Long, // delay between retries
     FiniteDuration, // timeout
     Int, // number of archived jobs
-    LeaderElectionService) = {
+    LeaderElectionService,
+    SubmittedJobGraphStore,
+    CheckpointRecoveryFactory) = {
 
     val timeout: FiniteDuration = AkkaUtils.getTimeout(configuration)
 
@@ -1588,10 +1772,31 @@ object JobManager {
       }
     }
 
-    val leaderElectionService = leaderElectionServiceOption match {
-      case Some(les) => les
-      case None => 
LeaderElectionUtils.createLeaderElectionService(configuration)
-    }
+    // Create recovery related components
+    val (leaderElectionService, submittedJobGraphs, checkpointRecoveryFactory) 
=
+      RecoveryMode.fromConfig(configuration) match {
+        case RecoveryMode.STANDALONE =>
+          val leaderElectionService = leaderElectionServiceOption match {
+            case Some(les) => les
+            case None => new StandaloneLeaderElectionService()
+          }
+
+          (leaderElectionService,
+            new StandaloneSubmittedJobGraphStore(),
+            new StandaloneCheckpointRecoveryFactory())
+
+        case RecoveryMode.ZOOKEEPER =>
+          val client = ZooKeeperUtils.startCuratorFramework(configuration)
+
+          val leaderElectionService = leaderElectionServiceOption match {
+            case Some(les) => les
+            case None => ZooKeeperUtils.createLeaderElectionService(client, 
configuration)
+          }
+
+          (leaderElectionService,
+            ZooKeeperUtils.createSubmittedJobGraphs(client, configuration),
+            new ZooKeeperCheckpointRecoveryFactory(client, configuration))
+      }
 
     (executionContext,
       instanceManager,
@@ -1599,9 +1804,11 @@ object JobManager {
       libraryCacheManager,
       executionRetries,
       delayBetweenRetries,
-      timeout, 
-      archiveCount, 
-      leaderElectionService)
+      timeout,
+      archiveCount,
+      leaderElectionService,
+      submittedJobGraphs,
+      checkpointRecoveryFactory)
   }
 
   /**
@@ -1633,6 +1840,7 @@ object JobManager {
       jobManagerClass,
       archiveClass)
   }
+
   /**
    * Starts the JobManager and job archiver based on the given configuration, 
in the
    * given actor system.
@@ -1646,28 +1854,30 @@ object JobManager {
    * @param streamingMode The mode to run the system in (streaming vs. 
batch-only)
    * @param jobManagerClass The class of the JobManager to be started
    * @param archiveClass The class of the MemoryArchivist to be started
-   * 
+   *
    * @return A tuple of references (JobManager Ref, Archiver Ref)
    */
   def startJobManagerActors(
-      configuration: Configuration,
-      actorSystem: ActorSystem,
-      jobMangerActorName: Option[String],
-      archiveActorName: Option[String],
-      streamingMode: StreamingMode,
-      jobManagerClass: Class[_ <: JobManager],
-      archiveClass: Class[_ <: MemoryArchivist])
-    : (ActorRef, ActorRef) = {
+                             configuration: Configuration,
+                             actorSystem: ActorSystem,
+                             jobMangerActorName: Option[String],
+                             archiveActorName: Option[String],
+                             streamingMode: StreamingMode,
+                             jobManagerClass: Class[_ <: JobManager],
+                             archiveClass: Class[_ <: MemoryArchivist])
+  : (ActorRef, ActorRef) = {
 
     val (executionContext,
-      instanceManager,
-      scheduler,
-      libraryCacheManager,
-      executionRetries,
-      delayBetweenRetries,
-      timeout,
-      archiveCount,
-      leaderElectionService) = createJobManagerComponents(
+    instanceManager,
+    scheduler,
+    libraryCacheManager,
+    executionRetries,
+    delayBetweenRetries,
+    timeout,
+    archiveCount,
+    leaderElectionService,
+    submittedJobGraphs,
+    checkpointRecoveryFactory) = createJobManagerComponents(
       configuration,
       None)
 
@@ -1691,7 +1901,9 @@ object JobManager {
       delayBetweenRetries,
       timeout,
       streamingMode,
-      leaderElectionService)
+      leaderElectionService,
+      submittedJobGraphs,
+      checkpointRecoveryFactory)
 
     val jobManager: ActorRef = jobMangerActorName match {
       case Some(actorName) => actorSystem.actorOf(jobManagerProps, actorName)

http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
index c29df88..d776622 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
@@ -66,6 +66,18 @@ object JobManagerMessages {
     extends RequiresLeaderSessionID
 
   /**
+   * Triggers the recovery of the job with the given ID.
+   *
+   * @param jobId ID of the job to recover
+   */
+  case class RecoverJob(jobId: JobID) extends RequiresLeaderSessionID
+
+  /**
+   * Triggers recovery of all available jobs.
+   */
+  case class RecoverAllJobs() extends RequiresLeaderSessionID
+
+  /**
    * Cancels a job with the given [[jobID]] at the JobManager. The result of 
the cancellation is
    * sent back to the sender as a [[CancellationResponse]] message.
    *
@@ -354,6 +366,10 @@ object JobManagerMessages {
   // --------------------------------------------------------------------------
   // Utility methods to allow simpler case object access from Java
   // --------------------------------------------------------------------------
+
+  def getRequestJobStatus(jobId : JobID) : AnyRef = {
+    RequestJobStatus(jobId)
+  }
   
   def getRequestNumberRegisteredTaskManager : AnyRef = {
     RequestNumberRegisteredTaskManager

http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
index 29add0e..2df3437 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
@@ -94,9 +94,7 @@ abstract class FlinkMiniCluster(
 
   implicit val timeout = AkkaUtils.getTimeout(userConfiguration)
 
-  val recoveryMode = RecoveryMode.valueOf(configuration.getString(
-    ConfigConstants.RECOVERY_MODE,
-    ConfigConstants.DEFAULT_RECOVERY_MODE).toUpperCase)
+  val recoveryMode = RecoveryMode.fromConfig(configuration)
 
   val numJobManagers = getNumberOfJobManagers
 

Reply via email to