[FLINK-4322] [checkpointing] Add and fix tests for unified Checkpoint/Savepoint 
Coordinator

This closes #2366


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/47acdead
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/47acdead
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/47acdead

Branch: refs/heads/master
Commit: 47acdeadf3ccd326578306453dd10ed3c147a4e6
Parents: 76ca1a7
Author: Ufuk Celebi <[email protected]>
Authored: Thu Aug 11 19:40:07 2016 +0200
Committer: Stephan Ewen <[email protected]>
Committed: Wed Aug 17 19:18:07 2016 +0200

----------------------------------------------------------------------
 .../checkpoint/CheckpointCoordinator.java       |  138 +--
 .../checkpoint/CheckpointTriggerResult.java     |   17 +-
 .../runtime/checkpoint/PendingSavepoint.java    |   11 +-
 .../StandaloneCheckpointIDCounter.java          |    9 +
 .../StandaloneCompletedCheckpointStore.java     |    2 +-
 .../checkpoint/savepoint/SavepointLoader.java   |    4 +-
 .../checkpoint/savepoint/SavepointV0.java       |    7 +-
 .../runtime/executiongraph/ExecutionGraph.java  |    6 -
 .../flink/runtime/jobmanager/JobManager.scala   |   20 +-
 .../checkpoint/CheckpointCoordinatorTest.java   |  598 +++++++---
 .../checkpoint/CheckpointStateRestoreTest.java  |   80 +-
 .../CompletedCheckpointStoreTest.java           |    2 +-
 .../checkpoint/CompletedCheckpointTest.java     |   58 +
 ...ExecutionGraphCheckpointCoordinatorTest.java |   33 +-
 .../checkpoint/PendingCheckpointTest.java       |  134 +++
 .../checkpoint/PendingSavepointTest.java        |  141 +++
 .../savepoint/HeapSavepointStoreTest.java       |   25 -
 .../savepoint/SavepointCoordinatorTest.java     | 1119 ------------------
 .../savepoint/SavepointLoaderTest.java          |  110 ++
 .../stats/SimpleCheckpointStatsTrackerTest.java |    5 +-
 .../runtime/jobmanager/JobManagerITCase.scala   |   32 +-
 .../test/checkpointing/SavepointITCase.java     |  297 +----
 22 files changed, 1033 insertions(+), 1815 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/47acdead/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 6d44e61..2a1ece0 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.checkpoint;
 import akka.actor.ActorSystem;
 import akka.actor.PoisonPill;
 import akka.actor.Props;
+import akka.dispatch.Futures;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore;
@@ -33,7 +34,6 @@ import 
org.apache.flink.runtime.executiongraph.ExecutionVertex;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.instance.AkkaActorGateway;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.jobmanager.RecoveryMode;
 import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
 import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
 import org.apache.flink.runtime.messages.checkpoint.NotifyCheckpointComplete;
@@ -45,9 +45,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import scala.concurrent.Future;
-import scala.concurrent.Promise;
 
-import javax.annotation.Nullable;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -69,10 +67,6 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
  * It triggers the checkpoint by sending the messages to the relevant tasks 
and collects the
  * checkpoint acknowledgements. It also collects and maintains the overview of 
the state handles
  * reported by the tasks that acknowledge the checkpoint.
