Repository: flink
Updated Branches:
  refs/heads/master f79168052 -> 47acdeadf


[FLINK-4322] [checkpointing] Unify CheckpointCoordinator and SavepointCoordinator

The CheckpointCoordinator now also takes over the role of the SavepointCoordinator.
Savepoints are just like other checkpoints, except that they additionally store their metadata.
Restoring from a savepoint means loading it into the CheckpointStore at startup.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/76ca1a79
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/76ca1a79
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/76ca1a79

Branch: refs/heads/master
Commit: 76ca1a7955fedd8583a0af12289a14d0f1bcf868
Parents: f791680
Author: Stephan Ewen <[email protected]>
Authored: Mon Aug 8 19:18:44 2016 +0200
Committer: Stephan Ewen <[email protected]>
Committed: Wed Aug 17 19:06:02 2016 +0200

----------------------------------------------------------------------
 .../checkpoint/CheckpointCoordinator.java       | 257 ++++++------
 .../checkpoint/CheckpointDeclineReason.java     |  50 +++
 .../checkpoint/CheckpointProperties.java        |  61 +++
 .../checkpoint/CheckpointTriggerResult.java     |  91 +++++
 .../runtime/checkpoint/CompletedCheckpoint.java |  64 ++-
 .../runtime/checkpoint/PendingCheckpoint.java   | 130 +++++--
 .../runtime/checkpoint/PendingSavepoint.java    | 155 ++++++++
 .../savepoint/SavepointCoordinator.java         | 386 -------------------
 .../SavepointCoordinatorDeActivator.java        |  64 ---
 .../checkpoint/savepoint/SavepointLoader.java   |  98 +++++
 .../runtime/executiongraph/ExecutionGraph.java  |  92 +----
 .../flink/runtime/jobmanager/JobManager.scala   |  68 ++--
 .../checkpoint/CheckpointPropertiesTest.java    |  37 ++
 13 files changed, 768 insertions(+), 785 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/76ca1a79/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 6aaa014..6d44e61 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -21,9 +21,10 @@ package org.apache.flink.runtime.checkpoint;
 import akka.actor.ActorSystem;
 import akka.actor.PoisonPill;
 import akka.actor.Props;
+
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore;
 import org.apache.flink.runtime.checkpoint.stats.CheckpointStatsTracker;
-import 
org.apache.flink.runtime.checkpoint.stats.DisabledCheckpointStatsTracker;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.Execution;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
@@ -39,9 +40,14 @@ import 
org.apache.flink.runtime.messages.checkpoint.NotifyCheckpointComplete;
 import org.apache.flink.runtime.messages.checkpoint.TriggerCheckpoint;
 import org.apache.flink.runtime.state.StateHandle;
 import org.apache.flink.util.SerializedValue;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import scala.concurrent.Future;
