[FLINK-2354] [runtime] Add job graph and checkpoint recovery

This closes #1153.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/73c73e92
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/73c73e92
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/73c73e92

Branch: refs/heads/master
Commit: 73c73e92750ab8fb068d0a3cb37afcb642084fc0
Parents: 3aaee1e
Author: Ufuk Celebi <[email protected]>
Authored: Tue Sep 1 17:25:46 2015 +0200
Committer: Till Rohrmann <[email protected]>
Committed: Tue Oct 20 00:16:51 2015 +0200

----------------------------------------------------------------------
 .../flink/configuration/ConfigConstants.java    |  21 +
 .../checkpoint/CheckpointCoordinator.java       | 223 +++---
 .../runtime/checkpoint/CheckpointIDCounter.java |  43 ++
 .../checkpoint/CheckpointRecoveryFactory.java   |  61 ++
 .../runtime/checkpoint/CompletedCheckpoint.java |  81 +++
 .../checkpoint/CompletedCheckpointStore.java    |  69 ++
 .../runtime/checkpoint/PendingCheckpoint.java   |   6 +-
 .../StandaloneCheckpointIDCounter.java          |  47 ++
 .../StandaloneCheckpointRecoveryFactory.java    |  52 ++
 .../StandaloneCompletedCheckpointStore.java     | 100 +++
 .../flink/runtime/checkpoint/StateForTask.java  |  21 +-
 .../checkpoint/SuccessfulCheckpoint.java        |  82 ---
 .../ZooKeeperCheckpointIDCounter.java           | 130 ++++
 .../ZooKeeperCheckpointRecoveryFactory.java     |  66 ++
 .../ZooKeeperCompletedCheckpointStore.java      | 293 ++++++++
 .../runtime/executiongraph/ExecutionGraph.java  |  51 +-
 .../apache/flink/runtime/jobgraph/JobGraph.java |   5 +
 .../flink/runtime/jobmanager/RecoveryMode.java  |  13 +
 .../StandaloneSubmittedJobGraphStore.java       |  65 ++
 .../runtime/jobmanager/SubmittedJobGraph.java   |  77 ++
 .../jobmanager/SubmittedJobGraphStore.java      |  93 +++
 .../ZooKeeperSubmittedJobGraphStore.java        | 379 ++++++++++
 .../leaderelection/LeaderElectionService.java   |   1 +
 .../ZooKeeperLeaderElectionService.java         |   1 +
 .../flink/runtime/state/StateBackend.java       |  39 +
 .../state/StateHandleProviderFactory.java       |  61 ++
 .../flink/runtime/util/LeaderElectionUtils.java |  67 --
 .../flink/runtime/util/ZooKeeperUtils.java      | 138 +++-
 .../zookeeper/ZooKeeperStateHandleStore.java    | 384 ++++++++++
 .../flink/runtime/jobmanager/JobInfo.scala      |  25 +-
 .../flink/runtime/jobmanager/JobManager.scala   | 594 ++++++++++-----
 .../runtime/messages/JobManagerMessages.scala   |  16 +
 .../runtime/minicluster/FlinkMiniCluster.scala  |   4 +-
 .../checkpoint/CheckpointCoordinatorTest.java   | 190 ++---
 .../checkpoint/CheckpointIDCounterTest.java     | 194 +++++
 .../checkpoint/CheckpointStateRestoreTest.java  |  19 +-
 .../CompletedCheckpointStoreTest.java           | 297 ++++++++
 .../StandaloneCompletedCheckpointStoreTest.java |  33 +
 ...ZooKeeperCompletedCheckpointStoreITCase.java | 101 +++
 .../BlobLibraryCacheManagerTest.java            |   4 +
 .../PartitionRequestClientFactoryTest.java      |   2 +-
 ...ManagerSubmittedJobGraphsRecoveryITCase.java | 460 ++++++++++++
 .../StandaloneSubmittedJobGraphStoreTest.java   |  53 ++
 .../ZooKeeperSubmittedJobGraphsStoreITCase.java | 283 ++++++++
 .../JobManagerLeaderElectionTest.java           |  27 +-
 .../TestingLeaderElectionService.java           |   4 +
 .../ZooKeeperLeaderRetrievalTest.java           |  21 +-
 .../messages/CheckpointMessagesTest.java        |   2 +-
 .../runtime/taskmanager/TaskCancelTest.java     |  67 +-
 .../runtime/testutils/CommonTestUtils.java      |  75 +-
 .../testutils/JobManagerActorTestUtils.java     | 166 +++++
 .../runtime/testutils/JobManagerProcess.java    | 226 ++++++
 .../runtime/testutils/TaskManagerProcess.java   | 133 ++++
 .../flink/runtime/testutils/TestJvmProcess.java | 267 +++++++
 .../runtime/testutils/ZooKeeperTestUtils.java   |  94 +++
 .../ZooKeeperStateHandleStoreITCase.java        | 591 +++++++++++++++
 .../zookeeper/ZooKeeperTestEnvironment.java     | 133 ++++
 .../ExecutionGraphRestartTest.scala             |  21 +-
 .../runtime/testingUtils/TestingCluster.scala   |  44 +-
 .../testingUtils/TestingJobManager.scala        |  32 +-
 .../streaming/runtime/tasks/StreamTask.java     |   2 +-
 .../test/util/ForkableFlinkMiniCluster.scala    |  36 +-
 .../checkpointing/StateCheckpoinedITCase.java   | 391 ----------
 .../checkpointing/StateCheckpointedITCase.java  | 391 ++++++++++
 ...tJobManagerProcessFailureRecoveryITCase.java | 289 ++++++++
 .../AbstractProcessFailureRecoveryTest.java     | 444 ------------
 ...ctTaskManagerProcessFailureRecoveryTest.java | 397 +++++++++++
 .../flink/test/recovery/ChaosMonkeyITCase.java  | 713 +++++++++++++++++++
 .../JobManagerCheckpointRecoveryITCase.java     | 395 ++++++++++
 ...anagerProcessFailureBatchRecoveryITCase.java | 140 ++++
 .../ProcessFailureBatchRecoveryITCase.java      | 115 ---
 .../recovery/ProcessFailureCancelingITCase.java |   4 +-
 .../ProcessFailureStreamingRecoveryITCase.java  | 234 ------
 ...anagerProcessFailureBatchRecoveryITCase.java | 115 +++
 ...erProcessFailureStreamingRecoveryITCase.java | 234 ++++++
 .../ZooKeeperLeaderElectionITCase.java          |  61 +-
 .../org/apache/flink/yarn/YarnJobManager.scala  |  11 +-
 77 files changed, 8901 insertions(+), 1918 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java 