- *
- * <p>Depending on the configured {@link RecoveryMode}, the behaviour of the 
{@link
- * CompletedCheckpointStore} and {@link CheckpointIDCounter} change. The 
default standalone
- * implementations don't support any recovery.
  */
 public class CheckpointCoordinator {
 
@@ -156,9 +150,6 @@ public class CheckpointCoordinator {
        /** Flag marking the coordinator as shut down (not accepting any 
messages any more) */
        private volatile boolean shutdown;
 
-       /** Shutdown hook thread to clean up state handles. */
-       private final Thread shutdownHook;
-
        /** Helper for tracking checkpoint statistics  */
        private final CheckpointStatsTracker statsTracker;
 
@@ -180,7 +171,6 @@ public class CheckpointCoordinator {
                        CheckpointIDCounter checkpointIDCounter,
                        CompletedCheckpointStore completedCheckpointStore,
                        SavepointStore savepointStore,
-                       RecoveryMode recoveryMode,
                        CheckpointStatsTracker statsTracker) throws Exception {
 
                // sanity checks
@@ -188,7 +178,6 @@ public class CheckpointCoordinator {
                checkArgument(checkpointTimeout >= 1, "Checkpoint timeout must 
be larger than zero");
                checkArgument(minPauseBetweenCheckpoints >= 0, 
"minPauseBetweenCheckpoints must be >= 0");
                checkArgument(maxConcurrentCheckpointAttempts >= 1, 
"maxConcurrentCheckpointAttempts must be >= 1");
-               checkArgument(numberKeyGroups >= 1, "numberKeyGroups must be >= 
1");
 
                this.job = checkNotNull(job);
                this.baseInterval = baseInterval;
@@ -198,52 +187,21 @@ public class CheckpointCoordinator {
                this.tasksToTrigger = checkNotNull(tasksToTrigger);
                this.tasksToWaitFor = checkNotNull(tasksToWaitFor);
                this.tasksToCommitTo = checkNotNull(tasksToCommitTo);
-               this.pendingCheckpoints = new LinkedHashMap<Long, 
PendingCheckpoint>();
+               this.pendingCheckpoints = new LinkedHashMap<>();
                this.checkpointIdCounter = checkNotNull(checkpointIDCounter);
                this.completedCheckpointStore = 
checkNotNull(completedCheckpointStore);
                this.savepointStore = checkNotNull(savepointStore);
-               this.recentPendingCheckpoints = new 
ArrayDeque<Long>(NUM_GHOST_CHECKPOINT_IDS);
+               this.recentPendingCheckpoints = new 
ArrayDeque<>(NUM_GHOST_CHECKPOINT_IDS);
                this.userClassLoader = checkNotNull(userClassLoader);
                this.statsTracker = checkNotNull(statsTracker);
                this.numberKeyGroups = numberKeyGroups;
 
                this.timer = new Timer("Checkpoint Timer", true);
 
-               if (recoveryMode == RecoveryMode.STANDALONE) {
-                       // Add shutdown hook to clean up state handles when no 
checkpoint recovery is
-                       // possible. In case of another configured recovery 
mode, the checkpoints need to be
-                       // available for the standby job managers.
-                       this.shutdownHook = new Thread(new Runnable() {
-                               @Override
-                               public void run() {
-                                       try {
-                                               
CheckpointCoordinator.this.shutdown();
-                                       }
-                                       catch (Throwable t) {
-                                               LOG.error("Error during 
shutdown of checkpoint coordinator via " +
-                                                               "JVM shutdown 
hook: " + t.getMessage(), t);
-                                       }
-                               }
-                       });
-
-                       try {
-                               // Add JVM shutdown hook to call shutdown of 
service
-                               
Runtime.getRuntime().addShutdownHook(shutdownHook);
-                       }
-                       catch (IllegalStateException ignored) {
-                               // JVM is already shutting down. No need to do 
anything.
-                       }
-                       catch (Throwable t) {
-                               LOG.error("Cannot register checkpoint 
coordinator shutdown hook.", t);
-                       }
-               }
-               else {
-                       this.shutdownHook = null;
-               }
-
-               // make sure the checkpoint ID enumerator is running
                try {
-                       checkpointIdCounter.start();
+                       // Make sure the checkpoint ID enumerator is running. 
Possibly
+                       // issues a blocking call to ZooKeeper.
+                       checkpointIDCounter.start();
                } catch (Exception e) {
                        throw new Exception("Failed to start checkpoint ID 
counter: " + e.getMessage(), e);
                }
@@ -285,50 +243,34 @@ public class CheckpointCoordinator {
         */
        private void shutdown(boolean shutdownStoreAndCounter) throws Exception 
{
                synchronized (lock) {
-                       try {
-                               if (!shutdown) {
-                                       shutdown = true;
-                                       LOG.info("Stopping checkpoint 
coordinator for job " + job);
+                       if (!shutdown) {
+                               shutdown = true;
+                               LOG.info("Stopping checkpoint coordinator for 
job " + job);
 
-                                       periodicScheduling = false;
-                                       triggerRequestQueued = false;
+                               periodicScheduling = false;
+                               triggerRequestQueued = false;
 
-                                       // shut down the thread that handles 
the timeouts and pending triggers
-                                       timer.cancel();
+                               // shut down the thread that handles the 
timeouts and pending triggers
+                               timer.cancel();
 
-                                       // make sure that the actor does not 
linger
-                                       if (jobStatusListener != null) {
-                                               
jobStatusListener.tell(PoisonPill.getInstance());
-                                               jobStatusListener = null;
-                                       }
+                               // make sure that the actor does not linger
+                               if (jobStatusListener != null) {
+                                       
jobStatusListener.tell(PoisonPill.getInstance());
+                                       jobStatusListener = null;
+                               }
 
-                                       // clear and discard all pending 
checkpoints
-                                       for (PendingCheckpoint pending : 
pendingCheckpoints.values()) {
-                                               pending.abortError(new 
Exception("Checkpoint Coordinator is shutting down"));
-                                       }
-                                       pendingCheckpoints.clear();
-
-                                       if (shutdownStoreAndCounter) {
-                                               
completedCheckpointStore.shutdown();
-                                               checkpointIdCounter.shutdown();
-                                       } else {
-                                               
completedCheckpointStore.suspend();
-                                               checkpointIdCounter.suspend();
-                                       }
+                               // clear and discard all pending checkpoints
+                               for (PendingCheckpoint pending : 
pendingCheckpoints.values()) {
+                                       pending.abortError(new 
Exception("Checkpoint Coordinator is shutting down"));
                                }
-                       } finally {
-                               // Remove shutdown hook to prevent resource 
leaks, unless this is invoked by the
-                               // shutdown hook itself.
-                               if (shutdownHook != null && shutdownHook != 
Thread.currentThread()) {
-                                       try {
-                                               
Runtime.getRuntime().removeShutdownHook(shutdownHook);
-                                       }
-                                       catch (IllegalStateException ignored) {
-                                               // race, JVM is in shutdown 
already, we can safely ignore this
-                                       }
-                                       catch (Throwable t) {
-                                               LOG.warn("Error unregistering 
checkpoint coordinator shutdown hook.", t);
-                                       }
+                               pendingCheckpoints.clear();
+
+                               if (shutdownStoreAndCounter) {
+                                       completedCheckpointStore.shutdown();
+                                       checkpointIdCounter.shutdown();
+                               } else {
+                                       completedCheckpointStore.suspend();
+                                       checkpointIdCounter.suspend();
                                }
                        }
                }
@@ -350,10 +292,7 @@ public class CheckpointCoordinator {
                        return savepoint.getCompletionFuture();
                }
                else {
-                       final Promise<String> promise = new 
scala.concurrent.impl.Promise.DefaultPromise<>();
-                       promise.failure(
-                                       new Exception("Failed to trigger 
savepoint: " + result.getFailureReason().message()));
-                       return promise.future(); 
+                       return Futures.failed(new Exception("Failed to trigger 
savepoint: " + result.getFailureReason().message()));
                }
        }
 
@@ -586,9 +525,8 @@ public class CheckpointCoordinator {
                                rememberRecentCheckpointId(checkpointId);
 
                                boolean haveMoreRecentPending = false;
-                               Iterator<Map.Entry<Long, PendingCheckpoint>> 
entries = pendingCheckpoints.entrySet().iterator();
-                               while (entries.hasNext()) {
-                                       PendingCheckpoint p = 
entries.next().getValue();
+
+                               for (PendingCheckpoint p : 
pendingCheckpoints.values()) {
                                        if (!p.isDiscarded() && 
p.getCheckpointTimestamp() >= checkpoint.getCheckpointTimestamp()) {
                                                haveMoreRecentPending = true;
                                                break;
@@ -746,7 +684,7 @@ public class CheckpointCoordinator {
 
                while (entries.hasNext()) {
                        PendingCheckpoint p = entries.next().getValue();
-                       if (p.getCheckpointTimestamp() < timestamp && 
p.canBeSubsumed()) {
+                       if (p.getCheckpointTimestamp() <= timestamp && 
p.canBeSubsumed()) {
                                rememberRecentCheckpointId(p.getCheckpointId());
                                p.abortSubsumed();
                                entries.remove();
@@ -895,7 +833,7 @@ public class CheckpointCoordinator {
 
        public Map<Long, PendingCheckpoint> getPendingCheckpoints() {
                synchronized (lock) {
-                       return new HashMap<Long, 
PendingCheckpoint>(this.pendingCheckpoints);
+                       return new HashMap<>(this.pendingCheckpoints);
                }
        }
 
@@ -913,14 +851,6 @@ public class CheckpointCoordinator {
                return checkpointIdCounter;
        }
 
-       protected ActorGateway getJobStatusListener() {
-               return jobStatusListener;
-       }
-
-       protected void setJobStatusListener(ActorGateway jobStatusListener) {
-               this.jobStatusListener = jobStatusListener;
-       }
-
        // 
--------------------------------------------------------------------------------------------
        //  Periodic scheduling of checkpoints
        // 
--------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/47acdead/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointTriggerResult.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointTriggerResult.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointTriggerResult.java
index 3f91407..65dc73f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointTriggerResult.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointTriggerResult.java
@@ -21,8 +21,8 @@ package org.apache.flink.runtime.checkpoint;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
- * The result of triggering a checkpoint. May be a declined checkpoint trigger 
attempt,
- * or a pending checkpoint.
+ * The result of triggering a checkpoint. May either be a declined checkpoint
+ * trigger attempt, or a pending checkpoint.
  */
 class CheckpointTriggerResult {
 
@@ -68,7 +68,7 @@ class CheckpointTriggerResult {
                if (success != null) {
                        return success;
                } else {
-                       throw new IllegalStateException();
+                       throw new IllegalStateException("Checkpoint triggering 
failed");
                }
        }
 
@@ -76,7 +76,7 @@ class CheckpointTriggerResult {
                if (failure != null) {
                        return failure;
                } else {
-                       throw new IllegalStateException();
+                       throw new IllegalStateException("Checkpoint triggering 
was successful");
                }
        }
 
@@ -84,8 +84,9 @@ class CheckpointTriggerResult {
 
        @Override
        public String toString() {
-               return isSuccess() ? 
-                               ("success: " + success) :
-                               ("failure: " + failure.message()); 
+               return "CheckpointTriggerResult(" +
+                               (isSuccess() ?
+                                               ("success: " + success) :
+                                               ("failure: " + 
failure.message())) + ")";
        }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/47acdead/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingSavepoint.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingSavepoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingSavepoint.java
index 92cdd04..460ff8e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingSavepoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingSavepoint.java
@@ -25,9 +25,7 @@ import 
org.apache.flink.runtime.checkpoint.savepoint.SavepointV0;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
 import org.apache.flink.util.ExceptionUtils;
-
 import org.slf4j.Logger;
-
 import scala.concurrent.Future;
 import scala.concurrent.Promise;
 
@@ -107,7 +105,14 @@ public class PendingSavepoint extends PendingCheckpoint {
 
        @Override
        public void abortSubsumed() throws Exception {
-               throw new Exception("Bug: Savepoints must never be subsumed");
+               try {
+                       Exception e = new Exception("Bug: Savepoints must never 
be subsumed");
+                       onCompletionPromise.failure(e);
+                       throw e;
+               }
+               finally {
+                       dispose(true);
+               }
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/47acdead/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointIDCounter.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointIDCounter.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointIDCounter.java
index c0ea93d..0a235bc 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointIDCounter.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointIDCounter.java
@@ -50,4 +50,13 @@ public class StandaloneCheckpointIDCounter implements 
CheckpointIDCounter {
        public void setCount(long newCount) {
                checkpointIdCounter.set(newCount);
        }
+
+       /**
+        * Returns the last checkpoint ID (current - 1).
+        *
+        * @return Last checkpoint ID.
+        */
+       public long getLast() {
+               return checkpointIdCounter.get() - 1;
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/47acdead/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
index 0f6cf33..bc111cd 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
@@ -30,7 +30,7 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
 /**
  * {@link CompletedCheckpointStore} for JobManagers running in {@link 
RecoveryMode#STANDALONE}.
  */
-class StandaloneCompletedCheckpointStore implements CompletedCheckpointStore {
+public class StandaloneCompletedCheckpointStore implements 
CompletedCheckpointStore {
 
        /** The maximum number of checkpoints to retain (at least 1). */
        private final int maxNumberOfCheckpointsToRetain;

http://git-wip-us.apache.org/repos/asf/flink/blob/47acdead/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java
index 0b7b0c2..1be7a58 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java
@@ -27,8 +27,6 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
 import java.util.HashMap;
 import java.util.Map;
 
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
 /**
  * The SavepointLoader is a utility to load and verify a Savepoint, and to 
create a checkpoint from it. 
  */
@@ -54,7 +52,7 @@ public class SavepointLoader {
                        String savepointPath) throws Exception {
 
                // (1) load the savepoint
-               Savepoint savepoint = 
savepointStore.loadSavepoint(checkNotNull(savepointPath));
+               Savepoint savepoint = 
savepointStore.loadSavepoint(savepointPath);
                final Map<JobVertexID, TaskState> taskStates = new 
HashMap<>(savepoint.getTaskStates().size());
                
                // (2) validate it (parallelism, etc)

http://git-wip-us.apache.org/repos/asf/flink/blob/47acdead/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV0.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV0.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV0.java
index 9fd950d..d60d80e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV0.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV0.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.checkpoint.savepoint;
 import org.apache.flink.runtime.checkpoint.TaskState;
 import org.apache.flink.util.Preconditions;
 
+import java.util.ArrayList;
 import java.util.Collection;
 
 /**
@@ -37,11 +38,11 @@ public class SavepointV0 implements Savepoint {
        private final long checkpointId;
 
        /** The task states */
-       private final Collection<TaskState> taskStates;
+       private final Collection<TaskState> taskStates = new ArrayList();
 
-       SavepointV0(long checkpointId, Collection<TaskState> taskStates) {
+       public SavepointV0(long checkpointId, Collection<TaskState> taskStates) 
{
                this.checkpointId = checkpointId;
-               this.taskStates = Preconditions.checkNotNull(taskStates, "Task 
States");
+               this.taskStates.addAll(taskStates);
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/47acdead/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 82826dd..12d8e66 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -19,7 +19,6 @@
 package org.apache.flink.runtime.executiongraph;
 
 import akka.actor.ActorSystem;
-
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.accumulators.Accumulator;
@@ -49,7 +48,6 @@ import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.ScheduleMode;
-import org.apache.flink.runtime.jobmanager.RecoveryMode;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.messages.ExecutionGraphMessages;
@@ -59,10 +57,8 @@ import org.apache.flink.runtime.util.SerializableObject;
 import org.apache.flink.runtime.util.SerializedThrowable;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.SerializedValue;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import scala.concurrent.ExecutionContext;
 import scala.concurrent.duration.FiniteDuration;
 
@@ -353,7 +349,6 @@ public class ExecutionGraph {
                        UUID leaderSessionID,
                        CheckpointIDCounter checkpointIDCounter,
                        CompletedCheckpointStore checkpointStore,
-                       RecoveryMode recoveryMode,
                        SavepointStore savepointStore,
                        CheckpointStatsTracker statsTracker) throws Exception {
 
@@ -389,7 +384,6 @@ public class ExecutionGraph {
                                checkpointIDCounter,
                                checkpointStore,
                                savepointStore,
-                               recoveryMode,
                                checkpointStatsTracker);
 
                // the periodic checkpoint scheduler is activated and 
deactivated as a result of

http://git-wip-us.apache.org/repos/asf/flink/blob/47acdead/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 9fb01bf..a82e89a 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -1093,10 +1093,10 @@ class JobManager(
           Option(jobGraph.getSerializedExecutionConfig()
             .deserializeValue(userCodeLoader)
             .getRestartStrategy())
-              .map(RestartStrategyFactory.createRestartStrategy(_)) match {
-                case Some(strategy) => strategy
-                case None => restartStrategyFactory.createRestartStrategy()
-              }
+            .map(RestartStrategyFactory.createRestartStrategy(_)) match {
+            case Some(strategy) => strategy
+            case None => restartStrategyFactory.createRestartStrategy()
+          }
 
         log.info(s"Using restart strategy $restartStrategy for $jobId.")
 
@@ -1253,7 +1253,6 @@ class JobManager(
             leaderSessionID.orNull,
             checkpointIdCounter,
             completedCheckpoints,
-            recoveryMode,
             savepointStore,
             checkpointStatsTracker)
         }
@@ -1294,7 +1293,6 @@ class JobManager(
       // because it is a blocking operation
       future {
         try {
-
           if (isRecovery) {
             // this is a recovery of a master failure (this master takes over)
             executionGraph.restoreLatestCheckpointedState()
@@ -1305,7 +1303,7 @@ class JobManager(
             val snapshotSettings = jobGraph.getSnapshotSettings
             if (snapshotSettings != null) {
               val savepointPath = snapshotSettings.getSavepointPath()
-              
+
               if (savepointPath != null) {
                 // got a savepoint
                 try {
@@ -1316,14 +1314,14 @@ class JobManager(
                     jobId, executionGraph.getAllVertices, savepointStore, 
savepointPath)
 
                   executionGraph.getCheckpointCoordinator.getCheckpointStore
-                      .addCheckpoint(savepoint)
-                  
+                    .addCheckpoint(savepoint)
+
                   // Reset the checkpoint ID counter
                   val nextCheckpointId: Long = savepoint.getCheckpointID + 1
                   log.info(s"Reset the checkpoint ID to $nextCheckpointId")
                   
executionGraph.getCheckpointCoordinator.getCheckpointIdCounter
-                      .setCount(nextCheckpointId)
-                  
+                    .setCount(nextCheckpointId)
+
                   executionGraph.restoreLatestCheckpointedState()
                 } catch {
                   case e: Exception =>

http://git-wip-us.apache.org/repos/asf/flink/blob/47acdead/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index 62af42b..3341095 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -19,13 +19,13 @@
 package org.apache.flink.runtime.checkpoint;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.checkpoint.savepoint.HeapSavepointStore;
 import 
org.apache.flink.runtime.checkpoint.stats.DisabledCheckpointStatsTracker;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.Execution;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.jobmanager.RecoveryMode;
 import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
 import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
 import org.apache.flink.runtime.messages.checkpoint.NotifyCheckpointComplete;
@@ -33,6 +33,7 @@ import 
org.apache.flink.runtime.messages.checkpoint.TriggerCheckpoint;
 import org.junit.Test;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
+import scala.concurrent.Future;
 
 import java.io.Serializable;
 import java.util.Iterator;
@@ -80,17 +81,19 @@ public class CheckpointCoordinatorTest {
 
                        // set up the coordinator and validate the initial state
                        CheckpointCoordinator coord = new CheckpointCoordinator(
-                               jid,
-                               600000,
-                               600000,
-                               42,
-                               new ExecutionVertex[] { triggerVertex1, 
triggerVertex2 },
-                               new ExecutionVertex[] { ackVertex1, ackVertex2 
},
-                               new ExecutionVertex[] {},
-                               cl,
-                               new StandaloneCheckpointIDCounter(),
-                               new StandaloneCompletedCheckpointStore(1, cl),
-                               RecoveryMode.STANDALONE);
+                                       jid,
+                                       600000,
+                                       600000,
+                                       0, Integer.MAX_VALUE,
+                                       42,
+                                       new ExecutionVertex[] { triggerVertex1, 
triggerVertex2 },
+                                       new ExecutionVertex[] { ackVertex1, 
ackVertex2 },
+                                       new ExecutionVertex[] {},
+                                       cl,
+                                       new StandaloneCheckpointIDCounter(),
+                                       new 
StandaloneCompletedCheckpointStore(1, cl),
+                                       new HeapSavepointStore(),
+                                       new DisabledCheckpointStatsTracker());
 
                        // nothing should be happening
                        assertEquals(0, coord.getNumberOfPendingCheckpoints());
@@ -131,17 +134,20 @@ public class CheckpointCoordinatorTest {
 
                        // set up the coordinator and validate the initial state
                        CheckpointCoordinator coord = new CheckpointCoordinator(
-                               jid,
-                               600000,
-                               600000,
-                               42,
-                               new ExecutionVertex[] { triggerVertex1, 
triggerVertex2 },
-                               new ExecutionVertex[] { ackVertex1, ackVertex2 
},
-                               new ExecutionVertex[] {},
-                               cl,
-                               new StandaloneCheckpointIDCounter(),
-                               new StandaloneCompletedCheckpointStore(1, cl),
-                               RecoveryMode.STANDALONE);
+                                       jid,
+                                       600000,
+                                       600000,
+                                       0,
+                                       Integer.MAX_VALUE,
+                                       42,
+                                       new ExecutionVertex[] { triggerVertex1, 
triggerVertex2 },
+                                       new ExecutionVertex[] { ackVertex1, 
ackVertex2 },
+                                       new ExecutionVertex[] {},
+                                       cl,
+                                       new StandaloneCheckpointIDCounter(),
+                                       new 
StandaloneCompletedCheckpointStore(1, cl),
+                                       new HeapSavepointStore(),
+                                       new DisabledCheckpointStatsTracker());
 
                        // nothing should be happening
                        assertEquals(0, coord.getNumberOfPendingCheckpoints());
@@ -180,17 +186,20 @@ public class CheckpointCoordinatorTest {
 
                        // set up the coordinator and validate the initial state
                        CheckpointCoordinator coord = new CheckpointCoordinator(
-                               jid,
-                               600000,
-                               600000,
-                               42,
-                               new ExecutionVertex[] { triggerVertex1, 
triggerVertex2 },
-                               new ExecutionVertex[] { ackVertex1, ackVertex2 
},
-                               new ExecutionVertex[] {},
-                               cl,
-                               new StandaloneCheckpointIDCounter(),
-                               new StandaloneCompletedCheckpointStore(1, cl),
-                               RecoveryMode.STANDALONE);
+                                       jid,
+                                       600000,
+                                       600000,
+                                       0,
+                                       Integer.MAX_VALUE,
+                                       42,
+                                       new ExecutionVertex[] { triggerVertex1, 
triggerVertex2 },
+                                       new ExecutionVertex[] { ackVertex1, 
ackVertex2 },
+                                       new ExecutionVertex[] {},
+                                       cl,
+                                       new StandaloneCheckpointIDCounter(),
+                                       new 
StandaloneCompletedCheckpointStore(1, cl),
+                                       new HeapSavepointStore(),
+                                       new DisabledCheckpointStatsTracker());
 
                        // nothing should be happening
                        assertEquals(0, coord.getNumberOfPendingCheckpoints());
@@ -230,17 +239,20 @@ public class CheckpointCoordinatorTest {
 
                        // set up the coordinator and validate the initial state
                        CheckpointCoordinator coord = new CheckpointCoordinator(
-                               jid,
-                               600000,
-                               600000,
-                               42,
-                               new ExecutionVertex[] { vertex1, vertex2 },
-                               new ExecutionVertex[] { vertex1, vertex2 },
-                               new ExecutionVertex[] { vertex1, vertex2 },
-                               cl,
-                               new StandaloneCheckpointIDCounter(),
-                               new StandaloneCompletedCheckpointStore(1, cl),
-                               RecoveryMode.STANDALONE);
+                                       jid,
+                                       600000,
+                                       600000,
+                                       0,
+                                       Integer.MAX_VALUE,
+                                       42,
+                                       new ExecutionVertex[] { vertex1, 
vertex2 },
+                                       new ExecutionVertex[] { vertex1, 
vertex2 },
+                                       new ExecutionVertex[] { vertex1, 
vertex2 },
+                                       cl,
+                                       new StandaloneCheckpointIDCounter(),
+                                       new 
StandaloneCompletedCheckpointStore(1, cl),
+                                       new HeapSavepointStore(),
+                                       new DisabledCheckpointStatsTracker());
 
                        assertEquals(0, coord.getNumberOfPendingCheckpoints());
                        assertEquals(0, 
coord.getNumberOfRetainedSuccessfulCheckpoints());
@@ -353,17 +365,20 @@ public class CheckpointCoordinatorTest {
 
                        // set up the coordinator and validate the initial state
                        CheckpointCoordinator coord = new CheckpointCoordinator(
-                               jid,
-                               600000,
-                               600000,
-                               42,
-                               new ExecutionVertex[] { vertex1, vertex2 },
-                               new ExecutionVertex[] { vertex1, vertex2 },
-                               new ExecutionVertex[] { vertex1, vertex2 },
-                               cl,
-                               new StandaloneCheckpointIDCounter(),
-                               new StandaloneCompletedCheckpointStore(1, cl),
-                               RecoveryMode.STANDALONE);
+                                       jid,
+                                       600000,
+                                       600000,
+                                       0,
+                                       Integer.MAX_VALUE,
+                                       42,
+                                       new ExecutionVertex[] { vertex1, 
vertex2 },
+                                       new ExecutionVertex[] { vertex1, 
vertex2 },
+                                       new ExecutionVertex[] { vertex1, 
vertex2 },
+                                       cl,
+                                       new StandaloneCheckpointIDCounter(),
+                                       new 
StandaloneCompletedCheckpointStore(1, cl),
+                                       new HeapSavepointStore(),
+                                       new DisabledCheckpointStatsTracker());
 
                        assertEquals(0, coord.getNumberOfPendingCheckpoints());
                        assertEquals(0, 
coord.getNumberOfRetainedSuccessfulCheckpoints());
@@ -472,17 +487,20 @@ public class CheckpointCoordinatorTest {
 
                        // set up the coordinator and validate the initial state
                        CheckpointCoordinator coord = new CheckpointCoordinator(
-                               jid,
-                               600000,
-                               600000,
-                               42,
-                               new ExecutionVertex[] { vertex1, vertex2 },
-                               new ExecutionVertex[] { vertex1, vertex2 },
-                               new ExecutionVertex[] { vertex1, vertex2 },
-                               cl,
-                               new StandaloneCheckpointIDCounter(),
-                               new StandaloneCompletedCheckpointStore(1, cl),
-                               RecoveryMode.STANDALONE);
+                                       jid,
+                                       600000,
+                                       600000,
+                                       0,
+                                       Integer.MAX_VALUE,
+                                       42,
+                                       new ExecutionVertex[] { vertex1, 
vertex2 },
+                                       new ExecutionVertex[] { vertex1, 
vertex2 },
+                                       new ExecutionVertex[] { vertex1, 
vertex2 },
+                                       cl,
+                                       new StandaloneCheckpointIDCounter(),
+                                       new 
StandaloneCompletedCheckpointStore(1, cl),
+                                       new HeapSavepointStore(),
+                                       new DisabledCheckpointStatsTracker());
 
                        assertEquals(0, coord.getNumberOfPendingCheckpoints());
                        assertEquals(0, 
coord.getNumberOfRetainedSuccessfulCheckpoints());
@@ -621,17 +639,20 @@ public class CheckpointCoordinatorTest {
 
                        // set up the coordinator and validate the initial state
                        CheckpointCoordinator coord = new CheckpointCoordinator(
-                               jid,
-                               600000,
-                               600000,
-                               42,
-                               new ExecutionVertex[] { triggerVertex1, 
triggerVertex2 },
-                               new ExecutionVertex[] { ackVertex1, ackVertex2, 
ackVertex3 },
-                               new ExecutionVertex[] { commitVertex },
-                               cl,
-                               new StandaloneCheckpointIDCounter(),
-                               new StandaloneCompletedCheckpointStore(2, cl),
-                               RecoveryMode.STANDALONE);
+                                       jid,
+                                       600000,
+                                       600000,
+                                       0,
+                                       Integer.MAX_VALUE,
+                                       42,
+                                       new ExecutionVertex[] { triggerVertex1, 
triggerVertex2 },
+                                       new ExecutionVertex[] { ackVertex1, 
ackVertex2, ackVertex3 },
+                                       new ExecutionVertex[] { commitVertex },
+                                       cl,
+                                       new StandaloneCheckpointIDCounter(),
+                                       new 
StandaloneCompletedCheckpointStore(2, cl),
+                                       new HeapSavepointStore(),
+                                       new DisabledCheckpointStatsTracker());
 
                        assertEquals(0, coord.getNumberOfPendingCheckpoints());
                        assertEquals(0, 
coord.getNumberOfRetainedSuccessfulCheckpoints());
@@ -755,17 +776,20 @@ public class CheckpointCoordinatorTest {
 
                        // set up the coordinator and validate the initial state
                        CheckpointCoordinator coord = new CheckpointCoordinator(
-                               jid,
-                               600000,
-                               600000,
-                               42,
-                               new ExecutionVertex[] { triggerVertex1, 
triggerVertex2 },
-                               new ExecutionVertex[] { ackVertex1, ackVertex2, 
ackVertex3 },
-                               new ExecutionVertex[] { commitVertex },
-                               cl,
-                               new StandaloneCheckpointIDCounter(),
-                               new StandaloneCompletedCheckpointStore(10, cl),
-                               RecoveryMode.STANDALONE);
+                                       jid,
+                                       600000,
+                                       600000,
+                                       0,
+                                       Integer.MAX_VALUE,
+                                       42,
+                                       new ExecutionVertex[] { triggerVertex1, 
triggerVertex2 },
+                                       new ExecutionVertex[] { ackVertex1, 
ackVertex2, ackVertex3 },
+                                       new ExecutionVertex[] { commitVertex },
+                                       cl,
+                                       new StandaloneCheckpointIDCounter(),
+                                       new 
StandaloneCompletedCheckpointStore(10, cl),
+                                       new HeapSavepointStore(),
+                                       new DisabledCheckpointStatsTracker());
 
                        assertEquals(0, coord.getNumberOfPendingCheckpoints());
                        assertEquals(0, 
coord.getNumberOfRetainedSuccessfulCheckpoints());
@@ -875,17 +899,20 @@ public class CheckpointCoordinatorTest {
                        // the timeout for the checkpoint is a 200 milliseconds
 
                        CheckpointCoordinator coord = new CheckpointCoordinator(
-                               jid,
-                               600000,
-                               200,
-                               42,
-                               new ExecutionVertex[] { triggerVertex },
-                               new ExecutionVertex[] { ackVertex1, ackVertex2 
},
-                               new ExecutionVertex[] { commitVertex },
-                               cl,
-                               new StandaloneCheckpointIDCounter(),
-                               new StandaloneCompletedCheckpointStore(2, cl),
-                               RecoveryMode.STANDALONE);
+                                       jid,
+                                       600000,
+                                       200,
+                                       0,
+                                       Integer.MAX_VALUE,
+                                       42,
+                                       new ExecutionVertex[] { triggerVertex },
+                                       new ExecutionVertex[] { ackVertex1, 
ackVertex2 },
+                                       new ExecutionVertex[] { commitVertex },
+                                       cl,
+                                       new StandaloneCheckpointIDCounter(),
+                                       new 
StandaloneCompletedCheckpointStore(2, cl),
+                                       new HeapSavepointStore(),
+                                       new DisabledCheckpointStatsTracker());
 
                        // trigger a checkpoint, partially acknowledged
                        assertTrue(coord.triggerCheckpoint(timestamp));
@@ -942,17 +969,20 @@ public class CheckpointCoordinatorTest {
                        ExecutionVertex commitVertex = 
mockExecutionVertex(commitAttemptID);
 
                        CheckpointCoordinator coord = new CheckpointCoordinator(
-                               jid,
-                               200000,
-                               200000,
-                               42,
-                               new ExecutionVertex[] { triggerVertex },
-                               new ExecutionVertex[] { ackVertex1, ackVertex2 
},
-                               new ExecutionVertex[] { commitVertex },
-                               cl,
-                               new StandaloneCheckpointIDCounter(),
-                               new StandaloneCompletedCheckpointStore(2, cl),
-                               RecoveryMode.STANDALONE);
+                                       jid,
+                                       200000,
+                                       200000,
+                                       0,
+                                       Integer.MAX_VALUE,
+                                       42,
+                                       new ExecutionVertex[] { triggerVertex },
+                                       new ExecutionVertex[] { ackVertex1, 
ackVertex2 },
+                                       new ExecutionVertex[] { commitVertex },
+                                       cl,
+                                       new StandaloneCheckpointIDCounter(),
+                                       new 
StandaloneCompletedCheckpointStore(2, cl),
+                                       new HeapSavepointStore(),
+                                       new DisabledCheckpointStatsTracker());
 
                        assertTrue(coord.triggerCheckpoint(timestamp));
 
@@ -1020,17 +1050,20 @@ public class CheckpointCoordinatorTest {
                        
}).when(triggerVertex).sendMessageToCurrentExecution(any(Serializable.class), 
any(ExecutionAttemptID.class));
                        
                        CheckpointCoordinator coord = new CheckpointCoordinator(
-                               jid,
-                               10,             // periodic interval is 10 ms
-                               200000, // timeout is very long (200 s)
-                               42,
-                               new ExecutionVertex[] { triggerVertex },
-                               new ExecutionVertex[] { ackVertex },
-                               new ExecutionVertex[] { commitVertex },
-                               cl,
-                               new StandaloneCheckpointIDCounter(),
-                               new StandaloneCompletedCheckpointStore(2, cl),
-                               RecoveryMode.STANDALONE);
+                                       jid,
+                                       10,        // periodic interval is 10 ms
+                                       200000,    // timeout is very long (200 
s)
+                                       0,
+                                       Integer.MAX_VALUE,
+                                       42,
+                                       new ExecutionVertex[] { triggerVertex },
+                                       new ExecutionVertex[] { ackVertex },
+                                       new ExecutionVertex[] { commitVertex },
+                                       cl,
+                                       new StandaloneCheckpointIDCounter(),
+                                       new 
StandaloneCompletedCheckpointStore(2, cl),
+                                       new HeapSavepointStore(),
+                                       new DisabledCheckpointStatsTracker());
 
                        
                        coord.startCheckpointScheduler();
@@ -1110,20 +1143,20 @@ public class CheckpointCoordinatorTest {
                        
}).when(vertex1).sendMessageToCurrentExecution(any(Serializable.class), 
any(ExecutionAttemptID.class));
 
                        CheckpointCoordinator coord = new CheckpointCoordinator(
-                               jid,
-                               10,             // periodic interval is 10 ms
-                               200000, // timeout is very long (200 s)
-                               500,    // 500ms delay between checkpoints
-                               10,
-                               42,
-                               new ExecutionVertex[] { vertex1 },
-                               new ExecutionVertex[] { vertex1 },
-                               new ExecutionVertex[] { vertex1 },
-                               cl,
-                               new StandaloneCheckpointIDCounter(),
-                               new StandaloneCompletedCheckpointStore(2, cl),
-                               RecoveryMode.STANDALONE,
-                               new DisabledCheckpointStatsTracker());
+                                       jid,
+                                       10,        // periodic interval is 10 ms
+                                       200000,    // timeout is very long (200 
s)
+                                       500,    // 500ms delay between 
checkpoints
+                                       10,
+                                       42,
+                                       new ExecutionVertex[] { vertex1 },
+                                       new ExecutionVertex[] { vertex1 },
+                                       new ExecutionVertex[] { vertex1 },
+                                       cl,
+                                       new StandaloneCheckpointIDCounter(),
+                                       new 
StandaloneCompletedCheckpointStore(2, cl),
+                                       new HeapSavepointStore(),
+                                       new DisabledCheckpointStatsTracker());
 
                        coord.startCheckpointScheduler();
 
@@ -1170,20 +1203,236 @@ public class CheckpointCoordinatorTest {
 
        @Test
        public void testMaxConcurrentAttempts1() {
-               testMaxConcurrentAttemps(1);
+               testMaxConcurrentAttempts(1);
        }
 
        @Test
        public void testMaxConcurrentAttempts2() {
-               testMaxConcurrentAttemps(2);
+               testMaxConcurrentAttempts(2);
        }
 
        @Test
        public void testMaxConcurrentAttempts5() {
-               testMaxConcurrentAttemps(5);
+               testMaxConcurrentAttempts(5);
        }
        
-       private void testMaxConcurrentAttemps(int maxConcurrentAttempts) {
+       @Test
+       public void testTriggerAndConfirmSimpleSavepoint() throws Exception {
+               final JobID jid = new JobID();
+               final long timestamp = System.currentTimeMillis();
+
+               // create some mock Execution vertices that receive the 
checkpoint trigger messages
+               final ExecutionAttemptID attemptID1 = new ExecutionAttemptID();
+               final ExecutionAttemptID attemptID2 = new ExecutionAttemptID();
+               ExecutionVertex vertex1 = mockExecutionVertex(attemptID1);
+               ExecutionVertex vertex2 = mockExecutionVertex(attemptID2);
+
+               // set up the coordinator and validate the initial state
+               CheckpointCoordinator coord = new CheckpointCoordinator(
+                               jid,
+                               600000,
+                               600000,
+                               0,
+                               Integer.MAX_VALUE,
+                               42,
+                               new ExecutionVertex[] { vertex1, vertex2 },
+                               new ExecutionVertex[] { vertex1, vertex2 },
+                               new ExecutionVertex[] { vertex1, vertex2 },
+                               cl,
+                               new StandaloneCheckpointIDCounter(),
+                               new StandaloneCompletedCheckpointStore(1, cl),
+                               new HeapSavepointStore(),
+                               new DisabledCheckpointStatsTracker());
+
+               assertEquals(0, coord.getNumberOfPendingCheckpoints());
+               assertEquals(0, 
coord.getNumberOfRetainedSuccessfulCheckpoints());
+
+               // trigger the first savepoint; its future must not be completed yet
+               Future<String> savepointFuture = 
coord.triggerSavepoint(timestamp);
+               assertFalse(savepointFuture.isCompleted());
+
+               // validate that we have a pending savepoint
+               assertEquals(1, coord.getNumberOfPendingCheckpoints());
+
+               long checkpointId = 
coord.getPendingCheckpoints().entrySet().iterator().next().getKey();
+               PendingCheckpoint pending = 
coord.getPendingCheckpoints().get(checkpointId);
+
+               assertNotNull(pending);
+               assertEquals(checkpointId, pending.getCheckpointId());
+               assertEquals(timestamp, pending.getCheckpointTimestamp());
+               assertEquals(jid, pending.getJobId());
+               assertEquals(2, pending.getNumberOfNonAcknowledgedTasks());
+               assertEquals(0, pending.getNumberOfAcknowledgedTasks());
+               assertEquals(0, pending.getTaskStates().size());
+               assertFalse(pending.isDiscarded());
+               assertFalse(pending.isFullyAcknowledged());
+               assertFalse(pending.canBeSubsumed());
+               assertTrue(pending instanceof PendingSavepoint);
+
+
+               // acknowledge from one of the tasks
+               coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, 
attemptID2, checkpointId));
+               assertEquals(1, pending.getNumberOfAcknowledgedTasks());
+               assertEquals(1, pending.getNumberOfNonAcknowledgedTasks());
+               assertFalse(pending.isDiscarded());
+               assertFalse(pending.isFullyAcknowledged());
+               assertFalse(savepointFuture.isCompleted());
+
+               // acknowledge the same task again (should not matter)
+               coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, 
attemptID2, checkpointId));
+               assertFalse(pending.isDiscarded());
+               assertFalse(pending.isFullyAcknowledged());
+               assertFalse(savepointFuture.isCompleted());
+
+               // acknowledge the other task.
+               coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, 
attemptID1, checkpointId));
+
+               // the savepoint is internally converted to a successful 
checkpoint and the
+               // pending savepoint object is disposed; its future completes
+               assertTrue(pending.isDiscarded());
+               assertTrue(savepointFuture.isCompleted());
+
+               // now we should have a completed checkpoint
+               assertEquals(1, 
coord.getNumberOfRetainedSuccessfulCheckpoints());
+               assertEquals(0, coord.getNumberOfPendingCheckpoints());
+
+               // validate that the relevant tasks got a confirmation message
+               {
+                       NotifyCheckpointComplete confirmMessage1 = new 
NotifyCheckpointComplete(jid, attemptID1, checkpointId, timestamp);
+                       NotifyCheckpointComplete confirmMessage2 = new 
NotifyCheckpointComplete(jid, attemptID2, checkpointId, timestamp);
+                       verify(vertex1, 
times(1)).sendMessageToCurrentExecution(eq(confirmMessage1), eq(attemptID1));
+                       verify(vertex2, 
times(1)).sendMessageToCurrentExecution(eq(confirmMessage2), eq(attemptID2));
+               }
+
+               CompletedCheckpoint success = 
coord.getSuccessfulCheckpoints().get(0);
+               assertEquals(jid, success.getJobId());
+               assertEquals(timestamp, success.getTimestamp());
+               assertEquals(pending.getCheckpointId(), 
success.getCheckpointID());
+               assertTrue(success.getTaskStates().isEmpty());
+
+               // ---------------
+               // trigger another savepoint and check that it replaces the 
first one as the single retained completed checkpoint
+               // ---------------
+               final long timestampNew = timestamp + 7;
+               savepointFuture = coord.triggerSavepoint(timestampNew);
+               assertFalse(savepointFuture.isCompleted());
+
+               long checkpointIdNew = 
coord.getPendingCheckpoints().entrySet().iterator().next().getKey();
+               coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, 
attemptID1, checkpointIdNew));
+               coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, 
attemptID2, checkpointIdNew));
+
+               assertEquals(0, coord.getNumberOfPendingCheckpoints());
+               assertEquals(1, 
coord.getNumberOfRetainedSuccessfulCheckpoints());
+
+               CompletedCheckpoint successNew = 
coord.getSuccessfulCheckpoints().get(0);
+               assertEquals(jid, successNew.getJobId());
+               assertEquals(timestampNew, successNew.getTimestamp());
+               assertEquals(checkpointIdNew, successNew.getCheckpointID());
+               assertTrue(successNew.getTaskStates().isEmpty());
+               assertTrue(savepointFuture.isCompleted());
+
+               // validate that the relevant tasks got the trigger and confirmation messages
+               {
+                       TriggerCheckpoint expectedMessage1 = new 
TriggerCheckpoint(jid, attemptID1, checkpointIdNew, timestampNew);
+                       TriggerCheckpoint expectedMessage2 = new 
TriggerCheckpoint(jid, attemptID2, checkpointIdNew, timestampNew);
+                       verify(vertex1, 
times(1)).sendMessageToCurrentExecution(eq(expectedMessage1), eq(attemptID1));
+                       verify(vertex2, 
times(1)).sendMessageToCurrentExecution(eq(expectedMessage2), eq(attemptID2));
+
+                       NotifyCheckpointComplete confirmMessage1 = new 
NotifyCheckpointComplete(jid, attemptID1, checkpointIdNew, timestampNew);
+                       NotifyCheckpointComplete confirmMessage2 = new 
NotifyCheckpointComplete(jid, attemptID2, checkpointIdNew, timestampNew);
+                       verify(vertex1, 
times(1)).sendMessageToCurrentExecution(eq(confirmMessage1), eq(attemptID1));
+                       verify(vertex2, 
times(1)).sendMessageToCurrentExecution(eq(confirmMessage2), eq(attemptID2));
+               }
+
+               coord.shutdown();
+       }
+
+       /**
+        * Triggers a savepoint and two checkpoints. The second checkpoint completes
+        * and subsumes the first checkpoint, but not the first savepoint. Then we
+        * trigger another checkpoint and savepoint. The 2nd savepoint completes and
+        * subsumes the last checkpoint, but not the first savepoint. Finally the
+        * first savepoint is acknowledged and completes as well.
+        */
+       @Test
+       public void testSavepointsAreNotSubsumed() throws Exception {
+               final JobID jid = new JobID();
+               final long timestamp = System.currentTimeMillis();
+
+               // create some mock Execution vertices that receive the checkpoint trigger messages
+               final ExecutionAttemptID attemptID1 = new ExecutionAttemptID();
+               final ExecutionAttemptID attemptID2 = new ExecutionAttemptID();
+               ExecutionVertex vertex1 = mockExecutionVertex(attemptID1);
+               ExecutionVertex vertex2 = mockExecutionVertex(attemptID2);
+
+               // shared counter so that the test can read back the ID of the last trigger
+               StandaloneCheckpointIDCounter counter = new StandaloneCheckpointIDCounter();
+
+               // set up the coordinator
+               CheckpointCoordinator coord = new CheckpointCoordinator(
+                               jid,
+                               600000,
+                               600000,
+                               0,
+                               Integer.MAX_VALUE,
+                               42,
+                               new ExecutionVertex[] { vertex1, vertex2 },
+                               new ExecutionVertex[] { vertex1, vertex2 },
+                               new ExecutionVertex[] { vertex1, vertex2 },
+                               cl,
+                               counter,
+                               new StandaloneCompletedCheckpointStore(10, cl),
+                               new HeapSavepointStore(),
+                               new DisabledCheckpointStatsTracker());
+
+               try {
+                       // Trigger savepoint and checkpoint
+                       Future<String> savepointFuture1 = coord.triggerSavepoint(timestamp);
+                       long savepointId1 = counter.getLast();
+                       assertEquals(1, coord.getNumberOfPendingCheckpoints());
+
+                       assertTrue(coord.triggerCheckpoint(timestamp + 1));
+                       assertEquals(2, coord.getNumberOfPendingCheckpoints());
+
+                       assertTrue(coord.triggerCheckpoint(timestamp + 2));
+                       long checkpointId2 = counter.getLast();
+                       assertEquals(3, coord.getNumberOfPendingCheckpoints());
+
+                       // 2nd checkpoint should subsume the 1st checkpoint, but not the savepoint
+                       coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, checkpointId2));
+                       coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointId2));
+
+                       assertEquals(1, coord.getNumberOfPendingCheckpoints());
+                       assertEquals(1, coord.getNumberOfRetainedSuccessfulCheckpoints());
+
+                       assertFalse(coord.getPendingCheckpoints().get(savepointId1).isDiscarded());
+                       assertFalse(savepointFuture1.isCompleted());
+
+                       assertTrue(coord.triggerCheckpoint(timestamp + 3));
+                       assertEquals(2, coord.getNumberOfPendingCheckpoints());
+
+                       // keep trigger timestamps strictly increasing like the other triggers in this test
+                       Future<String> savepointFuture2 = coord.triggerSavepoint(timestamp + 4);
+                       long savepointId2 = counter.getLast();
+                       assertEquals(3, coord.getNumberOfPendingCheckpoints());
+
+                       // 2nd savepoint should subsume the last checkpoint, but not the 1st savepoint
+                       coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, savepointId2));
+                       coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, savepointId2));
+
+                       assertEquals(1, coord.getNumberOfPendingCheckpoints());
+                       assertEquals(2, coord.getNumberOfRetainedSuccessfulCheckpoints());
+                       assertFalse(coord.getPendingCheckpoints().get(savepointId1).isDiscarded());
+
+                       assertFalse(savepointFuture1.isCompleted());
+                       assertTrue(savepointFuture2.isCompleted());
+
+                       // Ack first savepoint
+                       coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, savepointId1));
+                       coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, savepointId1));
+
+                       assertEquals(0, coord.getNumberOfPendingCheckpoints());
+                       assertEquals(3, coord.getNumberOfRetainedSuccessfulCheckpoints());
+                       assertTrue(savepointFuture1.isCompleted());
+               } finally {
+                       // release the coordinator even when an assertion above fails
+                       coord.shutdown();
+               }
+       }
+
+       private void testMaxConcurrentAttempts(int maxConcurrentAttempts) {
                try {
                        final JobID jid = new JobID();
 
@@ -1207,17 +1456,18 @@ public class CheckpointCoordinatorTest {
                        
}).when(triggerVertex).sendMessageToCurrentExecution(any(Serializable.class), 
any(ExecutionAttemptID.class));
 
                        CheckpointCoordinator coord = new CheckpointCoordinator(
-                               jid,
-                               10,             // periodic interval is 10 ms
-                               200000, // timeout is very long (200 s)
-                               0L,             // no extra delay
-                               maxConcurrentAttempts,
-                               42,
-                               new ExecutionVertex[] { triggerVertex },
-                               new ExecutionVertex[] { ackVertex },
-                               new ExecutionVertex[] { commitVertex }, cl, new 
StandaloneCheckpointIDCounter
-                               (), new StandaloneCompletedCheckpointStore(2, 
cl), RecoveryMode.STANDALONE,
-                               new DisabledCheckpointStatsTracker());
+                                       jid,
+                                       10,        // periodic interval is 10 ms
+                                       200000,    // timeout is very long (200 
s)
+                                       0L,        // no extra delay
+                                       maxConcurrentAttempts,
+                                       42,
+                                       new ExecutionVertex[] { triggerVertex },
+                                       new ExecutionVertex[] { ackVertex },
+                                       new ExecutionVertex[] { commitVertex }, 
cl, new StandaloneCheckpointIDCounter
+                                       (), new 
StandaloneCompletedCheckpointStore(2, cl),
+                                       new HeapSavepointStore(),
+                                       new DisabledCheckpointStatsTracker());
 
                        coord.startCheckpointScheduler();
 
@@ -1278,17 +1528,18 @@ public class CheckpointCoordinatorTest {
                        ExecutionVertex commitVertex = 
mockExecutionVertex(commitAttemptID);
 
                        CheckpointCoordinator coord = new CheckpointCoordinator(
-                               jid,
-                               10,             // periodic interval is 10 ms
-                               200000, // timeout is very long (200 s)
-                               0L,             // no extra delay
-                               maxConcurrentAttempts, // max two concurrent 
checkpoints
-                               42,
-                               new ExecutionVertex[] { triggerVertex },
-                               new ExecutionVertex[] { ackVertex },
-                               new ExecutionVertex[] { commitVertex }, cl, new 
StandaloneCheckpointIDCounter
-                               (), new StandaloneCompletedCheckpointStore(2, 
cl), RecoveryMode.STANDALONE,
-                               new DisabledCheckpointStatsTracker());
+                                       jid,
+                                       10,        // periodic interval is 10 ms
+                                       200000,    // timeout is very long (200 
s)
+                                       0L,        // no extra delay
+                                       maxConcurrentAttempts, // max two 
concurrent checkpoints
+                                       42,
+                                       new ExecutionVertex[] { triggerVertex },
+                                       new ExecutionVertex[] { ackVertex },
+                                       new ExecutionVertex[] { commitVertex }, 
cl, new StandaloneCheckpointIDCounter
+                                       (), new 
StandaloneCompletedCheckpointStore(2, cl),
+                                       new HeapSavepointStore(),
+                                       new DisabledCheckpointStatsTracker());
 
                        coord.startCheckpointScheduler();
 
@@ -1358,17 +1609,18 @@ public class CheckpointCoordinatorTest {
                                        });
                        
                        CheckpointCoordinator coord = new CheckpointCoordinator(
-                               jid,
-                               10,             // periodic interval is 10 ms
-                               200000, // timeout is very long (200 s)
-                               0L,             // no extra delay
-                               2, // max two concurrent checkpoints
-                               42,
-                               new ExecutionVertex[] { triggerVertex },
-                               new ExecutionVertex[] { ackVertex },
-                               new ExecutionVertex[] { commitVertex }, cl, new 
StandaloneCheckpointIDCounter(),
-                               new StandaloneCompletedCheckpointStore(2, cl), 
RecoveryMode.STANDALONE,
-                               new DisabledCheckpointStatsTracker());
+                                       jid,
+                                       10,        // periodic interval is 10 ms
+                                       200000,    // timeout is very long (200 
s)
+                                       0L,        // no extra delay
+                                       2, // max two concurrent checkpoints
+                                       42,
+                                       new ExecutionVertex[] { triggerVertex },
+                                       new ExecutionVertex[] { ackVertex },
+                                       new ExecutionVertex[] { commitVertex }, 
cl, new StandaloneCheckpointIDCounter(),
+                                       new 
StandaloneCompletedCheckpointStore(2, cl),
+                                       new HeapSavepointStore(),
+                                       new DisabledCheckpointStatsTracker());
                        
                        coord.startCheckpointScheduler();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/47acdead/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
index 2b1b7e1..061059a 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
@@ -19,13 +19,14 @@
 package org.apache.flink.runtime.checkpoint;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.checkpoint.savepoint.HeapSavepointStore;
+import 
org.apache.flink.runtime.checkpoint.stats.DisabledCheckpointStatsTracker;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.Execution;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.jobmanager.RecoveryMode;
 import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
 import org.apache.flink.runtime.state.LocalStateHandle;
 import org.apache.flink.runtime.state.StateHandle;
@@ -39,7 +40,10 @@ import java.util.Map;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
-import static org.mockito.Mockito.*;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 /**
  * Tests concerning the restoring of state from a checkpoint to the task 
executions.
@@ -81,17 +85,20 @@ public class CheckpointStateRestoreTest {
 
 
                        CheckpointCoordinator coord = new CheckpointCoordinator(
-                               jid,
-                               200000L,
-                               200000L,
-                               42,
-                               new ExecutionVertex[] { stateful1, stateful2, 
stateful3, stateless1, stateless2 },
-                               new ExecutionVertex[] { stateful1, stateful2, 
stateful3, stateless1, stateless2 },
-                               new ExecutionVertex[0],
-                               cl,
-                               new StandaloneCheckpointIDCounter(),
-                               new StandaloneCompletedCheckpointStore(1, cl),
-                               RecoveryMode.STANDALONE);
+                                       jid,
+                                       200000L,
+                                       200000L,
+                                       0,
+                                       Integer.MAX_VALUE,
+                                       42,
+                                       new ExecutionVertex[] { stateful1, 
stateful2, stateful3, stateless1, stateless2 },
+                                       new ExecutionVertex[] { stateful1, 
stateful2, stateful3, stateless1, stateless2 },
+                                       new ExecutionVertex[0],
+                                       cl,
+                                       new StandaloneCheckpointIDCounter(),
+                                       new 
StandaloneCompletedCheckpointStore(1, cl),
+                                       new HeapSavepointStore(),
+                                       new DisabledCheckpointStatsTracker());
 
                        // create ourselves a checkpoint with state
                        final long timestamp = 34623786L;
@@ -158,17 +165,20 @@ public class CheckpointStateRestoreTest {
 
 
                        CheckpointCoordinator coord = new CheckpointCoordinator(
-                               jid,
-                               200000L,
-                               200000L,
-                               42,
-                               new ExecutionVertex[] { stateful1, stateful2, 
stateful3, stateless1, stateless2 },
-                               new ExecutionVertex[] { stateful1, stateful2, 
stateful3, stateless1, stateless2 },
-                               new ExecutionVertex[0],
-                               cl,
-                               new StandaloneCheckpointIDCounter(),
-                               new StandaloneCompletedCheckpointStore(1, cl),
-                               RecoveryMode.STANDALONE);
+                                       jid,
+                                       200000L,
+                                       200000L,
+                                       0,
+                                       Integer.MAX_VALUE,
+                                       42,
+                                       new ExecutionVertex[] { stateful1, 
stateful2, stateful3, stateless1, stateless2 },
+                                       new ExecutionVertex[] { stateful1, 
stateful2, stateful3, stateless1, stateless2 },
+                                       new ExecutionVertex[0],
+                                       cl,
+                                       new StandaloneCheckpointIDCounter(),
+                                       new 
StandaloneCompletedCheckpointStore(1, cl),
+                                       new HeapSavepointStore(),
+                                       new DisabledCheckpointStatsTracker());
 
                        // create ourselves a checkpoint with state
                        final long timestamp = 34623786L;
@@ -206,15 +216,19 @@ public class CheckpointStateRestoreTest {
        public void testNoCheckpointAvailable() {
                try {
                        CheckpointCoordinator coord = new CheckpointCoordinator(
-                               new JobID(),
-                               200000L,
-                               200000L,
-                               42,
-                               new ExecutionVertex[] { 
mock(ExecutionVertex.class) },
-                               new ExecutionVertex[] { 
mock(ExecutionVertex.class) },
-                               new ExecutionVertex[0], cl,
-                               new StandaloneCheckpointIDCounter(),
-                               new StandaloneCompletedCheckpointStore(1, cl), 
RecoveryMode.STANDALONE);
+                                       new JobID(),
+                                       200000L,
+                                       200000L,
+                                       0,
+                                       Integer.MAX_VALUE,
+                                       42,
+                                       new ExecutionVertex[] { 
mock(ExecutionVertex.class) },
+                                       new ExecutionVertex[] { 
mock(ExecutionVertex.class) },
+                                       new ExecutionVertex[0], cl,
+                                       new StandaloneCheckpointIDCounter(),
+                                       new 
StandaloneCompletedCheckpointStore(1, cl),
+                                       new HeapSavepointStore(),
+                                       new DisabledCheckpointStatsTracker());
 
                        try {
                                coord.restoreLatestCheckpointedState(new 
HashMap<JobVertexID, ExecutionJobVertex>(), true, false);

http://git-wip-us.apache.org/repos/asf/flink/blob/47acdead/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
index 84d809a..634e177 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
@@ -238,7 +238,7 @@ public abstract class CompletedCheckpointStoreTest extends 
TestLogger {
                        long timestamp,
                        Map<JobVertexID, TaskState> taskGroupStates) {
 
-                       super(jobId, checkpointId, timestamp, Long.MAX_VALUE, 
taskGroupStates);
+                       super(jobId, checkpointId, timestamp, Long.MAX_VALUE, 
taskGroupStates, true);
                }
 
                @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/47acdead/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
new file mode 100644
index 0000000..90a6836
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.junit.Test;
+import org.mockito.Matchers;
+import org.mockito.Mockito;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+public class CompletedCheckpointTest {
+
+       /**
+        * Tests that the `deleteStateWhenDisposed` flag is correctly forwarded:
+        * with the flag set, discarding the checkpoint discards its task state;
+        * with the flag cleared, the task state is left untouched.
+        */
+       @Test
+       public void testDiscard() throws Exception {
+               final ClassLoader loader = ClassLoader.getSystemClassLoader();
+               final TaskState state = mock(TaskState.class);
+
+               // Verify discard call is forwarded to state
+               CompletedCheckpoint checkpoint = new CompletedCheckpoint(
+                               new JobID(), 0, 0, 1, singleStateMap(state), true);
+               checkpoint.discard(loader);
+               verify(state, times(1)).discard(Matchers.any(ClassLoader.class));
+
+               Mockito.reset(state);
+
+               // Verify discard call is not forwarded to state. A fresh map is used so
+               // that this check cannot pass vacuously if the first discard() emptied
+               // or otherwise modified the map it was given.
+               checkpoint = new CompletedCheckpoint(
+                               new JobID(), 0, 0, 1, singleStateMap(state), false);
+               checkpoint.discard(loader);
+               verify(state, Mockito.never()).discard(Matchers.any(ClassLoader.class));
+       }
+
+       /**
+        * Creates a new mutable map holding only the given task state under a fresh vertex ID.
+        */
+       private static Map<JobVertexID, TaskState> singleStateMap(TaskState state) {
+               Map<JobVertexID, TaskState> taskStates = new HashMap<>();
+               taskStates.put(new JobVertexID(), state);
+               return taskStates;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/47acdead/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
index 0f2c2b2..7b05fd7 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
@@ -27,26 +27,22 @@ import 
org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.checkpoint.savepoint.HeapSavepointStore;
-import org.apache.flink.runtime.checkpoint.savepoint.SavepointCoordinator;
 import 
org.apache.flink.runtime.checkpoint.stats.DisabledCheckpointStatsTracker;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
 import org.apache.flink.runtime.jobgraph.JobVertex;
-import org.apache.flink.runtime.jobmanager.RecoveryMode;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.util.SerializedValue;
 import org.junit.AfterClass;
 import org.junit.Test;
 import scala.concurrent.duration.FiniteDuration;
 
-import java.lang.reflect.Field;
 import java.net.URL;
 import java.util.Collections;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
-import static org.junit.Assert.assertEquals;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
@@ -59,28 +55,6 @@ public class ExecutionGraphCheckpointCoordinatorTest {
        public static void teardown() {
                JavaTestKit.shutdownActorSystem(system);
        }
-       
-       @Test
-       public void 
testCheckpointAndSavepointCoordinatorShareCheckpointIDCounter() throws 
Exception {
-               ExecutionGraph executionGraph = 
createExecutionGraphAndEnableCheckpointing(
-                               new StandaloneCheckpointIDCounter(),
-                               new StandaloneCompletedCheckpointStore(1, 
ClassLoader.getSystemClassLoader()));
-
-               CheckpointCoordinator checkpointCoordinator = 
executionGraph.getCheckpointCoordinator();
-               SavepointCoordinator savepointCoordinator = 
executionGraph.getSavepointCoordinator();
-
-               // Both the checkpoint and savepoint coordinator need to operate
-               // with the same checkpoint ID counter.
-               Field counterField = 
CheckpointCoordinator.class.getDeclaredField("checkpointIdCounter");
-
-               CheckpointIDCounter counterCheckpointCoordinator = 
(CheckpointIDCounter) counterField
-                               .get(checkpointCoordinator);
-
-               CheckpointIDCounter counterSavepointCoordinator = 
(CheckpointIDCounter) counterField
-                               .get(savepointCoordinator);
-
-               assertEquals(counterCheckpointCoordinator, 
counterSavepointCoordinator);
-       }
 
        /**
         * Tests that a shut down checkpoint coordinator calls shutdown on
@@ -94,8 +68,7 @@ public class ExecutionGraphCheckpointCoordinatorTest {
                ExecutionGraph graph = 
createExecutionGraphAndEnableCheckpointing(counter, store);
                graph.fail(new Exception("Test Exception"));
 
-               // Two times, because shared with savepoint coordinator
-               verify(counter, times(2)).shutdown();
+               verify(counter, times(1)).shutdown();
                verify(store, times(1)).shutdown();
        }
 
@@ -115,8 +88,7 @@ public class ExecutionGraphCheckpointCoordinatorTest {
                verify(counter, times(0)).shutdown();
                verify(store, times(0)).shutdown();
 
-               // Two times, because shared with savepoint coordinator
-               verify(counter, times(2)).suspend();
+               verify(counter, times(1)).suspend();
                verify(store, times(1)).suspend();
        }
 
@@ -149,7 +121,6 @@ public class ExecutionGraphCheckpointCoordinatorTest {
                                UUID.randomUUID(),
                                counter,
                                store,
-                               RecoveryMode.STANDALONE,
                                new HeapSavepointStore(),
                                new DisabledCheckpointStatsTracker());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/47acdead/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
new file mode 100644
index 0000000..d235e61
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.junit.Test;
+import org.mockito.Matchers;
+import org.mockito.Mockito;
+
+import java.lang.reflect.Field;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+public class PendingCheckpointTest {
+
+       // template of tasks that must acknowledge; copied for every pending checkpoint
+       private static final Map<ExecutionAttemptID, ExecutionVertex> ACK_TASKS = new HashMap<>();
+       private static final ExecutionAttemptID ATTEMPT_ID = new ExecutionAttemptID();
+
+       static {
+               ACK_TASKS.put(ATTEMPT_ID, mock(ExecutionVertex.class));
+       }
+
+       /**
+        * Tests that pending checkpoints can be subsumed.
+        */
+       @Test
+       public void testCanBeSubsumed() throws Exception {
+               PendingCheckpoint pending = createPendingCheckpoint();
+               assertTrue(pending.canBeSubsumed());
+       }
+
+       /**
+        * Tests that every abort variant (declined, error, expired, subsumed)
+        * discards the task state and marks the pending checkpoint as discarded.
+        */
+       @Test
+       public void testAbort() throws Exception {
+               TaskState state = mock(TaskState.class);
+
+               // Abort declined
+               PendingCheckpoint pending = createPendingCheckpoint();
+               setTaskState(pending, state);
+
+               pending.abortDeclined();
+               assertTrue(pending.isDiscarded());
+               verify(state, times(1)).discard(Matchers.any(ClassLoader.class));
+
+               // Abort error
+               Mockito.reset(state);
+
+               pending = createPendingCheckpoint();
+               setTaskState(pending, state);
+
+               pending.abortError(new Exception("Expected Test Exception"));
+               assertTrue(pending.isDiscarded());
+               verify(state, times(1)).discard(Matchers.any(ClassLoader.class));
+
+               // Abort expired
+               Mockito.reset(state);
+
+               pending = createPendingCheckpoint();
+               setTaskState(pending, state);
+
+               pending.abortExpired();
+               assertTrue(pending.isDiscarded());
+               verify(state, times(1)).discard(Matchers.any(ClassLoader.class));
+
+               // Abort subsumed
+               Mockito.reset(state);
+
+               pending = createPendingCheckpoint();
+               setTaskState(pending, state);
+
+               pending.abortSubsumed();
+               assertTrue(pending.isDiscarded());
+               verify(state, times(1)).discard(Matchers.any(ClassLoader.class));
+       }
+
+       /**
+        * Tests that the CompletedCheckpoint `deleteStateWhenDisposed` flag is
+        * correctly set to true.
+        */
+       @Test
+       public void testFinalizeCheckpoint() throws Exception {
+               TaskState state = mock(TaskState.class);
+               PendingCheckpoint pending = createPendingCheckpoint();
+               setTaskState(pending, state);
+
+               pending.acknowledgeTask(ATTEMPT_ID, null, 0, null);
+
+               CompletedCheckpoint checkpoint = pending.finalizeCheckpoint();
+
+               // Does discard state
+               checkpoint.discard(ClassLoader.getSystemClassLoader());
+               verify(state, times(1)).discard(Matchers.any(ClassLoader.class));
+       }
+
+       // ------------------------------------------------------------------------
+
+       /**
+        * Creates a pending checkpoint (checkpoint ID 0, timestamp 1) that waits for
+        * the single task in ACK_TASKS. Each instance gets its own copy of the map,
+        * so the shared static map is never handed out directly.
+        */
+       private static PendingCheckpoint createPendingCheckpoint() {
+               ClassLoader classLoader = ClassLoader.getSystemClassLoader();
+               Map<ExecutionAttemptID, ExecutionVertex> ackTasks = new HashMap<>(ACK_TASKS);
+               return new PendingCheckpoint(new JobID(), 0, 1, ackTasks, classLoader);
+       }
+
+       /**
+        * Injects the given task state into the private {@code taskStates} map of the
+        * pending checkpoint via reflection, under a fresh {@link JobVertexID}.
+        */
+       @SuppressWarnings("unchecked")
+       static void setTaskState(PendingCheckpoint pending, TaskState state) throws NoSuchFieldException, IllegalAccessException {
+               Field field = PendingCheckpoint.class.getDeclaredField("taskStates");
+               field.setAccessible(true);
+               // unchecked: the field's generic type is erased at runtime
+               Map<JobVertexID, TaskState> taskStates = (Map<JobVertexID, TaskState>) field.get(pending);
+
+               taskStates.put(new JobVertexID(), state);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/47acdead/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingSavepointTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingSavepointTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingSavepointTest.java
new file mode 100644
index 0000000..6ae6e1c
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingSavepointTest.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.checkpoint.savepoint.HeapSavepointStore;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.junit.Test;
+import org.mockito.Matchers;
+import org.mockito.Mockito;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.Duration;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+public class PendingSavepointTest {
+
+       private static final Map<ExecutionAttemptID, ExecutionVertex> ACK_TASKS 
= new HashMap<>();
+       private static final ExecutionAttemptID ATTEMPT_ID = new 
ExecutionAttemptID();
+
+       static {
+               ACK_TASKS.put(ATTEMPT_ID, mock(ExecutionVertex.class));
+       }
+
+       /**
+        * Tests that pending savepoints cannot be subsumed.
+        */
+       @Test
+       public void testCanBeSubsumed() throws Exception {
+               PendingSavepoint pending = createPendingSavepoint();
+               assertFalse(pending.canBeSubsumed());
+       }
+
+       /**
+        * Tests that abort discards state fails the completeion future.
+        */
+       @Test
+       @SuppressWarnings("unchecked")
+       public void testAbort() throws Exception {
+               TaskState state = mock(TaskState.class);
+
+               // Abort declined
+               PendingSavepoint pending = createPendingSavepoint();
+               PendingCheckpointTest.setTaskState(pending, state);
+
+               pending.abortDeclined();
+               verify(state, 
times(1)).discard(Matchers.any(ClassLoader.class));
+
+               // Abort error
+               Mockito.reset(state);
+
+               pending = createPendingSavepoint();
+               PendingCheckpointTest.setTaskState(pending, state);
+               Future<String> future = pending.getCompletionFuture();
+
+               pending.abortError(new Exception("Expected Test Exception"));
+               verify(state, 
times(1)).discard(Matchers.any(ClassLoader.class));
+               assertTrue(future.failed().isCompleted());
+
+               // Abort expired
+               Mockito.reset(state);
+
+               pending = createPendingSavepoint();
+               PendingCheckpointTest.setTaskState(pending, state);
+               future = pending.getCompletionFuture();
+
+               pending.abortExpired();
+               verify(state, 
times(1)).discard(Matchers.any(ClassLoader.class));
+               assertTrue(future.failed().isCompleted());
+
+               // Abort subsumed
+               pending = createPendingSavepoint();
+
+               try {
+                       pending.abortSubsumed();
+                       fail("Did not throw expected Exception");
+               } catch (Throwable ignored) { // expected
+               }
+       }
+
+       /**
+        * Tests that the CompletedCheckpoint `deleteStateWhenDisposed` flag is
+        * correctly set to false.
+        */
+       @Test
+       public void testFinalizeCheckpoint() throws Exception {
+               TaskState state = mock(TaskState.class);
+               PendingSavepoint pending = createPendingSavepoint();
+               PendingCheckpointTest.setTaskState(pending, state);
+
+               Future<String> future = pending.getCompletionFuture();
+
+               pending.acknowledgeTask(ATTEMPT_ID, null, 0, null);
+
+               CompletedCheckpoint checkpoint = pending.finalizeCheckpoint();
+
+               // Does _NOT_ discard state
+               checkpoint.discard(ClassLoader.getSystemClassLoader());
+               verify(state, 
times(0)).discard(Matchers.any(ClassLoader.class));
+
+               // Future is completed
+               String path = Await.result(future, Duration.Zero());
+               assertNotNull(path);
+       }
+
+       // 
------------------------------------------------------------------------
+
+       private static PendingSavepoint createPendingSavepoint() {
+               ClassLoader classLoader = ClassLoader.getSystemClassLoader();
+               Map<ExecutionAttemptID, ExecutionVertex> ackTasks = new 
HashMap<>(ACK_TASKS);
+               return new PendingSavepoint(new JobID(), 0, 1, ackTasks, 
classLoader, new HeapSavepointStore());
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/47acdead/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/HeapSavepointStoreTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/HeapSavepointStoreTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/HeapSavepointStoreTest.java
deleted file mode 100644
index ec3dd0a..0000000
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/HeapSavepointStoreTest.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.checkpoint.savepoint;
-
-public class HeapSavepointStoreTest {
-
-
-
-}

Reply via email to