+import scala.concurrent.Promise;
+
+import javax.annotation.Nullable;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -75,8 +81,10 @@ public class CheckpointCoordinator {
        /** The number of recent checkpoints whose IDs are remembered */
        private static final int NUM_GHOST_CHECKPOINT_IDS = 16;
 
+       // 
------------------------------------------------------------------------
+
        /** Coordinator-wide lock to safeguard the checkpoint updates */
-       protected final Object lock = new Object();
+       private final Object lock = new Object();
 
        /** The job whose checkpoint this coordinator coordinates */
        private final JobID job;
@@ -97,12 +105,15 @@ public class CheckpointCoordinator {
         * accessing this don't block the job manager actor and run 
asynchronously. */
        private final CompletedCheckpointStore completedCheckpointStore;
 
+       /** Store for savepoints. */
+       private final SavepointStore savepointStore;
+       
        /** A list of recent checkpoint IDs, to identify late messages (vs 
invalid ones) */
        private final ArrayDeque<Long> recentPendingCheckpoints;
 
        /** Checkpoint ID counter to ensure ascending IDs. In case of job 
manager failures, these
         * need to be ascending across job managers. */
-       protected final CheckpointIDCounter checkpointIdCounter;
+       private final CheckpointIDCounter checkpointIdCounter;
 
        /** Class loader used to deserialize the state handles (as they may be 
user-defined) */
        private final ClassLoader userClassLoader;
@@ -151,7 +162,7 @@ public class CheckpointCoordinator {
        /** Helper for tracking checkpoint statistics  */
        private final CheckpointStatsTracker statsTracker;
 
-       protected final int numberKeyGroups;
+       private final int numberKeyGroups;
 
        // 
--------------------------------------------------------------------------------------------
 
@@ -159,25 +170,6 @@ public class CheckpointCoordinator {
                        JobID job,
                        long baseInterval,
                        long checkpointTimeout,
-                       int numberKeyGroups,
-                       ExecutionVertex[] tasksToTrigger,
-                       ExecutionVertex[] tasksToWaitFor,
-                       ExecutionVertex[] tasksToCommitTo,
-                       ClassLoader userClassLoader,
-                       CheckpointIDCounter checkpointIDCounter,
-                       CompletedCheckpointStore completedCheckpointStore,
-                       RecoveryMode recoveryMode) throws Exception {
-
-               this(job, baseInterval, checkpointTimeout, 0L, 
Integer.MAX_VALUE, numberKeyGroups,
-                               tasksToTrigger, tasksToWaitFor, tasksToCommitTo,
-                               userClassLoader, checkpointIDCounter, 
completedCheckpointStore, recoveryMode,
-                               new DisabledCheckpointStatsTracker());
-       }
-
-       public CheckpointCoordinator(
-                       JobID job,
-                       long baseInterval,
-                       long checkpointTimeout,
                        long minPauseBetweenCheckpoints,
                        int maxConcurrentCheckpointAttempts,
                        int numberKeyGroups,
@@ -187,14 +179,16 @@ public class CheckpointCoordinator {
                        ClassLoader userClassLoader,
                        CheckpointIDCounter checkpointIDCounter,
                        CompletedCheckpointStore completedCheckpointStore,
+                       SavepointStore savepointStore,
                        RecoveryMode recoveryMode,
                        CheckpointStatsTracker statsTracker) throws Exception {
 
-               // Sanity check
+               // sanity checks
                checkArgument(baseInterval > 0, "Checkpoint timeout must be 
larger than zero");
                checkArgument(checkpointTimeout >= 1, "Checkpoint timeout must 
be larger than zero");
                checkArgument(minPauseBetweenCheckpoints >= 0, 
"minPauseBetweenCheckpoints must be >= 0");
                checkArgument(maxConcurrentCheckpointAttempts >= 1, 
"maxConcurrentCheckpointAttempts must be >= 1");
+               checkArgument(numberKeyGroups >= 1, "numberKeyGroups must be >= 
1");
 
                this.job = checkNotNull(job);
                this.baseInterval = baseInterval;
@@ -205,17 +199,16 @@ public class CheckpointCoordinator {
                this.tasksToWaitFor = checkNotNull(tasksToWaitFor);
                this.tasksToCommitTo = checkNotNull(tasksToCommitTo);
                this.pendingCheckpoints = new LinkedHashMap<Long, 
PendingCheckpoint>();
+               this.checkpointIdCounter = checkNotNull(checkpointIDCounter);
                this.completedCheckpointStore = 
checkNotNull(completedCheckpointStore);
+               this.savepointStore = checkNotNull(savepointStore);
                this.recentPendingCheckpoints = new 
ArrayDeque<Long>(NUM_GHOST_CHECKPOINT_IDS);
-               this.userClassLoader = userClassLoader;
-
-               // Started with the periodic scheduler
-               this.checkpointIdCounter = checkNotNull(checkpointIDCounter);
+               this.userClassLoader = checkNotNull(userClassLoader);
+               this.statsTracker = checkNotNull(statsTracker);
+               this.numberKeyGroups = numberKeyGroups;
 
                this.timer = new Timer("Checkpoint Timer", true);
 
-               this.statsTracker = checkNotNull(statsTracker);
-
                if (recoveryMode == RecoveryMode.STANDALONE) {
                        // Add shutdown hook to clean up state handles when no 
checkpoint recovery is
                        // possible. In case of another configured recovery 
mode, the checkpoints need to be
@@ -248,29 +241,12 @@ public class CheckpointCoordinator {
                        this.shutdownHook = null;
                }
 
-               this.numberKeyGroups = numberKeyGroups;
-       }
-
-       // 
--------------------------------------------------------------------------------------------
-       // Callbacks
-       // 
--------------------------------------------------------------------------------------------
-
-       /**
-        * Callback on shutdown of the coordinator. Called in lock scope.
-        */
-       protected void onShutdown() {
-       }
-
-       /**
-        * Callback on cancellation of a checkpoint. Called in lock scope.
-        */
-       protected void onCancelCheckpoint(long canceledCheckpointId) {
-       }
-
-       /**
-        * Callback on full acknowledgement of a checkpoint. Called in lock 
scope.
-        */
-       protected void onFullyAcknowledgedCheckpoint(CompletedCheckpoint 
checkpoint) {
+               // make sure the checkpoint ID enumerator is running
+               try {
+                       checkpointIdCounter.start();
+               } catch (Exception e) {
+                       throw new Exception("Failed to start checkpoint ID 
counter: " + e.getMessage(), e);
+               }
        }
 
        // 
--------------------------------------------------------------------------------------------
@@ -328,7 +304,7 @@ public class CheckpointCoordinator {
 
                                        // clear and discard all pending 
checkpoints
                                        for (PendingCheckpoint pending : 
pendingCheckpoints.values()) {
-                                               
pending.discard(userClassLoader);
+                                               pending.abortError(new 
Exception("Checkpoint Coordinator is shutting down"));
                                        }
                                        pendingCheckpoints.clear();
 
@@ -339,8 +315,6 @@ public class CheckpointCoordinator {
                                                
completedCheckpointStore.suspend();
                                                checkpointIdCounter.suspend();
                                        }
-
-                                       onShutdown();
                                }
                        } finally {
                                // Remove shutdown hook to prevent resource 
leaks, unless this is invoked by the
@@ -368,6 +342,21 @@ public class CheckpointCoordinator {
        //  Handling checkpoints and messages
        // 
--------------------------------------------------------------------------------------------
 
+       /**
+        * Triggers a savepoint, using the given timestamp as the checkpoint timestamp.
+        *
+        * <p>The savepoint is triggered through the regular checkpoint path with
+        * {@link CheckpointProperties#forStandardSavepoint()} properties.
+        *
+        * @param timestamp The timestamp for the savepoint.
+        * @return On success, the completion future of the pending savepoint; otherwise an
+        *         already-failed future whose exception message contains the decline reason.
+        *         NOTE(review): the String result is presumably the savepoint path; confirm
+        *         against PendingSavepoint#getCompletionFuture().
+        * @throws Exception if triggering the checkpoint throws.
+        */
+       public Future<String> triggerSavepoint(long timestamp) throws Exception 
{
+               CheckpointTriggerResult result = triggerCheckpoint(timestamp, 
CheckpointProperties.forStandardSavepoint());
+
+               if (result.isSuccess()) {
+                       // savepoint properties make triggerCheckpoint create a PendingSavepoint,
+                       // so this cast is expected to succeed
+                       PendingSavepoint savepoint = (PendingSavepoint) 
result.getPendingCheckpoint();
+                       return savepoint.getCompletionFuture();
+               }
+               else {
+                       // report the decline reason through an already-failed future.
+                       // NOTE(review): DefaultPromise lives in scala.concurrent.impl (an internal
+                       // package); a public factory for failed futures may be preferable.
+                       final Promise<String> promise = new 
scala.concurrent.impl.Promise.DefaultPromise<>();
+                       promise.failure(
+                                       new Exception("Failed to trigger 
savepoint: " + result.getFailureReason().message()));
+                       return promise.future(); 
+               }
+       }
+
        /**
         * Triggers a new checkpoint and uses the given timestamp as the 
checkpoint
         * timestamp.
@@ -375,42 +364,39 @@ public class CheckpointCoordinator {
         * @param timestamp The timestamp for the checkpoint.
         */
        public boolean triggerCheckpoint(long timestamp) throws Exception {
-               return triggerCheckpoint(timestamp, -1);
+               return triggerCheckpoint(timestamp, 
CheckpointProperties.forStandardCheckpoint()).isSuccess();
        }
 
-       /**
-        * Triggers a new checkpoint and uses the given timestamp as the 
checkpoint
-        * timestamp.
-        *
-        * @param timestamp The timestamp for the checkpoint.
-        * @param nextCheckpointId The checkpoint ID to use for this checkpoint 
or <code>-1</code> if
-        *                         the checkpoint ID counter should be queried.
-        */
-       public boolean triggerCheckpoint(long timestamp, long nextCheckpointId) 
throws Exception {
+       CheckpointTriggerResult triggerCheckpoint(long timestamp, 
CheckpointProperties props) throws Exception {
                // make some eager pre-checks
                synchronized (lock) {
                        // abort if the coordinator has been shutdown in the 
meantime
                        if (shutdown) {
-                               return false;
+                               return new 
CheckpointTriggerResult(CheckpointDeclineReason.COORDINATOR_SHUTDOWN);
                        }
 
-                       // sanity check: there should never be more than one 
trigger request queued
-                       if (triggerRequestQueued) {
-                               LOG.warn("Trying to trigger another checkpoint 
while one was queued already");
-                               return false;
-                       }
+                       // validate whether the checkpoint can be triggered, 
with respect to the limit of
+                       // concurrent checkpoints, and the minimum time between 
checkpoints.
+                       // these checks are not relevant for savepoints
+                       if (!props.isSavepoint()) {
+                               // sanity check: there should never be more 
than one trigger request queued
+                               if (triggerRequestQueued) {
+                                       LOG.warn("Trying to trigger another 
checkpoint while one was queued already");
+                                       return new 
CheckpointTriggerResult(CheckpointDeclineReason.ALREADY_QUEUED);
+                               }
 
-                       // if too many checkpoints are currently in progress, 
we need to mark that a request is queued
-                       if (pendingCheckpoints.size() >= 
maxConcurrentCheckpointAttempts) {
-                               triggerRequestQueued = true;
-                               if (currentPeriodicTrigger != null) {
-                                       currentPeriodicTrigger.cancel();
-                                       currentPeriodicTrigger = null;
+                               // if too many checkpoints are currently in 
progress, we need to mark that a request is queued
+                               if (pendingCheckpoints.size() >= 
maxConcurrentCheckpointAttempts) {
+                                       triggerRequestQueued = true;
+                                       if (currentPeriodicTrigger != null) {
+                                               currentPeriodicTrigger.cancel();
+                                               currentPeriodicTrigger = null;
+                                       }
+                                       return new 
CheckpointTriggerResult(CheckpointDeclineReason.TOO_MANY_CONCURRENT_CHECKPOINTS);
                                }
-                               return false;
                        }
 
-                       //make sure the minimum interval between checkpoints 
has passed
+                       // make sure the minimum interval between checkpoints 
has passed
                        if (lastTriggeredCheckpoint + 
minPauseBetweenCheckpoints > timestamp) {
                                if (currentPeriodicTrigger != null) {
                                        currentPeriodicTrigger.cancel();
@@ -418,11 +404,11 @@ public class CheckpointCoordinator {
                                }
                                ScheduledTrigger trigger = new 
ScheduledTrigger();
                                timer.scheduleAtFixedRate(trigger, 
minPauseBetweenCheckpoints, baseInterval);
-                               return false;
+                               return new 
CheckpointTriggerResult(CheckpointDeclineReason.MINIMUM_TIME_BETWEEN_CHECKPOINTS);
                        }
                }
 
-               // first check if all tasks that we need to trigger are running.
+               // check if all tasks that we need to trigger are running.
                // if not, abort the checkpoint
                ExecutionAttemptID[] triggerIDs = new 
ExecutionAttemptID[tasksToTrigger.length];
                for (int i = 0; i < tasksToTrigger.length; i++) {
@@ -432,7 +418,7 @@ public class CheckpointCoordinator {
                        } else {
                                LOG.info("Checkpoint triggering task {} is not 
being executed at the moment. Aborting checkpoint.",
                                                
tasksToTrigger[i].getSimpleName());
-                               return false;
+                               return new 
CheckpointTriggerResult(CheckpointDeclineReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
                        }
                }
 
@@ -447,7 +433,7 @@ public class CheckpointCoordinator {
                        } else {
                                LOG.info("Checkpoint acknowledging task {} is 
not being executed at the moment. Aborting checkpoint.",
                                                ev.getSimpleName());
-                               return false;
+                               return new 
CheckpointTriggerResult(CheckpointDeclineReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
                        }
                }
 
@@ -455,25 +441,22 @@ public class CheckpointCoordinator {
 
                lastTriggeredCheckpoint = timestamp;
                final long checkpointID;
-               if (nextCheckpointId < 0) {
-                       try {
-                               // this must happen outside the locked scope, 
because it communicates
-                               // with external services (in HA mode) and may 
block for a while.
-                               checkpointID = 
checkpointIdCounter.getAndIncrement();
-                       }
-                       catch (Throwable t) {
-                               int numUnsuccessful = 
++numUnsuccessfulCheckpointsTriggers;
-                               LOG.warn("Failed to trigger checkpoint (" + 
numUnsuccessful + " consecutive failed attempts so far)", t);
-                               return false;
-                       }
+               try {
+                       // this must happen outside the locked scope, because 
it communicates
+                       // with external services (in HA mode) and may block 
for a while.
+                       checkpointID = checkpointIdCounter.getAndIncrement();
                }
-               else {
-                       checkpointID = nextCheckpointId;
+               catch (Throwable t) {
+                       int numUnsuccessful = 
++numUnsuccessfulCheckpointsTriggers;
+                       LOG.warn("Failed to trigger checkpoint (" + 
numUnsuccessful + " consecutive failed attempts so far)", t);
+                       return new 
CheckpointTriggerResult(CheckpointDeclineReason.EXCEPTION);
                }
 
                LOG.info("Triggering checkpoint " + checkpointID + " @ " + 
timestamp);
 
-               final PendingCheckpoint checkpoint = new PendingCheckpoint(job, 
checkpointID, timestamp, ackTasks);
+               final PendingCheckpoint checkpoint = props.isSavepoint() ?
+                       new PendingSavepoint(job, checkpointID, timestamp, 
ackTasks, userClassLoader, savepointStore) :
+                       new PendingCheckpoint(job, checkpointID, timestamp, 
ackTasks, userClassLoader);
 
                // schedule the timer that will clean up the expired checkpoints
                TimerTask canceller = new TimerTask() {
@@ -486,12 +469,10 @@ public class CheckpointCoordinator {
                                                if (!checkpoint.isDiscarded()) {
                                                        LOG.info("Checkpoint " 
+ checkpointID + " expired before completing.");
 
-                                                       
checkpoint.discard(userClassLoader);
+                                                       
checkpoint.abortExpired();
                                                        
pendingCheckpoints.remove(checkpointID);
                                                        
rememberRecentCheckpointId(checkpointID);
 
-                                                       
onCancelCheckpoint(checkpointID);
-
                                                        triggerQueuedRequests();
                                                }
                                        }
@@ -511,19 +492,21 @@ public class CheckpointCoordinator {
                                // blocking progress, and still gives us early 
checks that skip work
                                // if no checkpoint can happen anyways
                                if (shutdown) {
-                                       return false;
-                               }
-                               else if (triggerRequestQueued) {
-                                       LOG.warn("Trying to trigger another 
checkpoint while one was queued already");
-                                       return false;
+                                       return new 
CheckpointTriggerResult(CheckpointDeclineReason.COORDINATOR_SHUTDOWN);
                                }
-                               else if (pendingCheckpoints.size() >= 
maxConcurrentCheckpointAttempts) {
-                                       triggerRequestQueued = true;
-                                       if (currentPeriodicTrigger != null) {
-                                               currentPeriodicTrigger.cancel();
-                                               currentPeriodicTrigger = null;
+                               else if (!props.isSavepoint()) {
+                                       if (triggerRequestQueued) {
+                                               LOG.warn("Trying to trigger 
another checkpoint while one was queued already");
+                                               return new 
CheckpointTriggerResult(CheckpointDeclineReason.ALREADY_QUEUED);
+                                       }
+                                       else if (pendingCheckpoints.size() >= 
maxConcurrentCheckpointAttempts) {
+                                               triggerRequestQueued = true;
+                                               if (currentPeriodicTrigger != 
null) {
+                                                       
currentPeriodicTrigger.cancel();
+                                                       currentPeriodicTrigger 
= null;
+                                               }
+                                               return new 
CheckpointTriggerResult(CheckpointDeclineReason.TOO_MANY_CONCURRENT_CHECKPOINTS);
                                        }
-                                       return false;
                                }
 
                                pendingCheckpoints.put(checkpointID, 
checkpoint);
@@ -539,7 +522,7 @@ public class CheckpointCoordinator {
                        }
 
                        numUnsuccessfulCheckpointsTriggers = 0;
-                       return true;
+                       return new CheckpointTriggerResult(checkpoint);
                }
                catch (Throwable t) {
                        // guard the map against concurrent modifications
@@ -549,10 +532,11 @@ public class CheckpointCoordinator {
 
                        int numUnsuccessful = 
++numUnsuccessfulCheckpointsTriggers;
                        LOG.warn("Failed to trigger checkpoint (" + 
numUnsuccessful + " consecutive failed attempts so far)", t);
+
                        if (!checkpoint.isDiscarded()) {
-                               checkpoint.discard(userClassLoader);
+                               checkpoint.abortError(new Exception("Failed to 
trigger checkpoint"));
                        }
-                       return false;
+                       return new 
CheckpointTriggerResult(CheckpointDeclineReason.EXCEPTION);
                }
        }
 
@@ -598,11 +582,9 @@ public class CheckpointCoordinator {
                                        + " because of checkpoint decline from 
task " + message.getTaskExecutionId());
 
                                pendingCheckpoints.remove(checkpointId);
-                               checkpoint.discard(userClassLoader);
+                               checkpoint.abortDeclined();
                                rememberRecentCheckpointId(checkpointId);
 
-                               onCancelCheckpoint(checkpointId);
-
                                boolean haveMoreRecentPending = false;
                                Iterator<Map.Entry<Long, PendingCheckpoint>> 
entries = pendingCheckpoints.entrySet().iterator();
                                while (entries.hasNext()) {
@@ -683,6 +665,7 @@ public class CheckpointCoordinator {
                                        message.getState(),
                                        message.getStateSize(),
                                        null)) { // TODO: Give KV-state to the 
acknowledgeTask method
+                                       
                                        if (checkpoint.isFullyAcknowledged()) {
                                                completed = 
checkpoint.finalizeCheckpoint();
 
@@ -705,8 +688,6 @@ public class CheckpointCoordinator {
 
                                                
dropSubsumedCheckpoints(completed.getTimestamp());
 
-                                               
onFullyAcknowledgedCheckpoint(completed);
-
                                                triggerQueuedRequests();
                                        }
                                }
@@ -762,15 +743,12 @@ public class CheckpointCoordinator {
 
        private void dropSubsumedCheckpoints(long timestamp) throws Exception {
                Iterator<Map.Entry<Long, PendingCheckpoint>> entries = 
pendingCheckpoints.entrySet().iterator();
+
                while (entries.hasNext()) {
                        PendingCheckpoint p = entries.next().getValue();
-                       if (p.getCheckpointTimestamp() < timestamp) {
+                       if (p.getCheckpointTimestamp() < timestamp && 
p.canBeSubsumed()) {
                                rememberRecentCheckpointId(p.getCheckpointId());
-
-                               p.discard(userClassLoader);
-
-                               onCancelCheckpoint(p.getCheckpointId());
-
+                               p.abortSubsumed();
                                entries.remove();
                        }
                }
@@ -927,17 +905,12 @@ public class CheckpointCoordinator {
                }
        }
 
-       protected long getAndIncrementCheckpointId() {
-               try {
-                       // this must happen outside the locked scope, because 
it communicates
-                       // with external services (in HA mode) and may block 
for a while.
-                       return checkpointIdCounter.getAndIncrement();
-               }
-               catch (Throwable t) {
-                       int numUnsuccessful = 
++numUnsuccessfulCheckpointsTriggers;
-                       LOG.warn("Failed to trigger checkpoint (" + 
numUnsuccessful + " consecutive failed attempts so far)", t);
-                       return -1;
-               }
+       public CompletedCheckpointStore getCheckpointStore() {
+               return completedCheckpointStore;
+       }
+
+       public CheckpointIDCounter getCheckpointIdCounter() {
+               return checkpointIdCounter;
        }
 
        protected ActorGateway getJobStatusListener() {
@@ -961,14 +934,6 @@ public class CheckpointCoordinator {
                        // make sure all prior timers are cancelled
                        stopCheckpointScheduler();
 
-                       try {
-                               // Multiple start calls are OK
-                               checkpointIdCounter.start();
-                       } catch (Exception e) {
-                               String msg = "Failed to start checkpoint ID 
counter: " + e.getMessage();
-                               throw new RuntimeException(msg, e);
-                       }
-
                        periodicScheduling = true;
                        currentPeriodicTrigger = new ScheduledTrigger();
                        timer.scheduleAtFixedRate(currentPeriodicTrigger, 
baseInterval, baseInterval);
@@ -986,7 +951,7 @@ public class CheckpointCoordinator {
                        }
 
                        for (PendingCheckpoint p : pendingCheckpoints.values()) 
{
-                               p.discard(userClassLoader);
+                               p.abortError(new Exception("Checkpoint 
Coordinator is suspending."));
                        }
                        pendingCheckpoints.clear();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/76ca1a79/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointDeclineReason.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointDeclineReason.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointDeclineReason.java
new file mode 100644
index 0000000..2cc9094
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointDeclineReason.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+/**
+ * The reasons for which the triggering of a checkpoint may be declined.
+ *
+ * <p>Each reason carries a human-readable explanation, see {@link #message()}.
+ */
+public enum CheckpointDeclineReason {
+
+       /** The checkpoint coordinator has been shut down. */
+       COORDINATOR_SHUTDOWN("Checkpoint coordinator is shut down."),
+
+       /** Another checkpoint request is already queued. */
+       ALREADY_QUEUED("Another checkpoint request has already been queued."),
+
+       /** The maximum number of concurrent checkpoints is exceeded. */
+       TOO_MANY_CONCURRENT_CHECKPOINTS("The maximum number of concurrent checkpoints is exceeded"),
+
+       /** The minimum time between two checkpoints has not passed yet. */
+       MINIMUM_TIME_BETWEEN_CHECKPOINTS("The minimum time between checkpoints is still pending. " +
+                       "Checkpoint will be triggered after the minimum time."),
+
+       /** Not all tasks that are required for the checkpoint are running. */
+       NOT_ALL_REQUIRED_TASKS_RUNNING("Not all required tasks are currently running."),
+
+       /** An exception occurred while triggering the checkpoint. */
+       EXCEPTION("An Exception occurred while triggering the checkpoint.");
+
+       // ------------------------------------------------------------------------
+
+       /** The human-readable explanation of this decline reason. */
+       private final String explanation;
+
+       CheckpointDeclineReason(String explanation) {
+               this.explanation = explanation;
+       }
+
+       /**
+        * Returns the human-readable explanation of this decline reason.
+        *
+        * @return The explanation message.
+        */
+       public String message() {
+               return explanation;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/76ca1a79/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java
new file mode 100644
index 0000000..7ea645a
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+/**
+ * The configuration of a checkpoint. Currently, the only configured property is whether
+ * the checkpoint is a savepoint.
+ *
+ * <p>Properties that are not modeled yet, but may be added in the future, are whether
+ * <ul>
+ *     <li>the checkpoint must be full, or may be incremental</li>
+ *     <li>the checkpoint format must be the common (cross backend) format, or may be
+ *         state-backend specific</li>
+ * </ul>
+ *
+ * <p>Instances are immutable and are created through the static factory methods.
+ */
+public final class CheckpointProperties {
+
+       /** Whether the checkpoint is a savepoint. */
+       private final boolean isSavepoint;
+
+       private CheckpointProperties(boolean isSavepoint) {
+               this.isSavepoint = isSavepoint;
+       }
+
+       // ------------------------------------------------------------------------
+
+       /**
+        * Returns whether the checkpoint is a savepoint.
+        *
+        * @return True if the checkpoint is a savepoint, false otherwise.
+        */
+       public boolean isSavepoint() {
+               return isSavepoint;
+       }
+
+       // ------------------------------------------------------------------------
+
+       @Override
+       public boolean equals(Object obj) {
+               if (this == obj) {
+                       return true;
+               }
+               if (obj == null || getClass() != obj.getClass()) {
+                       return false;
+               }
+               return isSavepoint == ((CheckpointProperties) obj).isSavepoint;
+       }
+
+       @Override
+       public int hashCode() {
+               return isSavepoint ? 1 : 0;
+       }
+
+       @Override
+       public String toString() {
+               return "CheckpointProperties {" +
+                               "isSavepoint=" + isSavepoint +
+                               '}';
+       }
+
+       // ------------------------------------------------------------------------
+
+       /**
+        * Creates the properties of a standard savepoint.
+        *
+        * @return Properties for which {@link #isSavepoint()} returns true.
+        */
+       public static CheckpointProperties forStandardSavepoint() {
+               return new CheckpointProperties(true);
+       }
+
+       /**
+        * Creates the properties of a standard (non-savepoint) checkpoint.
+        *
+        * @return Properties for which {@link #isSavepoint()} returns false.
+        */
+       public static CheckpointProperties forStandardCheckpoint() {
+               return new CheckpointProperties(false);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/76ca1a79/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointTriggerResult.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointTriggerResult.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointTriggerResult.java
new file mode 100644
index 0000000..3f91407
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointTriggerResult.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The result of triggering a checkpoint. May be a declined checkpoint trigger attempt,
+ * or a pending checkpoint.
+ */
+class CheckpointTriggerResult {
+
+       /** If success, the pending checkpoint created after the successful trigger, otherwise null */
+       private final PendingCheckpoint success;
+
+       /** If failure, the reason why the triggering was declined, otherwise null. */
+       private final CheckpointDeclineReason failure;
+
+       // ------------------------------------------------------------------------
+
+       /**
+        * Creates a successful checkpoint trigger result.
+        *
+        * @param success The pending checkpoint created after the successful trigger.
+        */
+       CheckpointTriggerResult(PendingCheckpoint success) {
+               this.success = checkNotNull(success);
+               this.failure = null;
+       }
+
+       /**
+        * Creates a failed checkpoint trigger result.
+        *
+        * @param failure The reason why the checkpoint could not be triggered.
+        */
+       CheckpointTriggerResult(CheckpointDeclineReason failure) {
+               this.success = null;
+               this.failure = checkNotNull(failure);
+       }
+
+       // ------------------------------------------------------------------------
+
+       /**
+        * Checks whether triggering the checkpoint succeeded.
+        *
+        * @return True if a pending checkpoint was created, false otherwise.
+        */
+       public boolean isSuccess() {
+               return success != null;
+       }
+
+       /**
+        * Checks whether triggering the checkpoint was declined.
+        *
+        * @return True if the trigger attempt was declined, false otherwise.
+        */
+       public boolean isFailure() {
+               return failure != null;
+       }
+
+       /**
+        * Gets the pending checkpoint created by a successful trigger attempt.
+        *
+        * @return The pending checkpoint.
+        * @throws IllegalStateException If the trigger attempt was declined.
+        */
+       public PendingCheckpoint getPendingCheckpoint() {
+               if (success != null) {
+                       return success;
+               } else {
+                       throw new IllegalStateException(
+                                       "Checkpoint triggering was declined, there is no pending checkpoint: " +
+                                       failure.message());
+               }
+       }
+
+       /**
+        * Gets the reason why the trigger attempt was declined.
+        *
+        * @return The decline reason.
+        * @throws IllegalStateException If the trigger attempt succeeded.
+        */
+       public CheckpointDeclineReason getFailureReason() {
+               if (failure != null) {
+                       return failure;
+               } else {
+                       throw new IllegalStateException(
+                                       "Checkpoint triggering succeeded, there is no failure reason.");
+               }
+       }
+
+       // ------------------------------------------------------------------------
+
+       @Override
+       public String toString() {
+               return isSuccess() ?
+                               ("success: " + success) :
+                               ("failure: " + failure.message());
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/76ca1a79/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
index ecc67b4..d8c6c7d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
@@ -20,11 +20,12 @@ package org.apache.flink.runtime.checkpoint;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.util.Preconditions;
 
 import java.io.Serializable;
 import java.util.Map;
-import java.util.Objects;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * A successful checkpoint describes a checkpoint after all required tasks 
acknowledged it (with their state)
@@ -47,20 +48,34 @@ public class CompletedCheckpoint implements Serializable {
        /** States of the different task groups belonging to this checkpoint */
        private final Map<JobVertexID, TaskState> taskStates;
 
-       public CompletedCheckpoint(
-               JobID job,
-               long checkpointID,
-               long timestamp,
-               long completionTimestamp,
-               Map<JobVertexID, TaskState> taskStates) {
+       /** Flag to indicate whether the completed checkpoint data should be 
deleted when this
+        * handle to the checkpoint is disposed */
+       private final boolean deleteStateWhenDisposed;
 
-               this.job = Preconditions.checkNotNull(job);
+       // 
------------------------------------------------------------------------
+
+       public CompletedCheckpoint(
+                       JobID job,
+                       long checkpointID,
+                       long timestamp,
+                       long completionTimestamp,
+                       Map<JobVertexID, TaskState> taskStates,
+                       boolean deleteStateWhenDisposed) {
+
+               checkArgument(checkpointID >= 0);
+               checkArgument(timestamp >= 0);
+               checkArgument(completionTimestamp >= 0);
+
+               this.job = checkNotNull(job);
                this.checkpointID = checkpointID;
                this.timestamp = timestamp;
                this.duration = completionTimestamp - timestamp;
-               this.taskStates = Preconditions.checkNotNull(taskStates);
+               this.taskStates = checkNotNull(taskStates);
+               this.deleteStateWhenDisposed = deleteStateWhenDisposed;
        }
 
+       // 
------------------------------------------------------------------------
+
        public JobID getJobId() {
                return job;
        }
@@ -96,10 +111,12 @@ public class CompletedCheckpoint implements Serializable {
        }
 
        // 
--------------------------------------------------------------------------------------------
-       
+
        public void discard(ClassLoader userClassLoader) throws Exception {
-               for (TaskState state: taskStates.values()) {
-                       state.discard(userClassLoader);
+               if (deleteStateWhenDisposed) {
+                       for (TaskState state: taskStates.values()) {
+                               state.discard(userClassLoader);
+                       }
                }
 
                taskStates.clear();
@@ -108,27 +125,6 @@ public class CompletedCheckpoint implements Serializable {
        // 
--------------------------------------------------------------------------------------------
 
        @Override
-       public boolean equals(Object obj) {
-               if (obj instanceof CompletedCheckpoint) {
-                       CompletedCheckpoint other = (CompletedCheckpoint) obj;
-
-                       return job.equals(other.job) && checkpointID == 
other.checkpointID &&
-                               timestamp == other.timestamp && duration == 
other.duration &&
-                               taskStates.equals(other.taskStates);
-               } else {
-                       return false;
-               }
-       }
-
-       @Override
-       public int hashCode() {
-               return (int) (this.checkpointID ^ this.checkpointID >>> 32) +
-                       31 * ((int) (this.timestamp ^ this.timestamp >>> 32) +
-                               31 * ((int) (this.duration ^ this.duration >>> 
32) +
-                                       31 * Objects.hash(job, taskStates)));
-       }
-       
-       @Override
        public String toString() {
                return String.format("Checkpoint %d @ %d for %s", checkpointID, 
timestamp, job);
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/76ca1a79/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
index cd1f6c4..ab3fe0a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
@@ -28,54 +28,73 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.state.StateHandle;
 import org.apache.flink.util.SerializedValue;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * A pending checkpoint is a checkpoint that has been started, but has not been
  * acknowledged by all tasks that need to acknowledge it. Once all tasks have
  * acknowledged it, it becomes a {@link CompletedCheckpoint}.
  * 
  * <p>Note that the pending checkpoint, as well as the successful checkpoint 
keep the
- * state handles always as serialized values, never as actual values.</p>
+ * state handles always as serialized values, never as actual values.
  */
 public class PendingCheckpoint {
-               
+
        private final Object lock = new Object();
-       
+
        private final JobID jobId;
-       
+
        private final long checkpointId;
-       
+
        private final long checkpointTimestamp;
 
        private final Map<JobVertexID, TaskState> taskStates;
 
        private final Map<ExecutionAttemptID, ExecutionVertex> 
notYetAcknowledgedTasks;
-       
+
+       private final ClassLoader userCodeClassLoader;
+
+       private final boolean disposeWhenSubsumed;
+
        private int numAcknowledgedTasks;
-       
+
        private boolean discarded;
-       
+
        // 
--------------------------------------------------------------------------------------------
-       
-       public PendingCheckpoint(JobID jobId, long checkpointId, long 
checkpointTimestamp,
-                                                       Map<ExecutionAttemptID, 
ExecutionVertex> verticesToConfirm)
+
+       public PendingCheckpoint(
+                       JobID jobId,
+                       long checkpointId,
+                       long checkpointTimestamp,
+                       Map<ExecutionAttemptID, ExecutionVertex> 
verticesToConfirm,
+                       ClassLoader userCodeClassLoader) {
+               this(jobId, checkpointId, checkpointTimestamp, 
verticesToConfirm, userCodeClassLoader, true);
+       }
+
+       PendingCheckpoint(
+                       JobID jobId,
+                       long checkpointId,
+                       long checkpointTimestamp,
+                       Map<ExecutionAttemptID, ExecutionVertex> 
verticesToConfirm,
+                       ClassLoader userCodeClassLoader,
+                       boolean disposeWhenSubsumed)
        {
-               if (jobId == null || verticesToConfirm == null) {
-                       throw new NullPointerException();
-               }
-               if (verticesToConfirm.size() == 0) {
-                       throw new IllegalArgumentException("Checkpoint needs at 
least one vertex that commits the checkpoint");
-               }
-               
-               this.jobId = jobId;
+               this.jobId = checkNotNull(jobId);
                this.checkpointId = checkpointId;
                this.checkpointTimestamp = checkpointTimestamp;
-               
-               this.notYetAcknowledgedTasks = verticesToConfirm;
+               this.notYetAcknowledgedTasks = checkNotNull(verticesToConfirm);
+               this.userCodeClassLoader = checkNotNull(userCodeClassLoader);
+               this.disposeWhenSubsumed = disposeWhenSubsumed;
                this.taskStates = new HashMap<>();
+
+               checkArgument(verticesToConfirm.size() > 0,
+                               "Checkpoint needs at least one vertex that 
commits the checkpoint");
        }
-       
-       // 
--------------------------------------------------------------------------------------------
 
+       // 
------------------------------------------------------------------------
+       //  Properties
+       // 
------------------------------------------------------------------------
 
        public JobID getJobId() {
                return jobId;
@@ -92,7 +111,7 @@ public class PendingCheckpoint {
        public int getNumberOfNonAcknowledgedTasks() {
                return notYetAcknowledgedTasks.size();
        }
-       
+
        public int getNumberOfAcknowledgedTasks() {
                return numAcknowledgedTasks;
        }
@@ -104,11 +123,25 @@ public class PendingCheckpoint {
        public boolean isFullyAcknowledged() {
                return this.notYetAcknowledgedTasks.isEmpty() && !discarded;
        }
-       
+
        public boolean isDiscarded() {
                return discarded;
        }
-       
+
+       /**
+        * Checks whether this checkpoint can be subsumed or whether it should 
always continue, regardless
+        * of newer checkpoints in progress.
+        * 
+        * @return True if the checkpoint can be subsumed, false otherwise.
+        */
+       public boolean canBeSubsumed() {
+               return true;
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Progress and Completion
+       // 
------------------------------------------------------------------------
+
        public CompletedCheckpoint finalizeCheckpoint() throws Exception {
                synchronized (lock) {
                        if (discarded) {
@@ -120,9 +153,11 @@ public class PendingCheckpoint {
                                        checkpointId,
                                        checkpointTimestamp,
                                        System.currentTimeMillis(),
-                                       new HashMap<>(taskStates));
-                               dispose(null, false);
-                               
+                                       new HashMap<>(taskStates),
+                                       disposeWhenSubsumed);
+
+                               dispose(false);
+
                                return completed;
                        }
                        else {
@@ -190,22 +225,45 @@ public class PendingCheckpoint {
                        }
                }
        }
-       
+
+       // 
------------------------------------------------------------------------
+       //  Cancellation
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Aborts a checkpoint because it expired (took too long).
+        */
+       public void abortExpired() throws Exception {
+               dispose(true);
+       }
+
+       /**
+        * Aborts the pending checkpoint because a newer completed checkpoint 
subsumed it.
+        */
+       public void abortSubsumed() throws Exception {
+               dispose(true);
+       }
+
+       public void abortDeclined() throws Exception {
+               dispose(true);
+       }
+
        /**
-        * Discards the pending checkpoint, releasing all held resources.
+        * Aborts the pending checkpoint due to an error.
+        * @param cause The error's exception.
         */
-       public void discard(ClassLoader userClassLoader) throws Exception {
-               dispose(userClassLoader, true);
+       public void abortError(Throwable cause) throws Exception {
+               dispose(true);
        }
 
-       private void dispose(ClassLoader userClassLoader, boolean releaseState) 
throws Exception {
+       protected void dispose(boolean releaseState) throws Exception {
                synchronized (lock) {
                        discarded = true;
                        numAcknowledgedTasks = -1;
                        try {
                                if (releaseState) {
                                        for (TaskState taskState : 
taskStates.values()) {
-                                               
taskState.discard(userClassLoader);
+                                               
taskState.discard(userCodeClassLoader);
                                        }
                                }
                        } finally {
@@ -219,7 +277,7 @@ public class PendingCheckpoint {
 
        @Override
        public String toString() {
-               return String.format("PendingCheckpoint %d @ %d - confirmed=%d, 
pending=%d",
+               return String.format("Pending Checkpoint %d @ %d - 
confirmed=%d, pending=%d",
                                checkpointId, checkpointTimestamp, 
getNumberOfAcknowledgedTasks(), getNumberOfNonAcknowledgedTasks());
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/76ca1a79/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingSavepoint.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingSavepoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingSavepoint.java
new file mode 100644
index 0000000..92cdd04
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingSavepoint.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.checkpoint.savepoint.Savepoint;
+import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore;
+import org.apache.flink.runtime.checkpoint.savepoint.SavepointV0;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.util.ExceptionUtils;
+
+import org.slf4j.Logger;
+
+import scala.concurrent.Future;
+import scala.concurrent.Promise;
+
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A pending savepoint is like a pending checkpoint, but it additionally performs some
+ * actions upon completion, like notifying the triggerer.
+ *
+ * <p>When the savepoint is finalized, it is written to the {@link SavepointStore} and the
+ * {@link #getCompletionFuture() completion future} is completed with the returned path.
+ * If the savepoint is aborted, or cannot be finalized or stored, the future is failed instead.
+ */
+public class PendingSavepoint extends PendingCheckpoint {
+
+       private static final Logger LOG = CheckpointCoordinator.LOG;
+
+       /** The store to which the savepoint is written upon completion */
+       private final SavepointStore store;
+
+       /** The class loader used to release the savepoint state if it cannot be stored */
+       private final ClassLoader userCodeClassLoader;
+
+       /** The promise to fulfill once the savepoint is complete */
+       private final Promise<String> onCompletionPromise;
+
+       // --------------------------------------------------------------------------------------------
+
+       public PendingSavepoint(
+                       JobID jobId,
+                       long checkpointId,
+                       long checkpointTimestamp,
+                       Map<ExecutionAttemptID, ExecutionVertex> verticesToConfirm,
+                       ClassLoader userCodeClassLoader,
+                       SavepointStore store)
+       {
+               // 'false': the completed checkpoint handle must not delete the savepoint state when disposed
+               super(jobId, checkpointId, checkpointTimestamp, verticesToConfirm, userCodeClassLoader, false);
+
+               this.store = checkNotNull(store);
+               this.userCodeClassLoader = userCodeClassLoader; // null-checked by the super constructor
+               this.onCompletionPromise = new scala.concurrent.impl.Promise.DefaultPromise<>();
+       }
+
+       // --------------------------------------------------------------------------------------------
+       //  Savepoint completion
+       // --------------------------------------------------------------------------------------------
+
+       /**
+        * Returns the future that is completed with the path of the stored savepoint, or failed
+        * if the savepoint cannot be completed.
+        *
+        * @return The savepoint completion future.
+        */
+       public Future<String> getCompletionFuture() {
+               return onCompletionPromise.future();
+       }
+
+       @Override
+       public CompletedCheckpoint finalizeCheckpoint() throws Exception {
+               // finalize checkpoint (this also disposes this pending checkpoint)
+               final CompletedCheckpoint completedCheckpoint;
+               try {
+                       completedCheckpoint = super.finalizeCheckpoint();
+               }
+               catch (Exception e) {
+                       // do not leave the triggerer waiting for a savepoint that can never complete
+                       onCompletionPromise.tryFailure(e);
+                       throw e;
+               }
+
+               // now store the checkpoint externally as a savepoint
+               try {
+                       Savepoint savepoint = new SavepointV0(
+                                       completedCheckpoint.getCheckpointID(),
+                                       completedCheckpoint.getTaskStates().values());
+
+                       String path = store.storeSavepoint(savepoint);
+                       onCompletionPromise.trySuccess(path);
+               }
+               catch (Throwable t) {
+                       LOG.warn("Failed to store savepoint.", t);
+                       onCompletionPromise.tryFailure(t);
+
+                       // the completed checkpoint is not handed out and does not delete its state on
+                       // disposal, so release the state here to avoid leaking it
+                       discardStateQuietly(completedCheckpoint, t);
+
+                       ExceptionUtils.rethrow(t, "Failed to store savepoint.");
+               }
+
+               return completedCheckpoint;
+       }
+
+       /**
+        * Releases the task state of a savepoint that could not be stored. Failures during the
+        * release are attached to the given cause as suppressed exceptions.
+        *
+        * @param checkpoint The completed checkpoint whose state is released.
+        * @param cause The failure that prevented storing the savepoint.
+        */
+       private void discardStateQuietly(CompletedCheckpoint checkpoint, Throwable cause) {
+               for (TaskState taskState : checkpoint.getTaskStates().values()) {
+                       try {
+                               taskState.discard(userCodeClassLoader);
+                       }
+                       catch (Throwable t) {
+                               cause.addSuppressed(t);
+                       }
+               }
+       }
+
+       // ------------------------------------------------------------------------
+       //  Cancellation / Disposal
+       // ------------------------------------------------------------------------
+
+       @Override
+       public boolean canBeSubsumed() {
+               return false;
+       }
+
+       /**
+        * Savepoints are never subsumed, so calling this method indicates a bug.
+        *
+        * @throws IllegalStateException Always.
+        */
+       @Override
+       public void abortSubsumed() throws Exception {
+               throw new IllegalStateException("Bug: Savepoints must never be subsumed");
+       }
+
+       @Override
+       public void abortExpired() throws Exception {
+               try {
+                       LOG.info("Savepoint with checkpoint ID {} expired before completing.", getCheckpointId());
+                       onCompletionPromise.tryFailure(new Exception("Savepoint expired before completing"));
+               }
+               finally {
+                       dispose(true);
+               }
+       }
+
+       @Override
+       public void abortDeclined() throws Exception {
+               try {
+                       LOG.info("Savepoint with checkpoint ID {} was declined (tasks not ready).", getCheckpointId());
+                       onCompletionPromise.tryFailure(new Exception("Savepoint was declined (tasks not ready)"));
+               }
+               finally {
+                       dispose(true);
+               }
+       }
+
+       @Override
+       public void abortError(Throwable cause) throws Exception {
+               try {
+                       LOG.info("Savepoint with checkpoint ID {} failed due to an error", getCheckpointId(), cause);
+                       onCompletionPromise.tryFailure(
+                                       new Exception("Savepoint could not be completed: " + cause.getMessage(), cause));
+               }
+               finally {
+                       dispose(true);
+               }
+       }
+
+       // --------------------------------------------------------------------------------------------
+
+       @Override
+       public String toString() {
+               return String.format("Pending Savepoint %d @ %d - confirmed=%d, pending=%d",
+                               getCheckpointId(), getCheckpointTimestamp(),
+                               getNumberOfAcknowledgedTasks(), getNumberOfNonAcknowledgedTasks());
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/76ca1a79/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointCoordinator.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointCoordinator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointCoordinator.java
deleted file mode 100644
index cd9eb0c..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointCoordinator.java
+++ /dev/null
@@ -1,386 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.checkpoint.savepoint;
-
-import akka.actor.ActorSystem;
-import akka.actor.Props;
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
-import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
-import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
-import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
-import org.apache.flink.runtime.checkpoint.SubtaskState;
-import org.apache.flink.runtime.checkpoint.TaskState;
-import org.apache.flink.runtime.checkpoint.stats.CheckpointStatsTracker;
-import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.executiongraph.Execution;
-import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
-import org.apache.flink.runtime.executiongraph.ExecutionVertex;
-import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.instance.AkkaActorGateway;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.jobmanager.RecoveryMode;
-import org.apache.flink.runtime.state.StateHandle;
-import org.apache.flink.util.SerializedValue;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.concurrent.Future;
-import scala.concurrent.Promise;
-
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * The savepoint coordinator is a slightly modified variant of the regular
- * checkpoint coordinator. Checkpoints are not triggered periodically, but
- * manually. The actual checkpointing mechanism is the same as for periodic
- * checkpoints, only the control flow is modified.
- *
- * <p>The savepoint coordinator is meant to be used as a separate coordinator
- * instance. Otherwise, there can be unwanted queueing effects like discarding
- * savepoints, because of in-progress periodic checkpoints.
- *
- * <p>The savepoint coordinator registers callbacks on the regular checkpoint
- * life-cycle and manages a map of promises, which are completed/failed as soon
- * as the trigged checkpoint is done.
- *
- * <p><strong>Important</strong>: it's necessary that both the periodic
- * checkpoint coordinator and the savepoint coordinator <em>share</em> the same
- * instance of the {@link CheckpointIDCounter} to ensure that all task managers
- * see ascending checkpoints IDs.
- */
-public class SavepointCoordinator extends CheckpointCoordinator {
-
-       private static final Logger LOG = LoggerFactory.getLogger(SavepointCoordinator.class);
-
-       /** Store for savepoints. */
-       private final SavepointStore savepointStore;
-
-       /** Mapping from checkpoint ID to promises for savepoints. */
-       private final Map<Long, Promise<String>> savepointPromises;
-
-       // TODO(uce) Temporary work around to restore initial state on
-       // failure during recovery. Will be superseded by FLINK-3397.
-       private volatile String savepointRestorePath;
-
-       public SavepointCoordinator(
-                       JobID jobId,
-                       long baseInterval,
-                       long checkpointTimeout,
-                       int numberKeyGroups,
-                       ExecutionVertex[] tasksToTrigger,
-                       ExecutionVertex[] tasksToWaitFor,
-                       ExecutionVertex[] tasksToCommitTo,
-                       ClassLoader userClassLoader,
-                       CheckpointIDCounter checkpointIDCounter,
-                       SavepointStore savepointStore,
-                       CheckpointStatsTracker statsTracker) throws Exception {
-
-               super(jobId,
-                               baseInterval,
-                               checkpointTimeout,
-                               0L,
-                               Integer.MAX_VALUE,
-                               numberKeyGroups,
-                               tasksToTrigger,
-                               tasksToWaitFor,
-                               tasksToCommitTo,
-                               userClassLoader,
-                               checkpointIDCounter,
-                               IgnoreCheckpointsStore.INSTANCE,
-                               RecoveryMode.STANDALONE,
-                               statsTracker);
-
-               this.savepointStore = checkNotNull(savepointStore);
-               this.savepointPromises = new ConcurrentHashMap<>();
-       }
-
-       public String getSavepointRestorePath() {
-               return savepointRestorePath;
-       }
-
-       // ------------------------------------------------------------------------
-       // Savepoint trigger and reset
-       // ------------------------------------------------------------------------
-
-       /**
-        * Triggers a new savepoint using the current system time as the checkpoint timestamp.
-        */
-       public Future<String> triggerSavepoint(long timestamp) throws Exception {
-               final Promise<String> promise = new scala.concurrent.impl.Promise.DefaultPromise<>();
-
-               try {
-                       // Get the checkpoint ID up front. If we fail to trigger the checkpoint,
-                       // the ID will have changed, but this is OK as long as the checkpoint ID
-                       // generates ascending IDs.
-                       final long checkpointId = getAndIncrementCheckpointId();
-
-                       if (checkpointId == -1) {
-                               throw new IllegalStateException("Failed to get checkpoint Id");
-                       }
-
-                       LOG.info("Triggering savepoint with ID " + checkpointId);
-
-                       // Important: make sure to add the promise to the map before calling
-                       // any methods that might trigger callbacks, which require the promise.
-                       // Otherwise, the might be race conditions.
-                       if (savepointPromises.put(checkpointId, promise) == null) {
-                               boolean success = false;
-
-                               try {
-                                       // All good. The future will be completed as soon as the
-                                       // triggered checkpoint is done.
-                                       success = triggerCheckpoint(timestamp, checkpointId);
-                               }
-                               finally {
-                                       if (!success) {
-                                               savepointPromises.remove(checkpointId);
-                                               promise.failure(new Exception("Failed to trigger savepoint"));
-                                       }
-                               }
-                       }
-                       else {
-                               throw new IllegalStateException("Duplicate checkpoint ID");
-                       }
-               }
-               catch (Throwable t) {
-                       promise.failure(new Exception("Failed to trigger savepoint", t));
-               }
-
-               return promise.future();
-       }
-
-       /**
-        * Resets the state of {@link Execution} instances back to the state of a savepoint.
-        *
-        * <p>The execution vertices need to be in state {@link ExecutionState#CREATED} when calling
-        * this method. The operation might block. Make sure that calls don't block the job manager
-        * actor.
-        *
-        * @param tasks         Tasks that will possibly be reset
-        * @param savepointPath The path of the savepoint to rollback to
-        * @throws IllegalStateException If coordinator is shut down
-        * @throws IllegalStateException If mismatch between program and savepoint state
-        * @throws Exception             If savepoint store failure
-        */
-       public void restoreSavepoint(
-                       Map<JobVertexID, ExecutionJobVertex> tasks,
-                       String savepointPath) throws Exception {
-
-               checkNotNull(savepointPath, "Savepoint path");
-
-               synchronized (lock) {
-                       if (isShutdown()) {
-                               throw new IllegalStateException("CheckpointCoordinator is shut down");
-                       }
-
-                       LOG.info("Rolling back to savepoint '{}'.", savepointPath);
-
-                       Savepoint savepoint = savepointStore.loadSavepoint(savepointPath);
-
-                       for (TaskState taskState : savepoint.getTaskStates()) {
-                               ExecutionJobVertex executionJobVertex = tasks.get(taskState.getJobVertexID());
-
-                               if (executionJobVertex != null) {
-                                       if (executionJobVertex.getParallelism() != taskState.getParallelism()) {
-                                               String msg = String.format("Failed to rollback to savepoint %s. " +
-                                                                               "Parallelism mismatch between savepoint state and new program. " +
-                                                                               "Cannot map operator %s with parallelism %d to new program with " +
-                                                                               "parallelism %d. This indicates that the program has been changed " +
-                                                                               "in a non-compatible way after the savepoint.",
-                                                               savepoint,
-                                                               taskState.getJobVertexID(),
-                                                               taskState.getParallelism(),
-                                                               executionJobVertex.getParallelism());
-
-                                               throw new IllegalStateException(msg);
-                                       }
-
-                                       List<Set<Integer>> keyGroupPartitions = createKeyGroupPartitions(
-                                                       numberKeyGroups,
-                                                       executionJobVertex.getParallelism());
-
-                                       for (int i = 0; i < executionJobVertex.getTaskVertices().length; i++) {
-                                               SubtaskState subtaskState = taskState.getState(i);
-                                               SerializedValue<StateHandle<?>> state = null;
-
-                                               if (subtaskState != null) {
-                                                       state = subtaskState.getState();
-                                               }
-
-                                               Map<Integer, SerializedValue<StateHandle<?>>> kvStateForTaskMap = taskState
-                                                               .getUnwrappedKvStates(keyGroupPartitions.get(i));
-
-                                               Execution currentExecutionAttempt = executionJobVertex
-                                                               .getTaskVertices()[i]
-                                                               .getCurrentExecutionAttempt();
-
-                                               currentExecutionAttempt.setInitialState(state, kvStateForTaskMap);
-                                       }
-                               } else {
-                                       String msg = String.format("Failed to rollback to savepoint %s. " +
-                                                                       "Cannot map old state for task %s to the new program. " +
-                                                                       "This indicates that the program has been changed in a " +
-                                                                       "non-compatible way  after the savepoint.", savepointPath,
-                                                       taskState.getJobVertexID());
-                                       throw new IllegalStateException(msg);
-                               }
-                       }
-
-                       // Reset the checkpoint ID counter
-                       long nextCheckpointId = savepoint.getCheckpointId() + 1;
-                       checkpointIdCounter.start();
-                       checkpointIdCounter.setCount(nextCheckpointId);
-                       LOG.info("Reset the checkpoint ID to {}", nextCheckpointId);
-
-                       if (savepointRestorePath == null) {
-                               savepointRestorePath = savepointPath;
-                       }
-               }
-       }
-
-       // ------------------------------------------------------------------------
-       // Checkpoint coordinator callbacks
-       // ------------------------------------------------------------------------
-
-       @Override
-       protected void onShutdown() {
-               // Fail all outstanding savepoint futures
-               for (Promise<String> promise : savepointPromises.values()) {
-                       promise.failure(new Exception("Checkpoint coordinator shutdown"));
-               }
-               savepointPromises.clear();
-       }
-
-       @Override
-       protected void onCancelCheckpoint(long canceledCheckpointId) {
-               LOG.info("Cancelling savepoint with checkpoint ID " + canceledCheckpointId);
-               Promise<String> promise = savepointPromises.remove(canceledCheckpointId);
-
-               if (promise != null) {
-                       promise.failure(new Exception("Savepoint expired before completing"));
-               }
-       }
-
-       @Override
-       protected void onFullyAcknowledgedCheckpoint(CompletedCheckpoint checkpoint) {
-               // Sanity check
-               Promise<String> promise = savepointPromises.remove(checkpoint.getCheckpointID());
-
-               if (promise == null) {
-                       LOG.warn("Pending savepoint with ID " + checkpoint.getCheckpointID() + "  has been " +
-                                       "removed before receiving acknowledgment.");
-                       return;
-               }
-
-               // Sanity check
-               if (promise.isCompleted()) {
-                       throw new IllegalStateException("Savepoint promise completed");
-               }
-
-               try {
-                       Savepoint savepoint = new SavepointV0(
-                                       checkpoint.getCheckpointID(),
-                                       checkpoint.getTaskStates().values());
-                       String path = savepointStore.storeSavepoint(savepoint);
-                       promise.success(path);
-               }
-               catch (Exception e) {
-                       LOG.warn("Failed to store savepoint.", e);
-                       promise.failure(e);
-               }
-       }
-
-       // ------------------------------------------------------------------------
-       // Job status listener
-       // ------------------------------------------------------------------------
-
-       @Override
-       public ActorGateway createActivatorDeactivator(
-                       ActorSystem actorSystem,
-                       UUID leaderSessionID) {
-
-               synchronized (lock) {
-                       if (isShutdown()) {
-                               throw new IllegalArgumentException("Checkpoint coordinator is shut down");
-                       }
-
-                       if (getJobStatusListener() == null) {
-                               Props props = Props.create(
-                                               SavepointCoordinatorDeActivator.class,
-                                               this,
-                                               leaderSessionID);
-
-                               // wrap the ActorRef in a AkkaActorGateway to support message decoration
-                               setJobStatusListener(new AkkaActorGateway(
-                                               actorSystem.actorOf(props),
-                                               leaderSessionID));
-                       }
-
-                       return getJobStatusListener();
-               }
-       }
-
-       // ------------------------------------------------------------------------
-       // Completed checkpoints
-       // ------------------------------------------------------------------------
-
-       private static class IgnoreCheckpointsStore implements CompletedCheckpointStore {
-
-               private static final CompletedCheckpointStore INSTANCE = new IgnoreCheckpointsStore();
-
-               @Override
-               public void recover() throws Exception {
-               }
-
-               @Override
-               public void addCheckpoint(CompletedCheckpoint checkpoint) throws Exception {
-               }
-
-               @Override
-               public CompletedCheckpoint getLatestCheckpoint() throws Exception {
-                       return null;
-               }
-
-               @Override
-               public void shutdown() throws Exception {
-               }
-
-               @Override
-               public void suspend() throws Exception {
-               }
-
-               @Override
-               public List<CompletedCheckpoint> getAllCheckpoints() throws Exception {
-                       return Collections.emptyList();
-               }
-
-               @Override
-               public int getNumberOfRetainedCheckpoints() {
-                       return 0;
-               }
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/76ca1a79/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointCoordinatorDeActivator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointCoordinatorDeActivator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointCoordinatorDeActivator.java
deleted file mode 100644
index 7bd5797..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointCoordinatorDeActivator.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.checkpoint.savepoint;
-
-import org.apache.flink.runtime.akka.FlinkUntypedActor;
-import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
-import org.apache.flink.runtime.jobgraph.JobStatus;
-import org.apache.flink.runtime.messages.ExecutionGraphMessages;
-import org.apache.flink.util.Preconditions;
-
-import java.util.UUID;
-
-/**
- * This actor listens to changes in the JobStatus and deactivates the
- * savepoint scheduler and discards all pending checkpoints.
- */
-public class SavepointCoordinatorDeActivator extends FlinkUntypedActor {
-
-       private final CheckpointCoordinator coordinator;
-       private final UUID leaderSessionID;
-
-       public SavepointCoordinatorDeActivator(
-                       SavepointCoordinator coordinator,
-                       UUID leaderSessionID) {
-
-               LOG.info("Create SavepointCoordinatorDeActivator");
-
-               this.coordinator = Preconditions.checkNotNull(coordinator, "The checkpointCoordinator must not be null.");
-               this.leaderSessionID = leaderSessionID;
-       }
-
-       @Override
-       public void handleMessage(Object message) throws Exception {
-               if (message instanceof ExecutionGraphMessages.JobStatusChanged) {
-                       JobStatus status = ((ExecutionGraphMessages.JobStatusChanged) message).newJobStatus();
-                       
-                       if (status != JobStatus.RUNNING) {
-                               // anything other than RUNNING should stop the trigger for now
-                               coordinator.stopCheckpointScheduler();
-                       }
-               }
-       }
-
-       @Override
-       public UUID getLeaderSessionID() {
-               return leaderSessionID;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/76ca1a79/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java
new file mode 100644
index 0000000..0b7b0c2
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.savepoint;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
+import org.apache.flink.runtime.checkpoint.TaskState;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The SavepointLoader is a utility to load and verify a Savepoint, and to create a checkpoint from it.
+ */
+public class SavepointLoader {
+
+       /**
+        * Loads a savepoint back as a {@link CompletedCheckpoint}.
+        *
+        * <p>This method verifies that tasks and parallelism still match the savepoint parameters.
+        *
+        * @param jobId          The JobID of the job to load the savepoint for.
+        * @param tasks          The job vertices of the new program; every task state in the savepoint must map to one
+        * @param savepointStore The store that holds the savepoint.
+        * @param savepointPath  The path of the savepoint to rollback to
+        * @return The validated savepoint as a {@link CompletedCheckpoint} of the given job
+        * @throws IllegalStateException If mismatch between program and savepoint state
+        * @throws Exception             If savepoint store failure
+        */
+       public static CompletedCheckpoint loadAndValidateSavepoint(
+                       JobID jobId,
+                       Map<JobVertexID, ExecutionJobVertex> tasks,
+                       SavepointStore savepointStore,
+                       String savepointPath) throws Exception {
+
+               // (1) load the savepoint
+               Savepoint savepoint = savepointStore.loadSavepoint(checkNotNull(savepointPath));
+               final Map<JobVertexID, TaskState> taskStates = new HashMap<>(savepoint.getTaskStates().size());
+
+               // (2) validate it (parallelism, etc)
+               for (TaskState taskState : savepoint.getTaskStates()) {
+                       ExecutionJobVertex executionJobVertex = tasks.get(taskState.getJobVertexID());
+
+                       if (executionJobVertex != null) {
+                               if (executionJobVertex.getParallelism() == taskState.getParallelism()) {
+                                       taskStates.put(taskState.getJobVertexID(), taskState);
+                               }
+                               else {
+                                       String msg = String.format("Failed to rollback to savepoint %s. " +
+                                                                       "Parallelism mismatch between savepoint state and new program. " +
+                                                                       "Cannot map operator %s with parallelism %d to new program with " +
+                                                                       "parallelism %d. This indicates that the program has been changed " +
+                                                                       "in a non-compatible way after the savepoint.",
+                                                       savepointPath, // identify the savepoint by its path, like the missing-task error
+                                                       taskState.getJobVertexID(),
+                                                       taskState.getParallelism(),
+                                                       executionJobVertex.getParallelism());
+
+                                       throw new IllegalStateException(msg);
+                               }
+                       } else {
+                               String msg = String.format("Failed to rollback to savepoint %s. " +
+                                                               "Cannot map old state for task %s to the new program. " +
+                                                               "This indicates that the program has been changed in a " +
+                                                               "non-compatible way after the savepoint.",
+                                               savepointPath, taskState.getJobVertexID());
+                               throw new IllegalStateException(msg);
+                       }
+               }
+
+               // (3) convert to checkpoint so the system can fall back to it
+               return new CompletedCheckpoint(jobId, savepoint.getCheckpointId(), 0L, 0L, taskStates, false);
+       }
+
+       // ------------------------------------------------------------------------
+
+       private SavepointLoader() {}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/76ca1a79/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index e6ae6ce..82826dd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -37,7 +37,6 @@ import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
 import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
-import org.apache.flink.runtime.checkpoint.savepoint.SavepointCoordinator;
 import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore;
 import org.apache.flink.runtime.checkpoint.stats.CheckpointStatsTracker;
 import org.apache.flink.runtime.execution.ExecutionState;
@@ -215,9 +214,6 @@ public class ExecutionGraph {
        /** The coordinator for checkpoints, if snapshot checkpoints are enabled */
        private CheckpointCoordinator checkpointCoordinator;
 
-       /** The coordinator for savepoints, if snapshot checkpoints are enabled */
-       private transient SavepointCoordinator savepointCoordinator;
-
        /** Checkpoint stats tracker separate from the coordinator in order to be
         * available after archiving. */
        private CheckpointStatsTracker checkpointStatsTracker;
@@ -226,7 +222,7 @@ public class ExecutionGraph {
        private ExecutionContext executionContext;
 
        /** Registered KvState instances reported by the TaskManagers. */
-       private transient KvStateLocationRegistry kvStateLocationRegistry;
+       private KvStateLocationRegistry kvStateLocationRegistry;
 
        // ------ Fields that are only relevant for archived execution graphs ------------
        private String jsonPlan;
@@ -392,6 +388,7 @@ public class ExecutionGraph {
                                userClassLoader,
                                checkpointIDCounter,
                                checkpointStore,
+                               savepointStore,
                                recoveryMode,
                                checkpointStatsTracker);
 
@@ -399,25 +396,6 @@ public class ExecutionGraph {
                // job status changes (running -> on, all other states -> off)
                registerJobStatusListener(
                                checkpointCoordinator.createActivatorDeactivator(actorSystem, leaderSessionID));
-
-               // Savepoint Coordinator
-               savepointCoordinator = new SavepointCoordinator(
-                               jobID,
-                               interval,
-                               checkpointTimeout,
-                               numberKeyGroups,
-                               tasksToTrigger,
-                               tasksToWaitFor,
-                               tasksToCommitTo,
-                               userClassLoader,
-                               // Important: this counter needs to be shared with the periodic
-                               // checkpoint coordinator.
-                               checkpointIDCounter,
-                               savepointStore,
-                               checkpointStatsTracker);
-
-               registerJobStatusListener(savepointCoordinator
-                               .createActivatorDeactivator(actorSystem, leaderSessionID));
        }
 
        /**
@@ -432,25 +410,16 @@ public class ExecutionGraph {
                }
 
                if (checkpointCoordinator != null) {
-                       checkpointCoordinator.shutdown();
+                       checkpointCoordinator.suspend();
                        checkpointCoordinator = null;
                        checkpointStatsTracker = null;
                }
-
-               if (savepointCoordinator != null) {
-                       savepointCoordinator.shutdown();
-                       savepointCoordinator = null;
-               }
        }
 
        public CheckpointCoordinator getCheckpointCoordinator() {
                return checkpointCoordinator;
        }
 
-       public SavepointCoordinator getSavepointCoordinator() {
-               return savepointCoordinator;
-       }
-
        public KvStateLocationRegistry getKvStateLocationRegistry() {
                return kvStateLocationRegistry;
        }
@@ -924,17 +893,7 @@ public class ExecutionGraph {
 
                                // if we have checkpointed state, reload it 
into the executions
                                if (checkpointCoordinator != null) {
-                                       boolean restored = checkpointCoordinator
-                                                       
.restoreLatestCheckpointedState(getAllVertices(), false, false);
-
-                                       // TODO(uce) Temporary work around to 
restore initial state on
-                                       // failure during recovery. Will be 
superseded by FLINK-3397.
-                                       if (!restored && savepointCoordinator 
!= null) {
-                                               String savepointPath = 
savepointCoordinator.getSavepointRestorePath();
-                                               if (savepointPath != null) {
-                                                       
savepointCoordinator.restoreSavepoint(getAllVertices(), savepointPath);
-                                               }
-                                       }
+                                       
checkpointCoordinator.restoreLatestCheckpointedState(getAllVertices(), false, 
false);
                                }
                        }
 
@@ -961,33 +920,6 @@ public class ExecutionGraph {
        }
 
        /**
-        * Restores the execution state back to a savepoint.
-        *
-        * <p>The execution vertices need to be in state {@link 
ExecutionState#CREATED} when calling
-        * this method. The operation might block. Make sure that calls don't 
block the job manager
-        * actor.
-        *
-        * @param savepointPath The path of the savepoint to rollback to.
-        * @throws IllegalStateException If checkpointing is disabled
-        * @throws IllegalStateException If checkpoint coordinator is shut down
-        * @throws Exception If failure during rollback
-        */
-       public void restoreSavepoint(String savepointPath) throws Exception {
-               synchronized (progressLock) {
-                       if (savepointCoordinator != null) {
-                               LOG.info("Restoring savepoint: " + 
savepointPath + ".");
-
-                               savepointCoordinator.restoreSavepoint(
-                                               getAllVertices(), 
savepointPath);
-                       }
-                       else {
-                               // Sanity check
-                               throw new IllegalStateException("Checkpointing 
disabled.");
-                       }
-               }
-       }
-
-       /**
         * This method cleans fields that are irrelevant for the archived 
execution attempt.
         */
        public void prepareForArchiving() {
@@ -1000,6 +932,7 @@ public class ExecutionGraph {
                scheduler = null;
                checkpointCoordinator = null;
                executionContext = null;
+               kvStateLocationRegistry = null;
 
                for (ExecutionJobVertex vertex : verticesInCreationOrder) {
                        vertex.prepareForArchiving();
@@ -1131,21 +1064,6 @@ public class ExecutionGraph {
                } catch (Exception e) {
                        LOG.error("Error while cleaning up after execution", e);
                }
-
-               try {
-                       CheckpointCoordinator coord = this.savepointCoordinator;
-                       this.savepointCoordinator = null;
-
-                       if (coord != null) {
-                               if (state.isGloballyTerminalState()) {
-                                       coord.shutdown();
-                               } else {
-                                       coord.suspend();
-                               }
-                       }
-               } catch (Exception e) {
-                       LOG.error("Error while cleaning up after execution", e);
-               }
        }
 
        // 
--------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/76ca1a79/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 01af3c1..9fb01bf 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -41,7 +41,7 @@ import 
org.apache.flink.runtime.accumulators.AccumulatorSnapshot
 import org.apache.flink.runtime.akka.{AkkaUtils, ListeningBehaviour}
 import org.apache.flink.runtime.blob.BlobServer
 import org.apache.flink.runtime.checkpoint._
-import org.apache.flink.runtime.checkpoint.savepoint.{SavepointStoreFactory, 
SavepointStore}
+import org.apache.flink.runtime.checkpoint.savepoint.{SavepointLoader, 
SavepointStoreFactory, SavepointStore}
 import org.apache.flink.runtime.checkpoint.stats.{CheckpointStatsTracker, 
SimpleCheckpointStatsTracker, DisabledCheckpointStatsTracker}
 import org.apache.flink.runtime.client._
 import org.apache.flink.runtime.execution.SuppressRestartsException
@@ -290,7 +290,7 @@ class JobManager(
 
     // failsafe shutdown of the metrics registry
     try {
-      metricsRegistry.map(_.shutdown())
+      metricsRegistry.foreach(_.shutdown())
     } catch {
       case t: Exception => log.error("MetricRegistry did not shutdown 
properly.", t)
     }
@@ -686,9 +686,9 @@ class JobManager(
     case TriggerSavepoint(jobId) =>
       currentJobs.get(jobId) match {
         case Some((graph, _)) =>
-          val savepointCoordinator = graph.getSavepointCoordinator()
+          val checkpointCoordinator = graph.getCheckpointCoordinator()
 
-          if (savepointCoordinator != null) {
+          if (checkpointCoordinator != null) {
             // Immutable copy for the future
             val senderRef = sender()
 
@@ -696,7 +696,7 @@ class JobManager(
               try {
                 // Do this async, because checkpoint coordinator operations can
                 // contain blocking calls to the state backend or ZooKeeper.
-                val savepointFuture = savepointCoordinator.triggerSavepoint(
+                val savepointFuture = checkpointCoordinator.triggerSavepoint(
                   System.currentTimeMillis())
 
                 savepointFuture.onComplete {
@@ -1294,17 +1294,37 @@ class JobManager(
       // because it is a blocking operation
       future {
         try {
+
           if (isRecovery) {
+            // this is a recovery of a master failure (this master takes over)
             executionGraph.restoreLatestCheckpointedState()
-          } else {
+          }
+          else {
+            // load a savepoint only if this is not starting from a newer 
checkpoint
+            // as part of a master failure recovery
             val snapshotSettings = jobGraph.getSnapshotSettings
             if (snapshotSettings != null) {
               val savepointPath = snapshotSettings.getSavepointPath()
-
-              // Reset state back to savepoint
+              
               if (savepointPath != null) {
+                // got a savepoint
                 try {
-                  executionGraph.restoreSavepoint(savepointPath)
+                  log.info(s"Starting job from savepoint '$savepointPath'.")
+
+                  // load the savepoint as a checkpoint into the system
+                  val savepoint: CompletedCheckpoint = 
SavepointLoader.loadAndValidateSavepoint(
+                    jobId, executionGraph.getAllVertices, savepointStore, 
savepointPath)
+
+                  executionGraph.getCheckpointCoordinator.getCheckpointStore
+                      .addCheckpoint(savepoint)
+                  
+                  // Reset the checkpoint ID counter
+                  val nextCheckpointId: Long = savepoint.getCheckpointID + 1
+                  log.info(s"Reset the checkpoint ID to $nextCheckpointId")
+                  
executionGraph.getCheckpointCoordinator.getCheckpointIdCounter
+                      .setCount(nextCheckpointId)
+                  
+                  executionGraph.restoreLatestCheckpointedState()
                 } catch {
                   case e: Exception =>
                     throw new SuppressRestartsException(e)
@@ -1367,21 +1387,13 @@ class JobManager(
         currentJobs.get(jid) match {
           case Some((graph, _)) =>
             val checkpointCoordinator = graph.getCheckpointCoordinator()
-            val savepointCoordinator = graph.getSavepointCoordinator()
 
-            if (checkpointCoordinator != null && savepointCoordinator != null) 
{
+            if (checkpointCoordinator != null) {
               future {
                 try {
-                  if 
(checkpointCoordinator.receiveAcknowledgeMessage(ackMessage)) {
-                    // OK, this is the common case
-                  }
-                  else {
-                    // Try the savepoint coordinator if the message was not 
addressed
-                    // to the periodic checkpoint coordinator.
-                    if 
(!savepointCoordinator.receiveAcknowledgeMessage(ackMessage)) {
-                      log.info("Received message for non-existing checkpoint " 
+
-                        ackMessage.getCheckpointId)
-                    }
+                  if 
(!checkpointCoordinator.receiveAcknowledgeMessage(ackMessage)) {
+                    log.info("Received message for non-existing checkpoint " +
+                      ackMessage.getCheckpointId)
                   }
                 }
                 catch {
@@ -1404,21 +1416,14 @@ class JobManager(
         currentJobs.get(jid) match {
           case Some((graph, _)) =>
             val checkpointCoordinator = graph.getCheckpointCoordinator()
-            val savepointCoordinator = graph.getSavepointCoordinator()
 
-            if (checkpointCoordinator != null && savepointCoordinator != null) {
+            if (checkpointCoordinator != null) {
               future {
                 try {
-                  if (checkpointCoordinator.receiveDeclineMessage(declineMessage)) {
-                    // OK, this is the common case
-                  }
-                  else {
-                    // Try the savepoint coordinator if the message was not addressed
-                    // to the periodic checkpoint coordinator.
-                    if (!savepointCoordinator.receiveDeclineMessage(declineMessage)) {
-                      log.info("Received message for non-existing checkpoint " +
-                        declineMessage.getCheckpointId)
-                    }
+                  // 'false' means the coordinator did not handle the decline (no such checkpoint)
+                  if (!checkpointCoordinator.receiveDeclineMessage(declineMessage)) {
+                    log.info("Received message for non-existing checkpoint " +
+                      declineMessage.getCheckpointId)
                   }
                 }
                 catch {

http://git-wip-us.apache.org/repos/asf/flink/blob/76ca1a79/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointPropertiesTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointPropertiesTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointPropertiesTest.java
new file mode 100644
index 0000000..5772fae
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointPropertiesTest.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class CheckpointPropertiesTest {
+       /** Checks that the standard checkpoint properties are not flagged as a savepoint. */
+       @Test
+       public void testCheckpointProperties() {
+               
assertFalse(CheckpointProperties.forStandardCheckpoint().isSavepoint());
+       }
+       /** Checks that the standard savepoint properties are flagged as a savepoint. */
+       @Test
+       public void testSavepointProperties() {
+               
assertTrue(CheckpointProperties.forStandardSavepoint().isSavepoint());
+       }
+}

Reply via email to