b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index 5d6f1c7..be730a0 100644
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -407,6 +407,12 @@ public final class ConfigConstants {
         */
        public static final String STATE_BACKEND = "state.backend";
        
+       /**
+        * File system state backend base path for recoverable state handles. 
Recovery state is written
+        * to this path and the file state handles are persisted for recovery.
+        */
+       public static final String STATE_BACKEND_FS_RECOVERY_PATH = 
"state.backend.fs.dir.recovery";
+       
        // ----------------------------- Miscellaneous 
----------------------------
        
        /**
@@ -433,6 +439,15 @@ public final class ConfigConstants {
 
        public static final String ZOOKEEPER_LEADER_PATH = 
"ha.zookeeper.dir.leader";
 
+       /** ZooKeeper root path (ZNode) for job graphs. */
+       public static final String ZOOKEEPER_JOBGRAPHS_PATH = 
"ha.zookeeper.dir.jobgraphs";
+
+       /** ZooKeeper root path (ZNode) for completed checkpoints. */
+       public static final String ZOOKEEPER_CHECKPOINTS_PATH = 
"ha.zookeeper.dir.checkpoints";
+
+       /** ZooKeeper root path (ZNode) for checkpoint counters. */
+       public static final String ZOOKEEPER_CHECKPOINT_COUNTER_PATH = 
"ha.zookeeper.dir.checkpoint-counter";
+
        public static final String ZOOKEEPER_SESSION_TIMEOUT = 
"ha.zookeeper.client.session-timeout";
 
        public static final String ZOOKEEPER_CONNECTION_TIMEOUT = 
"ha.zookeeper.client.connection-timeout";
@@ -699,6 +714,12 @@ public final class ConfigConstants {
 
        public static final String DEFAULT_ZOOKEEPER_LEADER_PATH = "/leader";
 
+       public static final String DEFAULT_ZOOKEEPER_JOBGRAPHS_PATH = 
"/jobgraphs";
+
+       public static final String DEFAULT_ZOOKEEPER_CHECKPOINTS_PATH = 
"/checkpoints";
+
+       public static final String DEFAULT_ZOOKEEPER_CHECKPOINT_COUNTER_PATH = 
"/checkpoint-counter";
+
        public static final int DEFAULT_ZOOKEEPER_SESSION_TIMEOUT = 60000;
 
        public static final int DEFAULT_ZOOKEEPER_CONNECTION_TIMEOUT = 15000;

http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 8f0b19b..fdb59d9 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -21,16 +21,16 @@ package org.apache.flink.runtime.checkpoint;
 import akka.actor.ActorSystem;
 import akka.actor.PoisonPill;
 import akka.actor.Props;
-
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.Execution;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
-import org.apache.flink.runtime.instance.AkkaActorGateway;
 import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.instance.AkkaActorGateway;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.RecoveryMode;
 import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
 import org.apache.flink.runtime.messages.checkpoint.NotifyCheckpointComplete;
 import org.apache.flink.runtime.messages.checkpoint.TriggerCheckpoint;
@@ -38,7 +38,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayDeque;
-import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
@@ -48,13 +47,19 @@ import java.util.Timer;
 import java.util.TimerTask;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
 
 /**
  * The checkpoint coordinator coordinates the distributed snapshots of 
operators and state.
  * It triggers the checkpoint by sending the messages to the relevant tasks 
and collects the
  * checkpoint acknowledgements. It also collects and maintains the overview of 
the state handles
  * reported by the tasks that acknowledge the checkpoint.
+ *
+ * <p>Depending on the configured {@link RecoveryMode}, the behaviour of the 
{@link
+ * CompletedCheckpointStore} and {@link CheckpointIDCounter} changes. The 
default standalone
+ * implementations don't support any recovery.
  */
 public class CheckpointCoordinator {
        
@@ -79,12 +84,20 @@ public class CheckpointCoordinator {
        private final ExecutionVertex[] tasksToCommitTo;
 
        private final Map<Long, PendingCheckpoint> pendingCheckpoints;
-       
-       private final ArrayDeque<SuccessfulCheckpoint> completedCheckpoints;
+
+       /**
+        * Completed checkpoints. Implementations can be blocking. Make sure 
calls to methods
+        * accessing this don't block the job manager actor and run 
asynchronously.
+        */
+       private final CompletedCheckpointStore completedCheckpointStore;
        
        private final ArrayDeque<Long> recentPendingCheckpoints;
 
-       private final AtomicLong checkpointIdCounter = new AtomicLong(1);
+       /**
+        * Checkpoint ID counter to ensure ascending IDs. In case of job 
manager failures, these
+        * need to be ascending across job managers.
+        */
+       private final CheckpointIDCounter checkpointIdCounter;
 
        private final AtomicInteger numUnsuccessfulCheckpointsTriggers = new 
AtomicInteger();
 
@@ -93,8 +106,6 @@ public class CheckpointCoordinator {
        
        private final long checkpointTimeout;
        
-       private final int numSuccessfulCheckpointsToRetain;
-       
        private TimerTask periodicScheduler;
        
        private ActorGateway jobStatusListener;
@@ -110,61 +121,62 @@ public class CheckpointCoordinator {
 
        public CheckpointCoordinator(
                        JobID job,
-                       int numSuccessfulCheckpointsToRetain,
                        long checkpointTimeout,
                        ExecutionVertex[] tasksToTrigger,
                        ExecutionVertex[] tasksToWaitFor,
                        ExecutionVertex[] tasksToCommitTo,
-                       ClassLoader userClassLoader) {
+                       ClassLoader userClassLoader,
+                       CheckpointIDCounter checkpointIDCounter,
+                       CompletedCheckpointStore completedCheckpointStore,
+                       RecoveryMode recoveryMode) throws Exception {
                
-               // some sanity checks
-               if (job == null || tasksToTrigger == null ||
-                               tasksToWaitFor == null || tasksToCommitTo == 
null) {
-                       throw new NullPointerException();
-               }
-               if (numSuccessfulCheckpointsToRetain < 1) {
-                       throw new IllegalArgumentException("Must retain at 
least one successful checkpoint");
-               }
-               if (checkpointTimeout < 1) {
-                       throw new IllegalArgumentException("Checkpoint timeout 
must be larger than zero");
-               }
+               // Sanity check
+               checkArgument(checkpointTimeout >= 1, "Checkpoint timeout must 
be larger than zero");
                
-               this.job = job;
-               this.numSuccessfulCheckpointsToRetain = 
numSuccessfulCheckpointsToRetain;
+               this.job = checkNotNull(job);
                this.checkpointTimeout = checkpointTimeout;
-               this.tasksToTrigger = tasksToTrigger;
-               this.tasksToWaitFor = tasksToWaitFor;
-               this.tasksToCommitTo = tasksToCommitTo;
+               this.tasksToTrigger = checkNotNull(tasksToTrigger);
+               this.tasksToWaitFor = checkNotNull(tasksToWaitFor);
+               this.tasksToCommitTo = checkNotNull(tasksToCommitTo);
                this.pendingCheckpoints = new LinkedHashMap<Long, 
PendingCheckpoint>();
-               this.completedCheckpoints = new 
ArrayDeque<SuccessfulCheckpoint>(numSuccessfulCheckpointsToRetain + 1);
+               this.completedCheckpointStore = 
checkNotNull(completedCheckpointStore);
                this.recentPendingCheckpoints = new 
ArrayDeque<Long>(NUM_GHOST_CHECKPOINT_IDS);
                this.userClassLoader = userClassLoader;
+               this.checkpointIdCounter = checkNotNull(checkpointIDCounter);
+               checkpointIDCounter.start();
 
-               timer = new Timer("Checkpoint Timer", true);
+               this.timer = new Timer("Checkpoint Timer", true);
 
-               // Add shutdown hook to clean up state handles
-               shutdownHook = new Thread(new Runnable() {
-                       @Override
-                       public void run() {
-                               try {
-                                       CheckpointCoordinator.this.shutdown();
-                               }
-                               catch (Throwable t) {
-                                       LOG.error("Error during shutdown of 
blob service via JVM shutdown hook: " +
-                                                       t.getMessage(), t);
+               if (recoveryMode == RecoveryMode.STANDALONE) {
+                       // Add shutdown hook to clean up state handles when no 
checkpoint recovery is
+                       // possible. In case of another configured recovery 
mode, the checkpoints need to be
+                       // available for the standby job managers.
+                       this.shutdownHook = new Thread(new Runnable() {
+                               @Override
+                               public void run() {
+                                       try {
+                                               
CheckpointCoordinator.this.shutdown();
+                                       }
+                                       catch (Throwable t) {
+                                               LOG.error("Error during 
shutdown of checkpoint coordniator via " +
+                                                               "JVM shutdown 
hook: " + t.getMessage(), t);
+                                       }
                                }
-                       }
-               });
+                       });
 
-               try {
-                       // Add JVM shutdown hook to call shutdown of service
-                       Runtime.getRuntime().addShutdownHook(shutdownHook);
-               }
-               catch (IllegalStateException ignored) {
-                       // JVM is already shutting down. No need to do anything.
+                       try {
+                               // Add JVM shutdown hook to call shutdown of 
service
+                               
Runtime.getRuntime().addShutdownHook(shutdownHook);
+                       }
+                       catch (IllegalStateException ignored) {
+                               // JVM is already shutting down. No need to do 
anything.
+                       }
+                       catch (Throwable t) {
+                               LOG.error("Cannot register checkpoint 
coordinator shutdown hook.", t);
+                       }
                }
-               catch (Throwable t) {
-                       LOG.error("Cannot register checkpoint coordinator 
shutdown hook.", t);
+               else {
+                       this.shutdownHook = null;
                }
        }
 
@@ -178,41 +190,39 @@ public class CheckpointCoordinator {
         * After this method has been called, the coordinator does not accept 
any further
         * messages and cannot trigger any further checkpoints.
         */
-       public void shutdown() {
+       public void shutdown() throws Exception {
                synchronized (lock) {
-                       try {   
-                               if (shutdown) {
-                                       return;
-                               }
-                               shutdown = true;
-                               LOG.info("Stopping checkpoint coordinator for 
job " + job);
-                       
-                               // shut down the thread that handles the 
timeouts
-                               timer.cancel();
-                       
-                               // make sure that the actor does not linger
-                               if (jobStatusListener != null) {
-                                       
jobStatusListener.tell(PoisonPill.getInstance());
-                                       jobStatusListener = null;
-                               }
-                       
-                               // the scheduling thread needs also to go away
-                               if (periodicScheduler != null) {
-                                       periodicScheduler.cancel();
-                                       periodicScheduler = null;
-                               }
-                       
-                               // clear and discard all pending checkpoints
-                               for (PendingCheckpoint pending : 
pendingCheckpoints.values()) {
-                                               
pending.discard(userClassLoader, true);
-                               }
-                               pendingCheckpoints.clear();
-                       
-                               // clean and discard all successful checkpoints
-                               for (SuccessfulCheckpoint checkpoint : 
completedCheckpoints) {
-                                       checkpoint.discard(userClassLoader);
+                       try {
+                               if (!shutdown) {
+                                       shutdown = true;
+                                       LOG.info("Stopping checkpoint 
coordinator for job " + job);
+
+                                       // shut down the thread that handles 
the timeouts
+                                       timer.cancel();
+
+                                       // make sure that the actor does not 
linger
+                                       if (jobStatusListener != null) {
+                                               
jobStatusListener.tell(PoisonPill.getInstance());
+                                               jobStatusListener = null;
+                                       }
+
+                                       // the scheduling thread needs also to 
go away
+                                       if (periodicScheduler != null) {
+                                               periodicScheduler.cancel();
+                                               periodicScheduler = null;
+                                       }
+
+                                       checkpointIdCounter.stop();
+
+                                       // clear and discard all pending 
checkpoints
+                                       for (PendingCheckpoint pending : 
pendingCheckpoints.values()) {
+                                                       
pending.discard(userClassLoader, true);
+                                       }
+                                       pendingCheckpoints.clear();
+
+                                       // clean and discard all successful 
checkpoints
+                                       
completedCheckpointStore.discardAllCheckpoints();
                                }
-                               completedCheckpoints.clear();
                        }
                        finally {
                                // Remove shutdown hook to prevent resource 
leaks, unless this is invoked by the
@@ -244,7 +254,7 @@ public class CheckpointCoordinator {
         * Triggers a new checkpoint and uses the current system time as the
         * checkpoint time.
         */
-       public void triggerCheckpoint() {
+       public void triggerCheckpoint() throws Exception {
                triggerCheckpoint(System.currentTimeMillis());
        }
 
@@ -254,7 +264,7 @@ public class CheckpointCoordinator {
         * 
         * @param timestamp The timestamp for the checkpoint.
         */
-       public boolean triggerCheckpoint(final long timestamp) {
+       public boolean triggerCheckpoint(final long timestamp) throws Exception 
{
                if (shutdown) {
                        LOG.error("Cannot trigger checkpoint, checkpoint 
coordinator has been shutdown.");
                        return false;
@@ -354,7 +364,7 @@ public class CheckpointCoordinator {
                }
        }
        
-       public void receiveAcknowledgeMessage(AcknowledgeCheckpoint message) {
+       public void receiveAcknowledgeMessage(AcknowledgeCheckpoint message) 
throws Exception {
                if (shutdown || message == null) {
                        return;
                }
@@ -365,7 +375,7 @@ public class CheckpointCoordinator {
                
                final long checkpointId = message.getCheckpointId();
 
-               SuccessfulCheckpoint completed = null;
+               CompletedCheckpoint completed = null;
                PendingCheckpoint checkpoint;
                synchronized (lock) {
                        // we need to check inside the lock for being shutdown 
as well, otherwise we
@@ -380,13 +390,13 @@ public class CheckpointCoordinator {
                                if 
(checkpoint.acknowledgeTask(message.getTaskExecutionId(), message.getState())) {
                                        
                                        if (checkpoint.isFullyAcknowledged()) {
+                                               completed = 
checkpoint.toCompletedCheckpoint();
+
+                                               
completedCheckpointStore.addCheckpoint(completed);
+
                                                LOG.info("Completed checkpoint 
" + checkpointId);
+                                               
LOG.debug(completed.getStates().toString());
 
-                                               completed = 
checkpoint.toCompletedCheckpoint();
-                                               
completedCheckpoints.addLast(completed);
-                                               if (completedCheckpoints.size() 
> numSuccessfulCheckpointsToRetain) {
-                                                       
completedCheckpoints.removeFirst().discard(userClassLoader);
-                                               }
                                                
pendingCheckpoints.remove(checkpointId);
                                                
rememberRecentCheckpointId(checkpointId);
                                                
@@ -456,25 +466,30 @@ public class CheckpointCoordinator {
        //  Checkpoint State Restoring
        // 
--------------------------------------------------------------------------------------------
 
-       public void restoreLatestCheckpointedState(Map<JobVertexID, 
ExecutionJobVertex> tasks,
-                                                                               
                boolean errorIfNoCheckpoint,
-                                                                               
                boolean allOrNothingState) throws Exception {
+       public void restoreLatestCheckpointedState(
+                       Map<JobVertexID, ExecutionJobVertex> tasks,
+                       boolean errorIfNoCheckpoint,
+                       boolean allOrNothingState) throws Exception {
+
                synchronized (lock) {
                        if (shutdown) {
                                throw new 
IllegalStateException("CheckpointCoordinator is shut down");
                        }
-                       
-                       if (completedCheckpoints.isEmpty()) {
+
+                       // Recover the checkpoints
+                       completedCheckpointStore.recover();
+
+                       // restore from the latest checkpoint
+                       CompletedCheckpoint latest = 
completedCheckpointStore.getLatestCheckpoint();
+
+                       if (latest == null) {
                                if (errorIfNoCheckpoint) {
                                        throw new IllegalStateException("No 
completed checkpoint available");
                                } else {
                                        return;
                                }
                        }
-                       
-                       // restore from the latest checkpoint
-                       SuccessfulCheckpoint latest = 
completedCheckpoints.getLast();
-                                               
+
                        if (allOrNothingState) {
                                Map<ExecutionJobVertex, Integer> stateCounts = 
new HashMap<ExecutionJobVertex, Integer>();
 
@@ -519,7 +534,9 @@ public class CheckpointCoordinator {
        }
 
        public int getNumberOfRetainedSuccessfulCheckpoints() {
-               return this.completedCheckpoints.size();
+               synchronized (lock) {
+                       return 
completedCheckpointStore.getNumberOfRetainedCheckpoints();
+               }
        }
 
        public Map<Long, PendingCheckpoint> getPendingCheckpoints() {
@@ -528,9 +545,9 @@ public class CheckpointCoordinator {
                }
        }
        
-       public List<SuccessfulCheckpoint> getSuccessfulCheckpoints() {
+       public List<CompletedCheckpoint> getSuccessfulCheckpoints() throws 
Exception {
                synchronized (lock) {
-                       return new 
ArrayList<SuccessfulCheckpoint>(this.completedCheckpoints);
+                       return completedCheckpointStore.getAllCheckpoints();
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointIDCounter.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointIDCounter.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointIDCounter.java
new file mode 100644
index 0000000..34b7946
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointIDCounter.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+/**
+ * A checkpoint ID counter.
+ */
+public interface CheckpointIDCounter {
+
+       /**
+        * Starts the {@link CheckpointIDCounter} service.
+        */
+       void start() throws Exception;
+
+       /**
+        * Stops the {@link CheckpointIDCounter} service.
+        */
+       void stop() throws Exception;
+
+       /**
+        * Atomically increments the current checkpoint ID.
+        *
+        * @return The previous checkpoint ID
+        */
+       long getAndIncrement() throws Exception;
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRecoveryFactory.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRecoveryFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRecoveryFactory.java
new file mode 100644
index 0000000..aa6e94b
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRecoveryFactory.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.api.common.JobID;
+
+/**
+ * A factory for per-job checkpoint recovery components.
+ */
+public interface CheckpointRecoveryFactory {
+
+       /**
+        * The number of {@link CompletedCheckpoint} instances to retain.
+        */
+       int NUMBER_OF_SUCCESSFUL_CHECKPOINTS_TO_RETAIN = 1;
+
+       /**
+        * Starts the {@link CheckpointRecoveryFactory} service.
+        */
+       void start();
+
+       /**
+        * Stops the {@link CheckpointRecoveryFactory} service.
+        */
+       void stop();
+
+       /**
+        * Creates a {@link CompletedCheckpointStore} instance for a job.
+        *
+        * @param jobId           Job ID to recover checkpoints for
+        * @param userClassLoader User code class loader of the job
+        * @return {@link CompletedCheckpointStore} instance for the job
+        */
+       CompletedCheckpointStore createCompletedCheckpoints(JobID jobId, 
ClassLoader userClassLoader)
+                       throws Exception;
+
+       /**
+        * Creates a {@link CheckpointIDCounter} instance for a job.
+        *
+        * @param jobId Job ID to recover checkpoints for
+        * @return {@link CheckpointIDCounter} instance for the job
+        */
+       CheckpointIDCounter createCheckpointIDCounter(JobID jobId) throws 
Exception;
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
new file mode 100644
index 0000000..ea3c26d
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.api.common.JobID;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * A completed checkpoint describes a checkpoint after all required tasks 
acknowledged it (with their state)
+ * and that is considered completed.
+ */
+public class CompletedCheckpoint implements Serializable {
+
+       private static final long serialVersionUID = -8360248179615702014L;
+
+       private final JobID job;
+       
+       private final long checkpointID;
+       
+       private final long timestamp;
+       
+       private final ArrayList<StateForTask> states;
+
+       public CompletedCheckpoint(JobID job, long checkpointID, long 
timestamp, ArrayList<StateForTask> states) {
+               this.job = job;
+               this.checkpointID = checkpointID;
+               this.timestamp = timestamp;
+               this.states = states;
+       }
+
+       public JobID getJobId() {
+               return job;
+       }
+
+       public long getCheckpointID() {
+               return checkpointID;
+       }
+
+       public long getTimestamp() {
+               return timestamp;
+       }
+
+       public List<StateForTask> getStates() {
+               return states;
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+       
+       public void discard(ClassLoader userClassLoader) {
+               for(StateForTask state: states){
+                       state.discard(userClassLoader);
+               }
+               states.clear();
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+       
+       @Override
+       public String toString() {
+               return String.format("Checkpoint %d @ %d for %s", checkpointID, 
timestamp, job);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java
new file mode 100644
index 0000000..d024aea
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import java.util.List;
+
+/**
+ * A bounded LIFO-queue of {@link CompletedCheckpoint} instances.
+ */
+public interface CompletedCheckpointStore {
+
+       /**
+        * Recover available {@link CompletedCheckpoint} instances.
+        *
+        * <p>After a call to this method, {@link #getLatestCheckpoint()} 
returns the latest
+        * available checkpoint.
+        */
+       void recover() throws Exception;
+
+       /**
+        * Adds a {@link CompletedCheckpoint} instance to the list of completed 
checkpoints.
+        *
+        * <p>Only a bounded number of checkpoints is kept. When exceeding the 
maximum number of
+        * retained checkpoints, the oldest one will be discarded via {@link
+        * CompletedCheckpoint#discard(ClassLoader)}.
+        */
+       void addCheckpoint(CompletedCheckpoint checkpoint) throws Exception;
+
+       /**
+        * Returns the latest {@link CompletedCheckpoint} instance or 
<code>null</code> if none was
+        * added.
+        */
+       CompletedCheckpoint getLatestCheckpoint() throws Exception;
+
+       /**
+        * Discards all added {@link CompletedCheckpoint} instances via {@link
+        * CompletedCheckpoint#discard(ClassLoader)}.
+        */
+       void discardAllCheckpoints() throws Exception;
+
+       /**
+        * Returns all {@link CompletedCheckpoint} instances.
+        *
+        * <p>Returns an empty list if no checkpoint has been added yet.
+        */
+       List<CompletedCheckpoint> getAllCheckpoints() throws Exception;
+
+       /**
+        * Returns the current number of retained checkpoints.
+        */
+       int getNumberOfRetainedCheckpoints();
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
index 370ae50..81159f6 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
@@ -31,7 +31,7 @@ import org.apache.flink.util.SerializedValue;
 /**
  * A pending checkpoint is a checkpoint that has been started, but has not been
  * acknowledged by all tasks that need to acknowledge it. Once all tasks have
- * acknowledged it, it becomes a {@link SuccessfulCheckpoint}.
+ * acknowledged it, it becomes a {@link CompletedCheckpoint}.
  * 
  * <p>Note that the pending checkpoint, as well as the successful checkpoint 
keep the
  * state handles always as serialized values, never as actual values.</p>
@@ -109,13 +109,13 @@ public class PendingCheckpoint {
                return collectedStates;
        }
        
-       public SuccessfulCheckpoint toCompletedCheckpoint() {
+       public CompletedCheckpoint toCompletedCheckpoint() {
                synchronized (lock) {
                        if (discarded) {
                                throw new IllegalStateException("pending 
checkpoint is discarded");
                        }
                        if (notYetAcknowledgedTasks.isEmpty()) {
-                               SuccessfulCheckpoint completed =  new 
SuccessfulCheckpoint(jobId, checkpointId,
+                               CompletedCheckpoint completed =  new 
CompletedCheckpoint(jobId, checkpointId,
                                                checkpointTimestamp, new 
ArrayList<StateForTask>(collectedStates));
                                discard(null, false);
                                

http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointIDCounter.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointIDCounter.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointIDCounter.java
new file mode 100644
index 0000000..052d743
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointIDCounter.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.runtime.jobmanager.RecoveryMode;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * {@link CheckpointIDCounter} instances for JobManagers running in {@link 
RecoveryMode#STANDALONE}.
+ *
+ * <p>Simple wrapper of an {@link AtomicLong}. This is sufficient, because job 
managers are not
+ * recoverable in this recovery mode.
+ */
+public class StandaloneCheckpointIDCounter implements CheckpointIDCounter {
+
+       private final AtomicLong checkpointIdCounter = new AtomicLong(1);
+
+       @Override
+       public void start() throws Exception {
+       }
+
+       @Override
+       public void stop() throws Exception {
+       }
+
+       @Override
+       public long getAndIncrement() throws Exception {
+               return checkpointIdCounter.getAndIncrement();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointRecoveryFactory.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointRecoveryFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointRecoveryFactory.java
new file mode 100644
index 0000000..324a0be
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointRecoveryFactory.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.jobmanager.RecoveryMode;
+
+/**
+ * {@link CheckpointCoordinator} components in {@link RecoveryMode#STANDALONE}.
+ */
+public class StandaloneCheckpointRecoveryFactory implements 
CheckpointRecoveryFactory {
+
+       @Override
+       public void start() {
+               // Nothing to do
+       }
+
+       @Override
+       public void stop() {
+               // Nothing to do
+       }
+
+       @Override
+       public CompletedCheckpointStore createCompletedCheckpoints(JobID jobId, 
ClassLoader userClassLoader)
+                       throws Exception {
+
+               return new 
StandaloneCompletedCheckpointStore(CheckpointRecoveryFactory
+                               .NUMBER_OF_SUCCESSFUL_CHECKPOINTS_TO_RETAIN, 
userClassLoader);
+       }
+
+       @Override
+       public CheckpointIDCounter createCheckpointIDCounter(JobID ignored) {
+               return new StandaloneCheckpointIDCounter();
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
new file mode 100644
index 0000000..c31606a
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.runtime.jobmanager.RecoveryMode;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.List;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * {@link CompletedCheckpointStore} for JobManagers running in {@link 
RecoveryMode#STANDALONE}.
+ */
+class StandaloneCompletedCheckpointStore implements CompletedCheckpointStore {
+
+       /** The maximum number of checkpoints to retain (at least 1). */
+       private final int maxNumberOfCheckpointsToRetain;
+
+       /** User class loader for discarding {@link CompletedCheckpoint} 
instances. */
+       private final ClassLoader userClassLoader;
+
+       /** The completed checkpoints. */
+       private final ArrayDeque<CompletedCheckpoint> checkpoints;
+
+       /**
+        * Creates {@link StandaloneCompletedCheckpointStore}.
+        *
+        * @param maxNumberOfCheckpointsToRetain The maximum number of 
checkpoints to retain (at
+        *                                       least 1). Adding more 
checkpoints than this results
+        *                                       in older checkpoints being 
discarded.
+        * @param userClassLoader                The user class loader used to 
discard checkpoints
+        */
+       public StandaloneCompletedCheckpointStore(
+                       int maxNumberOfCheckpointsToRetain,
+                       ClassLoader userClassLoader) {
+
+               checkArgument(maxNumberOfCheckpointsToRetain >= 1, "Must retain 
at least one checkpoint.");
+
+               this.maxNumberOfCheckpointsToRetain = 
maxNumberOfCheckpointsToRetain;
+               this.userClassLoader = checkNotNull(userClassLoader, "User 
class loader");
+
+               this.checkpoints = new 
ArrayDeque<>(maxNumberOfCheckpointsToRetain + 1);
+       }
+
+       @Override
+       public void recover() throws Exception {
+               // Nothing to do
+       }
+
+       @Override
+       public void addCheckpoint(CompletedCheckpoint checkpoint) {
+               checkpoints.addLast(checkpoint);
+               if (checkpoints.size() > maxNumberOfCheckpointsToRetain) {
+                       checkpoints.removeFirst().discard(userClassLoader);
+               }
+       }
+
+       @Override
+       public CompletedCheckpoint getLatestCheckpoint() {
+               return checkpoints.isEmpty() ? null : checkpoints.getLast();
+       }
+
+       @Override
+       public List<CompletedCheckpoint> getAllCheckpoints() {
+               return new ArrayList<>(checkpoints);
+       }
+
+       @Override
+       public int getNumberOfRetainedCheckpoints() {
+               return checkpoints.size();
+       }
+
+       @Override
+       public void discardAllCheckpoints() {
+               for (CompletedCheckpoint checkpoint : checkpoints) {
+                       checkpoint.discard(userClassLoader);
+               }
+
+               checkpoints.clear();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateForTask.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateForTask.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateForTask.java
index 120c503..d1428f4 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateForTask.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateForTask.java
@@ -24,6 +24,11 @@ import org.apache.flink.util.SerializedValue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.Serializable;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
 /**
  * Simple bean to describe the state belonging to a parallel operator.
  * Since we hold the state across execution attempts, we identify a task by its
@@ -34,8 +39,10 @@ import org.slf4j.LoggerFactory;
  * Furthermore, the state may involve user-defined classes that are not 
accessible without
  * the respective classloader.
  */
-public class StateForTask {
-       
+public class StateForTask implements Serializable {
+
+       private static final long serialVersionUID = -2394696997971923995L;
+
        private static final Logger LOG = 
LoggerFactory.getLogger(StateForTask.class);
 
        /** The state of the parallel operator */
@@ -48,12 +55,10 @@ public class StateForTask {
        private final int subtask;
        
        public StateForTask(SerializedValue<StateHandle<?>> state, JobVertexID 
operatorId, int subtask) {
-       if (state == null || operatorId == null || subtask < 0) {
-                       throw new IllegalArgumentException();
-               }
-               
-               this.state = state;
-               this.operatorId = operatorId;
+               this.state = checkNotNull(state, "State");
+               this.operatorId = checkNotNull(operatorId, "Operator ID");
+
+               checkArgument(subtask >= 0, "Negative subtask index");
                this.subtask = subtask;
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SuccessfulCheckpoint.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SuccessfulCheckpoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SuccessfulCheckpoint.java
deleted file mode 100644
index be0b301..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SuccessfulCheckpoint.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.checkpoint;
-
-import org.apache.flink.api.common.JobID;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-
-/**
- * A successful checkpoint describes a checkpoint after all required tasks 
acknowledged it (with their state)
- * and that is considered completed.
- */
-public class SuccessfulCheckpoint {
-       
-       private static final Logger LOG = 
LoggerFactory.getLogger(SuccessfulCheckpoint.class);
-       
-       private final JobID job;
-       
-       private final long checkpointID;
-       
-       private final long timestamp;
-       
-       private final List<StateForTask> states;
-
-
-       public SuccessfulCheckpoint(JobID job, long checkpointID, long 
timestamp, List<StateForTask> states) {
-               this.job = job;
-               this.checkpointID = checkpointID;
-               this.timestamp = timestamp;
-               this.states = states;
-       }
-
-       public JobID getJobId() {
-               return job;
-       }
-
-       public long getCheckpointID() {
-               return checkpointID;
-       }
-
-       public long getTimestamp() {
-               return timestamp;
-       }
-
-       public List<StateForTask> getStates() {
-               return states;
-       }
-
-       // 
--------------------------------------------------------------------------------------------
-       
-       public void discard(ClassLoader userClassLoader) {
-               for(StateForTask state: states){
-                       state.discard(userClassLoader);
-               }
-               states.clear();
-       }
-
-       // 
--------------------------------------------------------------------------------------------
-       
-       @Override
-       public String toString() {
-               return String.format("Checkpoint %d @ %d for %s", checkpointID, 
timestamp, job);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointIDCounter.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointIDCounter.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointIDCounter.java
new file mode 100644
index 0000000..6673050
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointIDCounter.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.shared.SharedCount;
+import org.apache.curator.framework.recipes.shared.VersionedValue;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.flink.runtime.jobmanager.RecoveryMode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * {@link CheckpointIDCounter} instances for JobManagers running in {@link 
RecoveryMode#ZOOKEEPER}.
+ *
+ * <p>Each counter creates a ZNode:
+ * <pre>
+ * +----O /flink/checkpoint-counter/&lt;job-id&gt; 1 [persistent]
+ * .
+ * .
+ * .
+ * +----O /flink/checkpoint-counter/&lt;job-id&gt; N [persistent]
+ * </pre>
+ *
+ * <p>The checkpoint IDs are required to be ascending (per job). In order to 
guarantee this in case
+ * of job manager failures we use ZooKeeper to have a shared counter across 
job manager instances.
+ */
+public class ZooKeeperCheckpointIDCounter implements CheckpointIDCounter {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(ZooKeeperCheckpointIDCounter.class);
+
+       /** Curator ZooKeeper client */
+       private final CuratorFramework client;
+
+       /** Path of the shared count */
+       private final String counterPath;
+
+       /** Curator recipe for shared counts */
+       private final SharedCount sharedCount;
+
+       /** Connection state listener to monitor the client connection */
+       private final SharedCountConnectionStateListener connStateListener =
+                       new SharedCountConnectionStateListener();
+
+       /**
+        * Creates a {@link ZooKeeperCheckpointIDCounter} instance.
+        *
+        * @param client      Curator ZooKeeper client
+        * @param counterPath ZooKeeper path for the counter. It's sufficient 
to have a path per-job.
+        * @throws Exception
+        */
+       public ZooKeeperCheckpointIDCounter(CuratorFramework client, String 
counterPath) throws Exception {
+               this.client = checkNotNull(client, "Curator client");
+               this.counterPath = checkNotNull(counterPath, "Counter path");
+               this.sharedCount = new SharedCount(client, counterPath, 1);
+       }
+
+       @Override
+       public void start() throws Exception {
+               sharedCount.start();
+               
client.getConnectionStateListenable().addListener(connStateListener);
+       }
+
+       @Override
+       public void stop() throws Exception {
+               sharedCount.close();
+               
client.getConnectionStateListenable().removeListener(connStateListener);
+
+               LOG.info("Removing {} from ZooKeeper", counterPath);
+               
client.delete().deletingChildrenIfNeeded().inBackground().forPath(counterPath);
+       }
+
+       @Override
+       public long getAndIncrement() throws Exception {
+               while (true) {
+                       ConnectionState connState = 
connStateListener.getLastState();
+
+                       if (connState != null) {
+                               throw new IllegalStateException("Connection 
state: " + connState);
+                       }
+
+                       VersionedValue<Integer> current = 
sharedCount.getVersionedValue();
+
+                       Integer newCount = current.getValue() + 1;
+
+                       if (sharedCount.trySetCount(current, newCount)) {
+                               return current.getValue();
+                       }
+               }
+       }
+
+       /**
+        * Connection state listener. In case of {@link 
ConnectionState#SUSPENDED} or {@link
+        * ConnectionState#LOST} we are not guaranteed to read a current count 
from ZooKeeper.
+        */
+       private class SharedCountConnectionStateListener implements 
ConnectionStateListener {
+
+               private volatile ConnectionState lastState;
+
+               @Override
+               public void stateChanged(CuratorFramework client, 
ConnectionState newState) {
+                       if (newState == ConnectionState.SUSPENDED || newState 
== ConnectionState.LOST) {
+                               lastState = newState;
+                       }
+               }
+
+               private ConnectionState getLastState() {
+                       return lastState;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java
new file mode 100644
index 0000000..2659e7e
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobmanager.RecoveryMode;
+import org.apache.flink.runtime.util.ZooKeeperUtils;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * {@link CheckpointCoordinator} components in {@link RecoveryMode#ZOOKEEPER}.
+ */
+public class ZooKeeperCheckpointRecoveryFactory implements 
CheckpointRecoveryFactory {
+
+       private final CuratorFramework client;
+
+       private final Configuration config;
+
+       public ZooKeeperCheckpointRecoveryFactory(CuratorFramework client, 
Configuration config) {
+               this.client = checkNotNull(client, "Curator client");
+               this.config = checkNotNull(config, "Configuration");
+       }
+
+       @Override
+       public void start() {
+               // Nothing to do
+       }
+
+       @Override
+       public void stop() {
+               client.close();
+       }
+
+       @Override
+       public CompletedCheckpointStore createCompletedCheckpoints(JobID jobId, 
ClassLoader userClassLoader)
+                       throws Exception {
+
+               return ZooKeeperUtils.createCompletedCheckpoints(client, 
config, jobId,
+                               NUMBER_OF_SUCCESSFUL_CHECKPOINTS_TO_RETAIN, 
userClassLoader);
+       }
+
+       @Override
+       public CheckpointIDCounter createCheckpointIDCounter(JobID jobID) 
throws Exception {
+               return ZooKeeperUtils.createCheckpointIDCounter(client, config, 
jobID);
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
new file mode 100644
index 0000000..62ab440
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
@@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.api.BackgroundCallback;
+import org.apache.curator.framework.api.CuratorEvent;
+import org.apache.curator.framework.api.CuratorEventType;
+import org.apache.curator.utils.ZKPaths;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.jobmanager.RecoveryMode;
+import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.state.StateHandleProvider;
+import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.ConcurrentModificationException;
+import java.util.List;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * {@link CompletedCheckpointStore} for JobManagers running in {@link 
RecoveryMode#ZOOKEEPER}.
+ *
+ * <p>Checkpoints are added under a ZNode per job:
+ * <pre>
+ * +----O /flink/checkpoints/&lt;job-id&gt;  [persistent]
+ * .    |
+ * .    +----O /flink/checkpoints/&lt;job-id&gt;/1 [persistent]
+ * .    .                                  .
+ * .    .                                  .
+ * .    .                                  .
+ * .    +----O /flink/checkpoints/&lt;job-id&gt;/N [persistent]
+ * </pre>
+ *
+ * <p>During recovery, the latest checkpoint is read from ZooKeeper. If there 
is more than one,
+ * only the latest one is used and older ones are discarded (even if the 
maximum number
+ * of retained checkpoints is greater than one).
+ *
+ * <p>If there is a network partition and multiple JobManagers run concurrent 
checkpoints for the
+ * same program, it is OK to take any valid successful checkpoint as long as 
the "history" of
+ * checkpoints is consistent. Currently, after recovery we start out with only 
a single
+ * checkpoint to circumvent those situations.
+ */
+public class ZooKeeperCompletedCheckpointStore implements 
CompletedCheckpointStore {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(ZooKeeperCompletedCheckpointStore.class);
+
+       /** Curator ZooKeeper client */
+       private final CuratorFramework client;
+
+       /** Completed checkpoints in ZooKeeper */
+       private final ZooKeeperStateHandleStore<CompletedCheckpoint> 
checkpointsInZooKeeper;
+
+       /** The maximum number of checkpoints to retain (at least 1). */
+       private final int maxNumberOfCheckpointsToRetain;
+
+       /** User class loader for discarding {@link CompletedCheckpoint} 
instances. */
+       private final ClassLoader userClassLoader;
+
+       /** Local completed checkpoints. */
+       private final ArrayDeque<Tuple2<StateHandle<CompletedCheckpoint>, 
String>> checkpointStateHandles;
+
+       /**
+        * Creates a {@link ZooKeeperCompletedCheckpointStore} instance.
+        *
+        * @param maxNumberOfCheckpointsToRetain The maximum number of 
checkpoints to retain (at
+        *                                       least 1). Adding more 
checkpoints than this results
+        *                                       in older checkpoints being 
discarded. On recovery,
+        *                                       we will only start with a 
single checkpoint.
+        * @param userClassLoader                The user class loader used to 
discard checkpoints
+        * @param client                         The Curator ZooKeeper client
+        * @param checkpointsPath                The ZooKeeper path for the 
checkpoints (needs to
+        *                                       start with a '/')
+        * @param stateHandleProvider            The state handle provider for 
checkpoints
+        * @throws Exception
+        */
+       public ZooKeeperCompletedCheckpointStore(
+                       int maxNumberOfCheckpointsToRetain,
+                       ClassLoader userClassLoader,
+                       CuratorFramework client,
+                       String checkpointsPath,
+                       StateHandleProvider<CompletedCheckpoint> 
stateHandleProvider) throws Exception {
+
+               checkArgument(maxNumberOfCheckpointsToRetain >= 1, "Must retain 
at least one checkpoint.");
+
+               this.maxNumberOfCheckpointsToRetain = 
maxNumberOfCheckpointsToRetain;
+               this.userClassLoader = checkNotNull(userClassLoader, "User 
class loader");
+
+               checkNotNull(client, "Curator client");
+               checkNotNull(checkpointsPath, "Checkpoints path");
+               checkNotNull(stateHandleProvider, "State handle provider");
+
+               // Ensure that the checkpoints path exists
+               client.newNamespaceAwareEnsurePath(checkpointsPath)
+                               .ensure(client.getZookeeperClient());
+
+               // All operations will have the path as root
+               this.client = client.usingNamespace(client.getNamespace() + 
checkpointsPath);
+
+               this.checkpointsInZooKeeper = new ZooKeeperStateHandleStore<>(
+                               this.client, stateHandleProvider);
+
+               this.checkpointStateHandles = new 
ArrayDeque<>(maxNumberOfCheckpointsToRetain + 1);
+
+               LOG.info("Initialized in '{}'.", checkpointsPath);
+       }
+
+       /**
+        * Gets the latest checkpoint from ZooKeeper and removes all others.
+        *
+        * <p><strong>Important</strong>: Even if there is more than one 
checkpoint in ZooKeeper,
+        * this will only recover the latest and discard the others. Otherwise, 
there is no guarantee
+        * that the history of checkpoints is consistent.
+        */
+       @Override
+       public void recover() throws Exception {
+               LOG.info("Recovering checkpoints from ZooKeeper.");
+
+               // Get all there is first
+               List<Tuple2<StateHandle<CompletedCheckpoint>, String>> 
initialCheckpoints;
+               while (true) {
+                       try {
+                               initialCheckpoints = 
checkpointsInZooKeeper.getAllSortedByName();
+                               break;
+                       }
+                       catch (ConcurrentModificationException e) {
+                               LOG.warn("Concurrent modification while reading 
from ZooKeeper. Retrying.");
+                       }
+               }
+
+               int numberOfInitialCheckpoints = initialCheckpoints.size();
+
+               LOG.info("Found {} checkpoints in ZooKeeper.", 
numberOfInitialCheckpoints);
+
+               if (numberOfInitialCheckpoints > 0) {
+                       // Take the last one. This is the latest checkpoint, 
because path names are strictly
+                       // increasing (checkpoint ID).
+                       Tuple2<StateHandle<CompletedCheckpoint>, String> latest 
= initialCheckpoints
+                                       .get(numberOfInitialCheckpoints - 1);
+
+                       CompletedCheckpoint latestCheckpoint = 
latest.f0.getState(userClassLoader);
+
+                       checkpointStateHandles.add(latest);
+
+                       LOG.info("Initialized with {}. Removing all older 
checkpoints.", latestCheckpoint);
+
+                       for (int i = 0; i < numberOfInitialCheckpoints - 1; 
i++) {
+                               try {
+                                       
removeFromZooKeeperAndDiscardCheckpoint(initialCheckpoints.get(i));
+                               }
+                               catch (Exception e) {
+                                       LOG.error("Failed to discard 
checkpoint", e);
+                               }
+                       }
+               }
+       }
+
+       /**
+        * Synchronously writes the new checkpoints to ZooKeeper and 
asynchronously removes older ones.
+        *
+        * @param checkpoint Completed checkpoint to add.
+        */
+       @Override
+       public void addCheckpoint(CompletedCheckpoint checkpoint) throws 
Exception {
+               checkNotNull(checkpoint, "Checkpoint");
+
+               // First add the new one. If it fails, we don't want to lose 
existing data.
+               String path = String.format("/%s", 
checkpoint.getCheckpointID());
+
+               final StateHandle<CompletedCheckpoint> stateHandle = 
checkpointsInZooKeeper.add(path, checkpoint);
+
+               checkpointStateHandles.addLast(new Tuple2<>(stateHandle, path));
+
+               // Everything worked, let's remove a previous checkpoint if 
necessary.
+               if (checkpointStateHandles.size() > 
maxNumberOfCheckpointsToRetain) {
+                       
removeFromZooKeeperAndDiscardCheckpoint(checkpointStateHandles.removeFirst());
+               }
+
+               LOG.debug("Added {} to {}.", checkpoint, path);
+       }
+
+       @Override
+       public CompletedCheckpoint getLatestCheckpoint() throws Exception {
+               if (checkpointStateHandles.isEmpty()) {
+                       return null;
+               }
+               else {
+                       return 
checkpointStateHandles.getLast().f0.getState(userClassLoader);
+               }
+       }
+
+       @Override
+       public List<CompletedCheckpoint> getAllCheckpoints() throws Exception {
+               List<CompletedCheckpoint> checkpoints = new 
ArrayList<>(checkpointStateHandles.size());
+
+               for (Tuple2<StateHandle<CompletedCheckpoint>, String> 
stateHandle : checkpointStateHandles) {
+                       
checkpoints.add(stateHandle.f0.getState(userClassLoader));
+               }
+
+               return checkpoints;
+       }
+
+       @Override
+       public int getNumberOfRetainedCheckpoints() {
+               return checkpointStateHandles.size();
+       }
+
+       @Override
+       public void discardAllCheckpoints() throws Exception {
+               for (Tuple2<StateHandle<CompletedCheckpoint>, String> 
checkpoint : checkpointStateHandles) {
+                       try {
+                               
removeFromZooKeeperAndDiscardCheckpoint(checkpoint);
+                       }
+                       catch (Exception e) {
+                               LOG.error("Failed to discard checkpoint.", e);
+                       }
+               }
+
+               checkpointStateHandles.clear();
+
+               String path = "/" + client.getNamespace();
+
+               LOG.info("Removing {} from ZooKeeper", path);
+               
ZKPaths.deleteChildren(client.getZookeeperClient().getZooKeeper(), path, true);
+       }
+
+       /**
+        * Removes the state handle from ZooKeeper, discards the checkpoints, 
and the state handle.
+        */
+       private void removeFromZooKeeperAndDiscardCheckpoint(
+                       final Tuple2<StateHandle<CompletedCheckpoint>, String> 
stateHandleAndPath) throws Exception {
+
+               final BackgroundCallback callback = new BackgroundCallback() {
+                       @Override
+                       public void processResult(CuratorFramework client, 
CuratorEvent event) throws Exception {
+                               try {
+                                       if (event.getType() == 
CuratorEventType.DELETE) {
+                                               if (event.getResultCode() == 0) 
{
+                                                       // The checkpoint
+                                                       CompletedCheckpoint 
checkpoint = stateHandleAndPath
+                                                                       
.f0.getState(userClassLoader);
+
+                                                       
checkpoint.discard(userClassLoader);
+
+                                                       // Discard the state 
handle
+                                                       
stateHandleAndPath.f0.discardState();
+
+                                                       // Discard the 
checkpoint
+                                                       LOG.debug("Discarded " 
+ checkpoint);
+                                               }
+                                               else {
+                                                       throw new 
IllegalStateException("Unexpected result code " +
+                                                                       
event.getResultCode() + " in '" + event + "' callback.");
+                                               }
+                                       }
+                                       else {
+                                               throw new 
IllegalStateException("Unexpected event type " +
+                                                               event.getType() 
+ " in '" + event + "' callback.");
+                                       }
+                               }
+                               catch (Exception e) {
+                                       LOG.error("Failed to discard 
checkpoint.", e);
+                               }
+                       }
+               };
+
+               // Remove state handle from ZooKeeper first. If this fails, we 
can still recover, but if
+               // we discard a state handle and fail to remove it from 
ZooKeeper, we end up in an
+               // inconsistent state.
+               checkpointsInZooKeeper.remove(stateHandleAndPath.f1, callback);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index ef00484..9430d80 100755
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -30,6 +30,8 @@ import 
org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
+import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
@@ -39,6 +41,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.ScheduleMode;
+import org.apache.flink.runtime.jobmanager.RecoveryMode;
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.messages.ExecutionGraphMessages;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
@@ -110,8 +113,6 @@ public class ExecutionGraph implements Serializable {
        /** The log object used for debugging. */
        static final Logger LOG = LoggerFactory.getLogger(ExecutionGraph.class);
        
-       private static final int NUMBER_OF_SUCCESSFUL_CHECKPOINTS_TO_RETAIN = 1;
-
        // 
--------------------------------------------------------------------------------------------
 
        /** The lock used to secure all access to mutable fields, especially 
the tracking of progress
@@ -347,7 +348,11 @@ public class ExecutionGraph implements Serializable {
                        List<ExecutionJobVertex> verticesToWaitFor,
                        List<ExecutionJobVertex> verticesToCommitTo,
                        ActorSystem actorSystem,
-                       UUID leaderSessionID) {
+                       UUID leaderSessionID,
+                       CheckpointIDCounter checkpointIDCounter,
+                       CompletedCheckpointStore completedCheckpointStore,
+                       RecoveryMode recoveryMode) throws Exception {
+
                // simple sanity checks
                if (interval < 10 || checkpointTimeout < 10) {
                        throw new IllegalArgumentException();
@@ -367,12 +372,14 @@ public class ExecutionGraph implements Serializable {
                snapshotCheckpointsEnabled = true;
                checkpointCoordinator = new CheckpointCoordinator(
                                jobID,
-                               NUMBER_OF_SUCCESSFUL_CHECKPOINTS_TO_RETAIN,
                                checkpointTimeout,
                                tasksToTrigger,
                                tasksToWaitFor,
                                tasksToCommitTo,
-                               userClassLoader);
+                               userClassLoader,
+                               checkpointIDCounter,
+                               completedCheckpointStore,
+                               recoveryMode);
                
                // the periodic checkpoint scheduler is activated and 
deactivated as a result of
                // job status changes (running -> on, all other states -> off)
@@ -382,8 +389,14 @@ public class ExecutionGraph implements Serializable {
                                                interval,
                                                leaderSessionID));
        }
-       
-       public void disableSnaphotCheckpointing() {
+
+       /**
+        * Disables checkpointing.
+        *
+        * <p>The shutdown of the checkpoint coordinator might block. Make sure 
that calls to this
+        * method don't block the job manager actor and run asynchronously.
+        */
+       public void disableSnaphotCheckpointing() throws Exception {
                if (state != JobStatus.CREATED) {
                        throw new IllegalStateException("Job must be in CREATED 
state");
                }
@@ -773,6 +786,20 @@ public class ExecutionGraph implements Serializable {
        }
 
        /**
+        * Restores the latest checkpointed state.
+        *
+        * <p>The recovery of checkpoints might block. Make sure that calls to 
this method don't
+        * block the job manager actor and run asynchronously.
+        */
+       public void restoreLatestCheckpointedState() throws Exception {
+               synchronized (progressLock) {
+                       if (checkpointCoordinator != null) {
+                               
checkpointCoordinator.restoreLatestCheckpointedState(getAllVertices(), false, 
false);
+                       }
+               }
+       }
+
+       /**
         * This method cleans fields that are irrelevant for the archived 
execution attempt.
         */
        public void prepareForArchiving() {
@@ -886,7 +913,13 @@ public class ExecutionGraph implements Serializable {
                                                                        }
                                                                }, 
executionContext);
                                                        } else {
-                                                               restart();
+                                                               future(new 
Callable<Object>() {
+                                                                       
@Override
+                                                                       public 
Object call() throws Exception {
+                                                                               
restart();
+                                                                               
return null;
+                                                                       }
+                                                               }, 
executionContext);
                                                        }
                                                        break;
                                                }
@@ -906,7 +939,7 @@ public class ExecutionGraph implements Serializable {
                        }
                }
        }
-       
+
        private void postRunCleanup() {
                try {
                        CheckpointCoordinator coord = 
this.checkpointCoordinator;

http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
index 6b36e2d..a64d63c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
@@ -538,4 +538,9 @@ public class JobGraph implements Serializable {
                        }
                }
        }
+
+       @Override
+       public String toString() {
+               return "JobGraph(jobId: " + jobID + ")";
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/RecoveryMode.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/RecoveryMode.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/RecoveryMode.java
index 2e75b19..17322d8 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/RecoveryMode.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/RecoveryMode.java
@@ -35,6 +35,19 @@ public enum RecoveryMode {
        ZOOKEEPER;
 
        /**
+        * Return the configured {@link RecoveryMode}.
+        *
+        * @param config The config to parse
+        * @return Configured recovery mode or {@link 
ConfigConstants#DEFAULT_RECOVERY_MODE} if not
+        * configured.
+        */
+       public static RecoveryMode fromConfig(Configuration config) {
+               return RecoveryMode.valueOf(config.getString(
+                               ConfigConstants.RECOVERY_MODE,
+                               
ConfigConstants.DEFAULT_RECOVERY_MODE).toUpperCase());
+       }
+
+       /**
         * Returns true if the defined recovery mode supports high availability.
         *
         * @param configuration Configuration which contains the recovery mode

http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStore.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStore.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStore.java
new file mode 100644
index 0000000..db36f92
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStore.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import scala.Option;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * {@link SubmittedJobGraph} instances for JobManagers running in {@link 
RecoveryMode#STANDALONE}.
+ *
+ * <p>All operations are NoOps, because {@link JobGraph} instances cannot be 
recovered in this
+ * recovery mode.
+ */
+public class StandaloneSubmittedJobGraphStore implements 
SubmittedJobGraphStore {
+
+       @Override
+       public void start(SubmittedJobGraphListener jobGraphListener) throws 
Exception {
+               // Nothing to do
+       }
+
+       @Override
+       public void stop() {
+               // Nothing to do
+       }
+
+       @Override
+       public void putJobGraph(SubmittedJobGraph jobGraph) throws Exception {
+               // Nothing to do
+       }
+
+       @Override
+       public void removeJobGraph(JobID jobId) throws Exception {
+               // Nothing to do
+       }
+
+       @Override
+       public Option<SubmittedJobGraph> recoverJobGraph(JobID jobId) throws 
Exception {
+               return Option.empty();
+       }
+
+       @Override
+       public List<SubmittedJobGraph> recoverJobGraphs() throws Exception {
+               return Collections.emptyList();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraph.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraph.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraph.java
new file mode 100644
index 0000000..48da3b8
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraph.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+
+import java.io.Serializable;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * A recoverable {@link JobGraph} and {@link JobInfo}.
+ */
+public class SubmittedJobGraph implements Serializable {
+
+       private static final long serialVersionUID = 2836099271734771825L;
+
+       /** The submitted {@link JobGraph} */
+       private final JobGraph jobGraph;
+
+       /** The {@link JobInfo}. */
+       private final JobInfo jobInfo;
+
+       /**
+        * Creates a {@link SubmittedJobGraph}.
+        *
+        * @param jobGraph The submitted {@link JobGraph}
+        * @param jobInfo  The {@link JobInfo}
+        */
+       public SubmittedJobGraph(JobGraph jobGraph, JobInfo jobInfo) {
+               this.jobGraph = checkNotNull(jobGraph, "Job graph");
+               this.jobInfo = checkNotNull(jobInfo, "Job info");
+       }
+
+       /**
+        * Returns the submitted {@link JobGraph}.
+        */
+       public JobGraph getJobGraph() {
+               return jobGraph;
+       }
+
+       /**
+        * Returns the {@link JobID} of the submitted {@link JobGraph}.
+        */
+       public JobID getJobId() {
+               return jobGraph.getJobID();
+       }
+
+       /**
+        * Returns the {@link JobInfo} of the client who submitted the {@link 
JobGraph}.
+        */
+       public JobInfo getJobInfo() throws Exception {
+               return jobInfo;
+       }
+
+       @Override
+       public String toString() {
+               return String.format("SubmittedJobGraph(%s, %s)", jobGraph, 
jobInfo);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraphStore.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraphStore.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraphStore.java
new file mode 100644
index 0000000..bd628cd
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraphStore.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import scala.Option;
+
+import java.util.List;
+
+/**
+ * {@link SubmittedJobGraph} instances for recovery.
+ */
+public interface SubmittedJobGraphStore {
+
+       /**
+        * Starts the {@link SubmittedJobGraphStore} service with the given listener.
+        */
+       void start(SubmittedJobGraphListener jobGraphListener) throws Exception;
+
+       /**
+        * Stops the {@link SubmittedJobGraphStore} service.
+        */
+       void stop() throws Exception;
+
+       /**
+        * Returns a list of all submitted {@link JobGraph} instances, each as a {@link SubmittedJobGraph}.
+        */
+       List<SubmittedJobGraph> recoverJobGraphs() throws Exception;
+
+       /**
+        * Returns the {@link SubmittedJobGraph} with the given {@link JobID}.
+        * NOTE(review): the {@link Option} return type suggests absence may yield an empty option -- confirm.
+        * <p>An Exception is thrown if no job graph with the given ID exists.
+        */
+       Option<SubmittedJobGraph> recoverJobGraph(JobID jobId) throws Exception;
+
+       /**
+        * Adds the {@link SubmittedJobGraph} instance.
+        *
+        * <p>If a job graph with the same {@link JobID} exists, it is replaced.
+        */
+       void putJobGraph(SubmittedJobGraph jobGraph) throws Exception;
+
+       /**
+        * Removes the {@link SubmittedJobGraph} with the given {@link JobID} if it exists.
+        */
+       void removeJobGraph(JobID jobId) throws Exception;
+
+       /**
+        * A listener for {@link SubmittedJobGraph} instances. This is used to react to races between
+        * multiple running {@link SubmittedJobGraphStore} instances (on multiple job managers).
+        */
+       interface SubmittedJobGraphListener {
+
+               /**
+                * Callback for {@link SubmittedJobGraph} instances added by a different {@link
+                * SubmittedJobGraphStore} instance.
+                *
+                * <p><strong>Important:</strong> It is possible to get false positives and be notified
+                * about a job graph that was added by this instance.
+                *
+                * @param jobId The {@link JobID} of the added job graph
+                */
+               void onAddedJobGraph(JobID jobId);
+
+               /**
+                * Callback for {@link SubmittedJobGraph} instances removed by a different {@link
+                * SubmittedJobGraphStore} instance.
+                *
+                * @param jobId The {@link JobID} of the removed job graph
+                */
+               void onRemovedJobGraph(JobID jobId);
+
+       }
+
+}

Reply via email to