http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
index 289f5c3..baa0e08 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
@@ -19,11 +19,11 @@
 package org.apache.flink.runtime.checkpoint;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.messages.CheckpointMessagesTest;
 import org.apache.flink.runtime.state.ChainedStateHandle;
 import org.apache.flink.runtime.state.StreamStateHandle;
-
 import org.apache.flink.util.TestLogger;
 import org.junit.Test;
 
@@ -46,19 +46,16 @@ public abstract class CompletedCheckpointStoreTest extends TestLogger {
         * Creates the {@link CompletedCheckpointStore} implementation to be 
tested.
         */
        protected abstract CompletedCheckpointStore createCompletedCheckpoints(
-                       int maxNumberOfCheckpointsToRetain, ClassLoader 
userLoader) throws Exception;
+                       int maxNumberOfCheckpointsToRetain) throws Exception;
 
        // 
---------------------------------------------------------------------------------------------
 
-       // Verify that discarded checkpoints are called with the correct class 
loader
-       private final ClassLoader userClassLoader = 
ClassLoader.getSystemClassLoader();
-
        /**
         * Tests that at least one checkpoint needs to be retained.
         */
        @Test(expected = Exception.class)
        public void testExceptionOnNoRetainedCheckpoints() throws Exception {
-               createCompletedCheckpoints(0, userClassLoader);
+               createCompletedCheckpoints(0);
        }
 
        /**
@@ -66,7 +63,7 @@ public abstract class CompletedCheckpointStoreTest extends TestLogger {
         */
        @Test
        public void testAddAndGetLatestCheckpoint() throws Exception {
-               CompletedCheckpointStore checkpoints = 
createCompletedCheckpoints(4, userClassLoader);
+               CompletedCheckpointStore checkpoints = 
createCompletedCheckpoints(4);
 
                // Empty state
                assertEquals(0, checkpoints.getNumberOfRetainedCheckpoints());
@@ -91,7 +88,7 @@ public abstract class CompletedCheckpointStoreTest extends TestLogger {
         */
        @Test
        public void testAddCheckpointMoreThanMaxRetained() throws Exception {
-               CompletedCheckpointStore checkpoints = 
createCompletedCheckpoints(1, userClassLoader);
+               CompletedCheckpointStore checkpoints = 
createCompletedCheckpoints(1);
 
                TestCompletedCheckpoint[] expected = new 
TestCompletedCheckpoint[] {
                                createCheckpoint(0), createCheckpoint(1),
@@ -122,7 +119,7 @@ public abstract class CompletedCheckpointStoreTest extends TestLogger {
         */
        @Test
        public void testEmptyState() throws Exception {
-               CompletedCheckpointStore checkpoints = 
createCompletedCheckpoints(1, userClassLoader);
+               CompletedCheckpointStore checkpoints = 
createCompletedCheckpoints(1);
 
                assertNull(checkpoints.getLatestCheckpoint());
                assertEquals(0, checkpoints.getAllCheckpoints().size());
@@ -134,7 +131,7 @@ public abstract class CompletedCheckpointStoreTest extends TestLogger {
         */
        @Test
        public void testGetAllCheckpoints() throws Exception {
-               CompletedCheckpointStore checkpoints = 
createCompletedCheckpoints(4, userClassLoader);
+               CompletedCheckpointStore checkpoints = 
createCompletedCheckpoints(4);
 
                TestCompletedCheckpoint[] expected = new 
TestCompletedCheckpoint[] {
                                createCheckpoint(0), createCheckpoint(1),
@@ -159,7 +156,7 @@ public abstract class CompletedCheckpointStoreTest extends TestLogger {
         */
        @Test
        public void testDiscardAllCheckpoints() throws Exception {
-               CompletedCheckpointStore checkpoints = 
createCompletedCheckpoints(4, userClassLoader);
+               CompletedCheckpointStore checkpoints = 
createCompletedCheckpoints(4);
 
                TestCompletedCheckpoint[] expected = new 
TestCompletedCheckpoint[] {
                                createCheckpoint(0), createCheckpoint(1),
@@ -170,7 +167,7 @@ public abstract class CompletedCheckpointStoreTest extends TestLogger {
                        checkpoints.addCheckpoint(checkpoint);
                }
 
-               checkpoints.shutdown();
+               checkpoints.shutdown(JobStatus.FINISHED);
 
                // Empty state
                assertNull(checkpoints.getLatestCheckpoint());
@@ -193,6 +190,11 @@ public abstract class CompletedCheckpointStoreTest extends TestLogger {
 
        protected TestCompletedCheckpoint createCheckpoint(int id, int 
numberOfStates)
                        throws IOException {
+               return createCheckpoint(id, numberOfStates, 
CheckpointProperties.forStandardCheckpoint());
+       }
+
+       protected TestCompletedCheckpoint createCheckpoint(int id, int 
numberOfStates, CheckpointProperties props)
+                       throws IOException {
 
                JobVertexID jvid = new JobVertexID();
 
@@ -207,7 +209,7 @@ public abstract class CompletedCheckpointStoreTest extends TestLogger {
                        taskState.putState(i, new SubtaskState(stateHandle, 0));
                }
 
-               return new TestCompletedCheckpoint(new JobID(), id, 0, 
taskGroupStates);
+               return new TestCompletedCheckpoint(new JobID(), id, 0, 
taskGroupStates, props);
        }
 
        private void verifyCheckpoint(CompletedCheckpoint expected, 
CompletedCheckpoint actual) {
@@ -232,15 +234,33 @@ public abstract class CompletedCheckpointStoreTest extends TestLogger {
                        JobID jobId,
                        long checkpointId,
                        long timestamp,
-                       Map<JobVertexID, TaskState> taskGroupStates) {
+                       Map<JobVertexID, TaskState> taskGroupStates,
+                       CheckpointProperties props) {
 
-                       super(jobId, checkpointId, timestamp, Long.MAX_VALUE, 
taskGroupStates, true);
+                       super(jobId, checkpointId, timestamp, Long.MAX_VALUE, 
taskGroupStates, props, null);
                }
 
                @Override
-               public void discardState() throws Exception {
-                       super.discardState();
+               public boolean subsume() throws Exception {
+                       if (super.subsume()) {
+                               discard();
+                               return true;
+                       } else {
+                               return false;
+                       }
+               }
+
+               @Override
+               public boolean discard(JobStatus jobStatus) throws Exception {
+                       if (super.discard(jobStatus)) {
+                               discard();
+                               return true;
+                       } else {
+                               return false;
+                       }
+               }
 
+               private void discard() {
                        if (!isDiscarded) {
                                this.isDiscarded = true;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
index 9b04244..25a4703 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
@@ -19,40 +19,103 @@
 package org.apache.flink.runtime.checkpoint;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.junit.Rule;
 import org.junit.Test;
-import org.mockito.Matchers;
+import org.junit.rules.TemporaryFolder;
 import org.mockito.Mockito;
 
+import java.io.File;
 import java.util.HashMap;
 import java.util.Map;
 
+import static org.junit.Assert.assertEquals;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 
 public class CompletedCheckpointTest {
 
+       @Rule
+       public TemporaryFolder tmpFolder = new TemporaryFolder();
+
        /**
-        * Tests that the `deleteStateWhenDisposed` flag is correctly forwarded.
+        * Tests that persistent checkpoints discard their header file.
         */
        @Test
        public void testDiscard() throws Exception {
+               File file = tmpFolder.newFile();
+               assertEquals(true, file.exists());
+
                TaskState state = mock(TaskState.class);
                Map<JobVertexID, TaskState> taskStates = new HashMap<>();
                taskStates.put(new JobVertexID(), state);
 
                // Verify discard call is forwarded to state
-               CompletedCheckpoint checkpoint = new CompletedCheckpoint(new 
JobID(), 0, 0, 1, taskStates, true);
-               checkpoint.discardState();
+               CompletedCheckpoint checkpoint = new CompletedCheckpoint(
+                               new JobID(), 0, 0, 1, taskStates, 
CheckpointProperties.forStandardCheckpoint(), file.getAbsolutePath());
+
+               checkpoint.discard(JobStatus.FAILED);
+
+               assertEquals(false, file.exists());
+       }
+
+       /**
+        * Tests that the garbage collection properties are respected when 
subsuming checkpoints.
+        */
+       @Test
+       public void testCleanUpOnSubsume() throws Exception {
+               TaskState state = mock(TaskState.class);
+               Map<JobVertexID, TaskState> taskStates = new HashMap<>();
+               taskStates.put(new JobVertexID(), state);
+
+               boolean discardSubsumed = true;
+               CheckpointProperties props = new CheckpointProperties(false, 
false, discardSubsumed, true, true, true, true);
+               CompletedCheckpoint checkpoint = new CompletedCheckpoint(
+                               new JobID(), 0, 0, 1, taskStates, props, null);
+
+               // Subsume
+               checkpoint.subsume();
+
                verify(state, times(1)).discardState();
+       }
+
+       /**
+        * Tests that the garbage collection properties are respected when 
shutting down.
+        */
+       @Test
+       public void testCleanUpOnShutdown() throws Exception {
+               File file = tmpFolder.newFile();
+               String externalPath = file.getAbsolutePath();
+
+               JobStatus[] terminalStates = new JobStatus[] {
+                               JobStatus.FINISHED, JobStatus.CANCELED, 
JobStatus.FAILED, JobStatus.SUSPENDED
+               };
+
+               TaskState state = mock(TaskState.class);
+               Map<JobVertexID, TaskState> taskStates = new HashMap<>();
+               taskStates.put(new JobVertexID(), state);
+
+               for (JobStatus status : terminalStates) {
+                       Mockito.reset(state);
+
+                       // Keep
+                       CheckpointProperties props = new 
CheckpointProperties(false, true, false, false, false, false, false);
+                       CompletedCheckpoint checkpoint = new 
CompletedCheckpoint(
+                                       new JobID(), 0, 0, 1, new 
HashMap<>(taskStates), props, externalPath);
+
+                       checkpoint.discard(status);
+                       verify(state, times(0)).discardState();
+                       assertEquals(true, file.exists());
 
-               Mockito.reset(state);
+                       // Discard
+                       props = new CheckpointProperties(false, false, true, 
true, true, true, true);
+                       checkpoint = new CompletedCheckpoint(
+                                       new JobID(), 0, 0, 1, new 
HashMap<>(taskStates), props, null);
 
-               // Verify discard call is not forwarded to state
-               checkpoint = new CompletedCheckpoint(new JobID(), 0, 0, 1, 
taskStates, false);
-               checkpoint.discardState();
-               verify(state, times(0)).discardState();
+                       checkpoint.discard(status);
+                       verify(state, times(1)).discardState();
+               }
        }
 }
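
For orientation, here is a minimal sketch of the CompletedCheckpoint lifecycle the tests above exercise. It uses only constructors and calls visible in this diff (the seven-argument CheckpointProperties constructor is copied from the test cases; the external path is a hypothetical placeholder), so treat it as an illustration rather than canonical usage.

import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.checkpoint.CheckpointProperties;
import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
import org.apache.flink.runtime.checkpoint.TaskState;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertexID;

import static org.mockito.Mockito.mock;

public class CompletedCheckpointLifecycleSketch {

    public static void main(String[] args) throws Exception {
        Map<JobVertexID, TaskState> taskStates = new HashMap<>();
        taskStates.put(new JobVertexID(), mock(TaskState.class));

        // Standard checkpoint: no external header file (null path).
        CompletedCheckpoint standard = new CompletedCheckpoint(
                new JobID(), 0L, 0L, 1L, taskStates,
                CheckpointProperties.forStandardCheckpoint(), null);

        // subsume() is called when a newer checkpoint supersedes this one;
        // it drops state only if the properties allow it and reports whether it did.
        boolean droppedOnSubsume = standard.subsume();

        // discard(JobStatus) replaces discardState(): clean-up now depends on
        // the job's terminal status and the checkpoint properties.
        standard.discard(JobStatus.FINISHED);

        // Externalized checkpoint: same property set as the "Persisted checkpoint"
        // in testPersistExternally, plus a path to the written header file.
        CompletedCheckpoint externalized = new CompletedCheckpoint(
                new JobID(), 1L, 0L, 1L, taskStates,
                new CheckpointProperties(false, true, false, false, false, false, false),
                "/tmp/checkpoint-header-file");  // placeholder path

        externalized.discard(JobStatus.SUSPENDED);
        System.out.println("state dropped on subsume: " + droppedOnSubsume);
    }
}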

http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
index c43cf2e..ea4d322 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
@@ -23,17 +23,16 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.ListeningBehaviour;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
 import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
 import org.apache.flink.runtime.jobmanager.Tasks;
 import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
-
 import org.junit.Test;
-
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
@@ -42,7 +41,9 @@ import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 public class CoordinatorShutdownTest {
        
@@ -62,7 +63,7 @@ public class CoordinatorShutdownTest {
                        
                        JobGraph testGraph = new JobGraph("test job", vertex);
                        testGraph.setSnapshotSettings(new 
JobSnapshottingSettings(vertexIdList, vertexIdList, vertexIdList, 
-                                       5000, 60000, 0L, Integer.MAX_VALUE));
+                                       5000, 60000, 0L, Integer.MAX_VALUE, 
ExternalizedCheckpointSettings.none()));
                        
                        ActorGateway jmGateway = 
cluster.getLeaderGateway(TestingUtils.TESTING_DURATION());
 
@@ -114,7 +115,7 @@ public class CoordinatorShutdownTest {
 
                        JobGraph testGraph = new JobGraph("test job", vertex);
                        testGraph.setSnapshotSettings(new 
JobSnapshottingSettings(vertexIdList, vertexIdList, vertexIdList,
-                                       5000, 60000, 0L, Integer.MAX_VALUE));
+                                       5000, 60000, 0L, Integer.MAX_VALUE, 
ExternalizedCheckpointSettings.none()));
                        
                        ActorGateway jmGateway = 
cluster.getLeaderGateway(TestingUtils.TESTING_DURATION());
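
The only change to this test is the additional ExternalizedCheckpointSettings argument. As a standalone illustration of the updated call shape (vertex and timing values are placeholders mirroring the test above, and the argument meanings are assumed from context):

import java.util.Collections;
import java.util.List;

import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;

public class SnapshotSettingsSketch {

    public static void main(String[] args) {
        JobVertex vertex = new JobVertex("vertex");  // placeholder vertex
        List<JobVertexID> vertexIdList = Collections.singletonList(vertex.getID());

        JobGraph testGraph = new JobGraph("test job", vertex);

        // Same arguments as before (the three vertex lists and the timing/limit
        // values are unchanged from the test above), plus the new
        // externalized-checkpoint settings, disabled here.
        testGraph.setSnapshotSettings(new JobSnapshottingSettings(
                vertexIdList, vertexIdList, vertexIdList,
                5000, 60000, 0L, Integer.MAX_VALUE,
                ExternalizedCheckpointSettings.none()));
    }
}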
 

http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
index 1de7098..a8bed46 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
@@ -26,16 +26,18 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.blob.BlobKey;
-import org.apache.flink.runtime.checkpoint.savepoint.HeapSavepointStore;
 import 
org.apache.flink.runtime.checkpoint.stats.DisabledCheckpointStatsTracker;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
+import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.util.SerializedValue;
 import org.junit.AfterClass;
 import org.junit.Test;
+import org.mockito.Matchers;
 import scala.concurrent.duration.FiniteDuration;
 
 import java.net.URL;
@@ -67,8 +69,8 @@ public class ExecutionGraphCheckpointCoordinatorTest {
                ExecutionGraph graph = 
createExecutionGraphAndEnableCheckpointing(counter, store);
                graph.fail(new Exception("Test Exception"));
 
-               verify(counter, times(1)).shutdown();
-               verify(store, times(1)).shutdown();
+               verify(counter, times(1)).shutdown(JobStatus.FAILED);
+               verify(store, times(1)).shutdown(JobStatus.FAILED);
        }
 
        /**
@@ -84,11 +86,8 @@ public class ExecutionGraphCheckpointCoordinatorTest {
                graph.suspend(new Exception("Test Exception"));
 
                // No shutdown
-               verify(counter, times(0)).shutdown();
-               verify(store, times(0)).shutdown();
-
-               verify(counter, times(1)).suspend();
-               verify(store, times(1)).suspend();
+               verify(counter, 
times(1)).shutdown(Matchers.eq(JobStatus.SUSPENDED));
+               verify(store, 
times(1)).shutdown(Matchers.eq(JobStatus.SUSPENDED));
        }
 
        private ExecutionGraph createExecutionGraphAndEnableCheckpointing(
@@ -112,12 +111,13 @@ public class ExecutionGraphCheckpointCoordinatorTest {
                                100,
                                100,
                                1,
+                               ExternalizedCheckpointSettings.none(),
                                Collections.<ExecutionJobVertex>emptyList(),
                                Collections.<ExecutionJobVertex>emptyList(),
                                Collections.<ExecutionJobVertex>emptyList(),
                                counter,
                                store,
-                               new HeapSavepointStore(),
+                               null,
                                new DisabledCheckpointStatsTracker());
 
                JobVertex jobVertex = new JobVertex("MockVertex");

http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
index b8126e9..2667743 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
@@ -19,24 +19,33 @@
 package org.apache.flink.runtime.checkpoint;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.junit.Rule;
 import org.junit.Test;
-import org.mockito.Matchers;
+import org.junit.rules.TemporaryFolder;
 import org.mockito.Mockito;
 
+import java.io.File;
 import java.lang.reflect.Field;
 import java.util.HashMap;
 import java.util.Map;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 
 public class PendingCheckpointTest {
 
+       @Rule
+       public TemporaryFolder tmpFolder = new TemporaryFolder();
+
        private static final Map<ExecutionAttemptID, ExecutionVertex> ACK_TASKS 
= new HashMap<>();
        private static final ExecutionAttemptID ATTEMPT_ID = new 
ExecutionAttemptID();
 
@@ -45,24 +54,125 @@ public class PendingCheckpointTest {
        }
 
        /**
-        * Tests that pending checkpoints can be subsumed.
+        * Tests that pending checkpoints can be subsumed iff they are forced.
         */
        @Test
        public void testCanBeSubsumed() throws Exception {
-               PendingCheckpoint pending = createPendingCheckpoint();
+               // Forced checkpoints cannot be subsumed
+               CheckpointProperties forced = new CheckpointProperties(true, 
true, false, false, false, false, false);
+               PendingCheckpoint pending = createPendingCheckpoint(forced, 
"ignored");
+               assertFalse(pending.canBeSubsumed());
+
+               try {
+                       pending.abortSubsumed();
+                       fail("Did not throw expected Exception");
+               } catch (IllegalStateException ignored) {
+                       // Expected
+               }
+
+               // Non-forced checkpoints can be subsumed
+               CheckpointProperties subsumed = new CheckpointProperties(false, 
true, false, false, false, false, false);
+               pending = createPendingCheckpoint(subsumed, "ignored");
                assertTrue(pending.canBeSubsumed());
        }
 
        /**
+        * Tests that the persist checkpoint property is respected by the 
pending
+        * checkpoint when finalizing.
+        */
+       @Test
+       public void testPersistExternally() throws Exception {
+               File tmp = tmpFolder.newFolder();
+
+               // Persisted checkpoint
+               CheckpointProperties persisted = new 
CheckpointProperties(false, true, false, false, false, false, false);
+
+               PendingCheckpoint pending = createPendingCheckpoint(persisted, 
tmp.getAbsolutePath());
+               pending.acknowledgeTask(ATTEMPT_ID, null);
+
+               assertEquals(0, tmp.listFiles().length);
+               pending.finalizeCheckpoint();
+               assertEquals(1, tmp.listFiles().length);
+
+               // Ephemeral checkpoint
+               CheckpointProperties ephemeral = new 
CheckpointProperties(false, false, true, true, true, true, true);
+               pending = createPendingCheckpoint(ephemeral, null);
+               pending.acknowledgeTask(ATTEMPT_ID, null);
+
+               assertEquals(1, tmp.listFiles().length);
+               pending.finalizeCheckpoint();
+               assertEquals(1, tmp.listFiles().length);
+       }
+
+       /**
+        * Tests that the completion future is succeeded on finalize and failed 
on
+        * abort and failures during finalize.
+        */
+       @Test
+       public void testCompletionFuture() throws Exception {
+               CheckpointProperties props = new CheckpointProperties(false, 
true, false, false, false, false, false);
+
+               // Abort declined
+               PendingCheckpoint pending = createPendingCheckpoint(props, 
"ignored");
+               Future<CompletedCheckpoint> future = 
pending.getCompletionFuture();
+
+               assertFalse(future.isDone());
+               pending.abortDeclined();
+               assertTrue(future.isDone());
+
+               // Abort expired
+               pending = createPendingCheckpoint(props, "ignored");
+               future = pending.getCompletionFuture();
+
+               assertFalse(future.isDone());
+               pending.abortExpired();
+               assertTrue(future.isDone());
+
+               // Abort subsumed
+               pending = createPendingCheckpoint(props, "ignored");
+               future = pending.getCompletionFuture();
+
+               assertFalse(future.isDone());
+               pending.abortSubsumed();
+               assertTrue(future.isDone());
+
+               // Finalize (all ACK'd)
+               String target = tmpFolder.newFolder().getAbsolutePath();
+               pending = createPendingCheckpoint(props, target);
+               future = pending.getCompletionFuture();
+
+               assertFalse(future.isDone());
+               pending.acknowledgeTask(ATTEMPT_ID, null);
+               pending.finalizeCheckpoint();
+               assertTrue(future.isDone());
+
+               // Finalize (missing ACKs)
+               pending = createPendingCheckpoint(props, "ignored");
+               future = pending.getCompletionFuture();
+
+               assertFalse(future.isDone());
+               try {
+                       pending.finalizeCheckpoint();
+                       fail("Did not throw expected Exception");
+               } catch (IllegalStateException ignored) {
+                       // Expected
+               }
+               assertTrue(future.isDone());
+       }
+
+       /**
         * Tests that abort discards state.
         */
        @Test
        @SuppressWarnings("unchecked")
-       public void testAbort() throws Exception {
+       public void testAbortDiscardsState() throws Exception {
+               CheckpointProperties props = new CheckpointProperties(false, 
true, false, false, false, false, false);
                TaskState state = mock(TaskState.class);
 
+               String targetDir = tmpFolder.newFolder().getAbsolutePath();
+
                // Abort declined
-               PendingCheckpoint pending = createPendingCheckpoint();
+               PendingCheckpoint pending = createPendingCheckpoint(props, 
targetDir);
                setTaskState(pending, state);
 
                pending.abortDeclined();
@@ -71,7 +181,7 @@ public class PendingCheckpointTest {
                // Abort error
                Mockito.reset(state);
 
-               pending = createPendingCheckpoint();
+               pending = createPendingCheckpoint(props, targetDir);
                setTaskState(pending, state);
 
                pending.abortError(new Exception("Expected Test Exception"));
@@ -80,7 +190,7 @@ public class PendingCheckpointTest {
                // Abort expired
                Mockito.reset(state);
 
-               pending = createPendingCheckpoint();
+               pending = createPendingCheckpoint(props, targetDir);
                setTaskState(pending, state);
 
                pending.abortExpired();
@@ -89,37 +199,18 @@ public class PendingCheckpointTest {
                // Abort subsumed
                Mockito.reset(state);
 
-               pending = createPendingCheckpoint();
+               pending = createPendingCheckpoint(props, targetDir);
                setTaskState(pending, state);
 
                pending.abortSubsumed();
                verify(state, times(1)).discardState();
        }
 
-       /**
-        * Tests that the CompletedCheckpoint `deleteStateWhenDisposed` flag is
-        * correctly set to true.
-        */
-       @Test
-       public void testFinalizeCheckpoint() throws Exception {
-               TaskState state = mock(TaskState.class);
-               PendingCheckpoint pending = createPendingCheckpoint();
-               PendingCheckpointTest.setTaskState(pending, state);
-
-               pending.acknowledgeTask(ATTEMPT_ID, null);
-
-               CompletedCheckpoint checkpoint = pending.finalizeCheckpoint();
-
-               // Does discard state
-               checkpoint.discardState();
-               verify(state, times(1)).discardState();
-       }
-
        // 
------------------------------------------------------------------------
 
-       private static PendingCheckpoint createPendingCheckpoint() {
+       private static PendingCheckpoint 
createPendingCheckpoint(CheckpointProperties props, String targetDirectory) {
                Map<ExecutionAttemptID, ExecutionVertex> ackTasks = new 
HashMap<>(ACK_TASKS);
-               return new PendingCheckpoint(new JobID(), 0, 1, ackTasks);
+               return new PendingCheckpoint(new JobID(), 0, 1, ackTasks, 
props, targetDirectory);
        }
 
        @SuppressWarnings("unchecked")
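
Pulled out of the test flow, the new PendingCheckpoint contract looks roughly like this. Only calls shown in this diff are used; the properties are the "persisted" set from testPersistExternally, and the target directory is a throwaway temp folder, so this is a sketch rather than a reference implementation.

import java.nio.file.Files;
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.checkpoint.CheckpointProperties;
import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
import org.apache.flink.runtime.checkpoint.PendingCheckpoint;
import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;

import static org.mockito.Mockito.mock;

public class PendingCheckpointSketch {

    public static void main(String[] args) throws Exception {
        ExecutionAttemptID attemptId = new ExecutionAttemptID();
        Map<ExecutionAttemptID, ExecutionVertex> ackTasks = new HashMap<>();
        ackTasks.put(attemptId, mock(ExecutionVertex.class));

        // Externalized ("persisted") properties, as in testPersistExternally.
        CheckpointProperties persisted =
                new CheckpointProperties(false, true, false, false, false, false, false);

        // The constructor now takes the properties and a target directory for
        // the externalized metadata (a temp folder here).
        String targetDirectory = Files.createTempDirectory("checkpoints").toString();
        PendingCheckpoint pending = new PendingCheckpoint(
                new JobID(), 0L, 1L, ackTasks, persisted, targetDirectory);

        Future<CompletedCheckpoint> completion = pending.getCompletionFuture();

        // Once all tasks have acknowledged, finalizing yields the
        // CompletedCheckpoint and completes the future; the abort* methods
        // fail the future instead.
        pending.acknowledgeTask(attemptId, null);
        CompletedCheckpoint completed = pending.finalizeCheckpoint();

        System.out.println(completion.isDone() + " / " + completed.getCheckpointID());
    }
}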

http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingSavepointTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingSavepointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingSavepointTest.java
deleted file mode 100644
index 3701359..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingSavepointTest.java
+++ /dev/null
@@ -1,140 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.checkpoint;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.checkpoint.savepoint.HeapSavepointStore;
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.executiongraph.ExecutionVertex;
-import org.junit.Test;
-import org.mockito.Matchers;
-import org.mockito.Mockito;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.Duration;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-
-public class PendingSavepointTest {
-
-       private static final Map<ExecutionAttemptID, ExecutionVertex> ACK_TASKS 
= new HashMap<>();
-       private static final ExecutionAttemptID ATTEMPT_ID = new 
ExecutionAttemptID();
-
-       static {
-               ACK_TASKS.put(ATTEMPT_ID, mock(ExecutionVertex.class));
-       }
-
-       /**
-        * Tests that pending savepoints cannot be subsumed.
-        */
-       @Test
-       public void testCanBeSubsumed() throws Exception {
-               PendingSavepoint pending = createPendingSavepoint();
-               assertFalse(pending.canBeSubsumed());
-       }
-
-       /**
-        * Tests that abort discards state fails the completeion future.
-        */
-       @Test
-       @SuppressWarnings("unchecked")
-       public void testAbort() throws Exception {
-               TaskState state = mock(TaskState.class);
-
-               // Abort declined
-               PendingSavepoint pending = createPendingSavepoint();
-               PendingCheckpointTest.setTaskState(pending, state);
-
-               pending.abortDeclined();
-               verify(state, times(1)).discardState();
-
-               // Abort error
-               Mockito.reset(state);
-
-               pending = createPendingSavepoint();
-               PendingCheckpointTest.setTaskState(pending, state);
-               Future<String> future = pending.getCompletionFuture();
-
-               pending.abortError(new Exception("Expected Test Exception"));
-               verify(state, times(1)).discardState();
-               assertTrue(future.failed().isCompleted());
-
-               // Abort expired
-               Mockito.reset(state);
-
-               pending = createPendingSavepoint();
-               PendingCheckpointTest.setTaskState(pending, state);
-               future = pending.getCompletionFuture();
-
-               pending.abortExpired();
-               verify(state, times(1)).discardState();
-               assertTrue(future.failed().isCompleted());
-
-               // Abort subsumed
-               pending = createPendingSavepoint();
-
-               try {
-                       pending.abortSubsumed();
-                       fail("Did not throw expected Exception");
-               } catch (Throwable ignored) { // expected
-               }
-       }
-
-       /**
-        * Tests that the CompletedCheckpoint `deleteStateWhenDisposed` flag is
-        * correctly set to false.
-        */
-       @Test
-       public void testFinalizeCheckpoint() throws Exception {
-               TaskState state = mock(TaskState.class);
-               PendingSavepoint pending = createPendingSavepoint();
-               PendingCheckpointTest.setTaskState(pending, state);
-
-               Future<String> future = pending.getCompletionFuture();
-
-               pending.acknowledgeTask(ATTEMPT_ID, null);
-
-               CompletedCheckpoint checkpoint = pending.finalizeCheckpoint();
-
-               // Does _NOT_ discard state
-               checkpoint.discardState();
-               verify(state, times(0)).discardState();
-
-               // Future is completed
-               String path = Await.result(future, Duration.Zero());
-               assertNotNull(path);
-       }
-
-       // 
------------------------------------------------------------------------
-
-       private static PendingSavepoint createPendingSavepoint() {
-               Map<ExecutionAttemptID, ExecutionVertex> ackTasks = new 
HashMap<>(ACK_TASKS);
-               return new PendingSavepoint(new JobID(), 0, 1, ackTasks, new 
HeapSavepointStore());
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStoreTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStoreTest.java
index 59fa0e2..4e9366e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStoreTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.checkpoint;
 
+import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
@@ -30,10 +31,9 @@ public class StandaloneCompletedCheckpointStoreTest extends CompletedCheckpointS
 
        @Override
        protected CompletedCheckpointStore createCompletedCheckpoints(
-                       int maxNumberOfCheckpointsToRetain,
-                       ClassLoader userClassLoader) throws Exception {
+                       int maxNumberOfCheckpointsToRetain) throws Exception {
 
-               return new 
StandaloneCompletedCheckpointStore(maxNumberOfCheckpointsToRetain, 
userClassLoader);
+               return new 
StandaloneCompletedCheckpointStore(maxNumberOfCheckpointsToRetain);
        }
 
        /**
@@ -41,13 +41,13 @@ public class StandaloneCompletedCheckpointStoreTest extends CompletedCheckpointS
         */
        @Test
        public void testShutdownDiscardsCheckpoints() throws Exception {
-               CompletedCheckpointStore store = createCompletedCheckpoints(1, 
ClassLoader.getSystemClassLoader());
+               CompletedCheckpointStore store = createCompletedCheckpoints(1);
                TestCompletedCheckpoint checkpoint = createCheckpoint(0);
 
                store.addCheckpoint(checkpoint);
                assertEquals(1, store.getNumberOfRetainedCheckpoints());
 
-               store.shutdown();
+               store.shutdown(JobStatus.FINISHED);
 
                assertEquals(0, store.getNumberOfRetainedCheckpoints());
                assertTrue(checkpoint.isDiscarded());
@@ -59,13 +59,13 @@ public class StandaloneCompletedCheckpointStoreTest extends CompletedCheckpointS
         */
        @Test
        public void testSuspendDiscardsCheckpoints() throws Exception {
-               CompletedCheckpointStore store = createCompletedCheckpoints(1, 
ClassLoader.getSystemClassLoader());
+               CompletedCheckpointStore store = createCompletedCheckpoints(1);
                TestCompletedCheckpoint checkpoint = createCheckpoint(0);
 
                store.addCheckpoint(checkpoint);
                assertEquals(1, store.getNumberOfRetainedCheckpoints());
 
-               store.suspend();
+               store.shutdown(JobStatus.SUSPENDED);
 
                assertEquals(0, store.getNumberOfRetainedCheckpoints());
                assertTrue(checkpoint.isDiscarded());
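
The store-level change mirrors the per-checkpoint one: the separate shutdown() and suspend() calls are folded into a single shutdown(JobStatus), and what is kept depends on the terminal status (this standalone store discards everything, while the ZooKeeper test below keeps its entries on SUSPENDED). A minimal usage sketch with the updated constructor, based only on the calls in these tests:

import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
import org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore;
import org.apache.flink.runtime.jobgraph.JobStatus;

public class StoreShutdownSketch {

    public static void main(String[] args) throws Exception {
        // The user class loader argument was dropped; only the retention
        // count remains.
        CompletedCheckpointStore store = new StandaloneCompletedCheckpointStore(1);

        // ... store.addCheckpoint(...) calls for completed checkpoints ...

        // A single shutdown path for all terminal job states.
        store.shutdown(JobStatus.SUSPENDED);

        System.out.println(store.getNumberOfRetainedCheckpoints());  // 0 for the standalone store
    }
}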

http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
index 9fbe574..2e44ecd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.checkpoint;
 
 import org.apache.curator.framework.CuratorFramework;
+import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.state.RetrievableStateHandle;
 import org.apache.flink.runtime.zookeeper.RetrievableStateStorageHelper;
 import org.apache.flink.runtime.zookeeper.ZooKeeperTestEnvironment;
@@ -59,9 +60,9 @@ public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpoint
 
        @Override
        protected CompletedCheckpointStore createCompletedCheckpoints(
-                       int maxNumberOfCheckpointsToRetain, ClassLoader 
userLoader) throws Exception {
+                       int maxNumberOfCheckpointsToRetain) throws Exception {
 
-               return new 
ZooKeeperCompletedCheckpointStore(maxNumberOfCheckpointsToRetain, userLoader,
+               return new 
ZooKeeperCompletedCheckpointStore(maxNumberOfCheckpointsToRetain,
                        ZooKeeper.createClient(), CheckpointsPath, new 
RetrievableStateStorageHelper<CompletedCheckpoint>() {
                        @Override
                        public RetrievableStateHandle<CompletedCheckpoint> 
store(CompletedCheckpoint state) throws Exception {
@@ -77,8 +78,7 @@ public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpoint
         */
        @Test
        public void testRecover() throws Exception {
-               CompletedCheckpointStore checkpoints = 
createCompletedCheckpoints(3, ClassLoader
-                               .getSystemClassLoader());
+               CompletedCheckpointStore checkpoints = 
createCompletedCheckpoints(3);
 
                TestCompletedCheckpoint[] expected = new 
TestCompletedCheckpoint[] {
                                createCheckpoint(0), createCheckpoint(1), 
createCheckpoint(2)
@@ -118,14 +118,14 @@ public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpoint
        public void testShutdownDiscardsCheckpoints() throws Exception {
                CuratorFramework client = ZooKeeper.getClient();
 
-               CompletedCheckpointStore store = createCompletedCheckpoints(1, 
ClassLoader.getSystemClassLoader());
+               CompletedCheckpointStore store = createCompletedCheckpoints(1);
                TestCompletedCheckpoint checkpoint = createCheckpoint(0);
 
                store.addCheckpoint(checkpoint);
                assertEquals(1, store.getNumberOfRetainedCheckpoints());
                assertNotNull(client.checkExists().forPath(CheckpointsPath + 
"/" + checkpoint.getCheckpointID()));
 
-               store.shutdown();
+               store.shutdown(JobStatus.FINISHED);
 
                assertEquals(0, store.getNumberOfRetainedCheckpoints());
                assertNull(client.checkExists().forPath(CheckpointsPath + "/" + 
checkpoint.getCheckpointID()));
@@ -143,14 +143,14 @@ public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpoint
        public void testSuspendKeepsCheckpoints() throws Exception {
                CuratorFramework client = ZooKeeper.getClient();
 
-               CompletedCheckpointStore store = createCompletedCheckpoints(1, 
ClassLoader.getSystemClassLoader());
+               CompletedCheckpointStore store = createCompletedCheckpoints(1);
                TestCompletedCheckpoint checkpoint = createCheckpoint(0);
 
                store.addCheckpoint(checkpoint);
                assertEquals(1, store.getNumberOfRetainedCheckpoints());
                assertNotNull(client.checkExists().forPath(CheckpointsPath + 
"/" + checkpoint.getCheckpointID()));
 
-               store.suspend();
+               store.shutdown(JobStatus.SUSPENDED);
 
                assertEquals(0, store.getNumberOfRetainedCheckpoints());
                assertNotNull(client.checkExists().forPath(CheckpointsPath + 
"/" + checkpoint.getCheckpointID()));

http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/FsSavepointStoreTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/FsSavepointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/FsSavepointStoreTest.java
deleted file mode 100644
index 3e2de80..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/FsSavepointStoreTest.java
+++ /dev/null
@@ -1,235 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.checkpoint.savepoint;
-
-import org.apache.flink.core.fs.FSDataOutputStream;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.runtime.checkpoint.TaskState;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.mockito.Matchers;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.lang.reflect.Field;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.ThreadLocalRandom;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.mock;
-
-public class FsSavepointStoreTest {
-
-       @Rule
-       public TemporaryFolder tmp = new TemporaryFolder();
-
-       /**
-        * Tests a store-load-dispose sequence.
-        */
-       @Test
-       public void testStoreLoadDispose() throws Exception {
-               FsSavepointStore store = new 
FsSavepointStore(tmp.getRoot().getPath(), "fs-savepoint-store-test-");
-               assertEquals(0, tmp.getRoot().listFiles().length);
-
-               // Store
-               SavepointV1 stored = new SavepointV1(1929292, 
SavepointV1Test.createTaskStates(4, 24));
-               String path = store.storeSavepoint(stored);
-               assertEquals(1, tmp.getRoot().listFiles().length);
-
-               // Load
-               Savepoint loaded = store.loadSavepoint(path);
-               assertEquals(stored, loaded);
-
-               // Dispose
-               store.disposeSavepoint(path);
-
-               assertEquals(0, tmp.getRoot().listFiles().length);
-       }
-
-       /**
-        * Tests loading with unexpected magic number.
-        */
-       @Test
-       public void testUnexpectedSavepoint() throws Exception {
-               FsSavepointStore store = new 
FsSavepointStore(tmp.getRoot().getPath(), "fs-savepoint-store-test-");
-
-               // Random file
-               Path filePath = new Path(tmp.getRoot().getPath(), 
UUID.randomUUID().toString());
-               FSDataOutputStream fdos = 
FileSystem.get(filePath.toUri()).create(filePath, false);
-               DataOutputStream dos = new DataOutputStream(fdos);
-               for (int i = 0; i < 10; i++) {
-                       dos.writeLong(ThreadLocalRandom.current().nextLong());
-               }
-
-               try {
-                       store.loadSavepoint(filePath.toString());
-                       fail("Did not throw expected Exception");
-               } catch (RuntimeException e) {
-                       assertTrue(e.getMessage().contains("Flink 1.0") && 
e.getMessage().contains("Unexpected magic number"));
-               }
-       }
-
-       /**
-        * Tests addition of a new savepoint version.
-        */
-       @Test
-       @SuppressWarnings("unchecked")
-       public void testMultipleSavepointVersions() throws Exception {
-               Field field = 
SavepointSerializers.class.getDeclaredField("SERIALIZERS");
-               field.setAccessible(true);
-               Map<Integer, SavepointSerializer<?>> serializers = 
(Map<Integer, SavepointSerializer<?>>) field.get(null);
-
-               assertTrue(serializers.size() >= 1);
-
-               FsSavepointStore store = new 
FsSavepointStore(tmp.getRoot().getPath(), "fs-savepoint-store-test-");
-               assertEquals(0, tmp.getRoot().listFiles().length);
-
-               // New savepoint type for test
-               int version = ThreadLocalRandom.current().nextInt();
-               long checkpointId = ThreadLocalRandom.current().nextLong();
-
-               // Add serializer
-               serializers.put(version, NewSavepointSerializer.INSTANCE);
-
-               TestSavepoint newSavepoint = new TestSavepoint(version, 
checkpointId);
-               String pathNewSavepoint = store.storeSavepoint(newSavepoint);
-               assertEquals(1, tmp.getRoot().listFiles().length);
-
-               // Savepoint v0
-               Savepoint savepoint = new SavepointV1(checkpointId, 
SavepointV1Test.createTaskStates(4, 32));
-               String pathSavepoint = store.storeSavepoint(savepoint);
-               assertEquals(2, tmp.getRoot().listFiles().length);
-
-               // Load
-               Savepoint loaded = store.loadSavepoint(pathNewSavepoint);
-               assertEquals(newSavepoint, loaded);
-
-               loaded = store.loadSavepoint(pathSavepoint);
-               assertEquals(savepoint, loaded);
-       }
-
-       /**
-        * Tests that an exception during store cleans up the created savepoint 
file.
-        */
-       @Test
-       public void testCleanupOnStoreFailure() throws Exception {
-               Field field = 
SavepointSerializers.class.getDeclaredField("SERIALIZERS");
-               field.setAccessible(true);
-               Map<Integer, SavepointSerializer<?>> serializers = 
(Map<Integer, SavepointSerializer<?>>) field.get(null);
-
-               final int version = 123123;
-               SavepointSerializer<TestSavepoint> serializer = 
mock(SavepointSerializer.class);
-               doThrow(new RuntimeException("Test Exception")).when(serializer)
-                               .serialize(Matchers.any(TestSavepoint.class), 
any(DataOutputStream.class));
-
-               FsSavepointStore store = new 
FsSavepointStore(tmp.getRoot().getPath(), "fs-savepoint-store-test-");
-               serializers.put(version, serializer);
-
-               Savepoint savepoint = new TestSavepoint(version, 12123123);
-
-               assertEquals(0, tmp.getRoot().listFiles().length);
-
-               try {
-                       store.storeSavepoint(savepoint);
-               } catch (Throwable ignored) {
-               }
-
-               assertEquals("Savepoint file not cleaned up on failure", 0, 
tmp.getRoot().listFiles().length);
-       }
-
-       private static class NewSavepointSerializer implements 
SavepointSerializer<TestSavepoint> {
-
-               private static final NewSavepointSerializer INSTANCE = new 
NewSavepointSerializer();
-
-               @Override
-               public void serialize(TestSavepoint savepoint, DataOutputStream 
dos) throws IOException {
-                       dos.writeInt(savepoint.version);
-                       dos.writeLong(savepoint.checkpointId);
-               }
-
-               @Override
-               public TestSavepoint deserialize(DataInputStream dis) throws 
IOException {
-                       int version = dis.readInt();
-                       long checkpointId = dis.readLong();
-                       return new TestSavepoint(version, checkpointId);
-               }
-
-       }
-
-       private static class TestSavepoint implements Savepoint {
-
-               private final int version;
-               private final long checkpointId;
-
-               public TestSavepoint(int version, long checkpointId) {
-                       this.version = version;
-                       this.checkpointId = checkpointId;
-               }
-
-               @Override
-               public int getVersion() {
-                       return version;
-               }
-
-               @Override
-               public long getCheckpointId() {
-                       return checkpointId;
-               }
-
-               @Override
-               public Collection<TaskState> getTaskStates() {
-                       return Collections.EMPTY_LIST;
-               }
-
-               @Override
-               public void dispose() {
-               }
-
-               @Override
-               public boolean equals(Object o) {
-                       if (this == o) {
-                               return true;
-                       }
-                       if (o == null || getClass() != o.getClass()) {
-                               return false;
-                       }
-                       TestSavepoint that = (TestSavepoint) o;
-                       return version == that.version && checkpointId == 
that.checkpointId;
-
-               }
-
-               @Override
-               public int hashCode() {
-                       int result = version;
-                       result = 31 * result + (int) (checkpointId ^ 
(checkpointId >>> 32));
-                       return result;
-               }
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoaderTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoaderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoaderTest.java
index 766531a..b594f4e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoaderTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoaderTest.java
@@ -23,8 +23,11 @@ import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
 import org.apache.flink.runtime.checkpoint.TaskState;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 
+import java.io.File;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -32,66 +35,59 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 public class SavepointLoaderTest {
 
+       @Rule
+       public TemporaryFolder tmpFolder = new TemporaryFolder();
+
        /**
         * Tests loading and validation of savepoints with correct setup,
         * parallelism mismatch, and a missing task.
         */
        @Test
        public void testLoadAndValidateSavepoint() throws Exception {
+               File tmp = tmpFolder.newFolder();
+
                int parallelism = 128128;
+               long checkpointId = Integer.MAX_VALUE + 123123L;
                JobVertexID vertexId = new JobVertexID();
 
                TaskState state = mock(TaskState.class);
                when(state.getParallelism()).thenReturn(parallelism);
                when(state.getJobVertexID()).thenReturn(vertexId);
+               when(state.getMaxParallelism()).thenReturn(parallelism);
+               when(state.getChainLength()).thenReturn(1);
 
                Map<JobVertexID, TaskState> taskStates = new HashMap<>();
                taskStates.put(vertexId, state);
 
-               CompletedCheckpoint stored = new CompletedCheckpoint(
-                               new JobID(),
-                               Integer.MAX_VALUE + 123123L,
-                               10200202,
-                               1020292988,
-                               taskStates,
-                               true);
-
                // Store savepoint
-               SavepointV1 savepoint = new 
SavepointV1(stored.getCheckpointID(), taskStates.values());
-               SavepointStore store = new HeapSavepointStore();
-               String path = store.storeSavepoint(savepoint);
+               SavepointV1 savepoint = new SavepointV1(checkpointId, 
taskStates.values());
+               String path = 
SavepointStore.storeSavepoint(tmp.getAbsolutePath(), savepoint);
 
                JobID jobId = new JobID();
 
                ExecutionJobVertex vertex = mock(ExecutionJobVertex.class);
                when(vertex.getParallelism()).thenReturn(parallelism);
+               when(vertex.getMaxParallelism()).thenReturn(parallelism);
 
                Map<JobVertexID, ExecutionJobVertex> tasks = new HashMap<>();
                tasks.put(vertexId, vertex);
 
                // 1) Load and validate: everything correct
-               CompletedCheckpoint loaded = 
SavepointLoader.loadAndValidateSavepoint(jobId, tasks, store, path);
+               CompletedCheckpoint loaded = 
SavepointLoader.loadAndValidateSavepoint(jobId, tasks, path);
 
                assertEquals(jobId, loaded.getJobId());
-               assertEquals(stored.getCheckpointID(), 
loaded.getCheckpointID());
-
-               // The loaded checkpoint should not discard state when its 
discarded
-               loaded.discardState();
-               verify(state, times(0)).discardState();
+               assertEquals(checkpointId, loaded.getCheckpointID());
 
                // 2) Load and validate: max parallelism mismatch
                when(vertex.getMaxParallelism()).thenReturn(222);
 
                try {
-                       SavepointLoader.loadAndValidateSavepoint(jobId, tasks, 
store, path);
+                       SavepointLoader.loadAndValidateSavepoint(jobId, tasks, 
path);
                        fail("Did not throw expected Exception");
                } catch (IllegalStateException expected) {
                        assertTrue(expected.getMessage().contains("Max 
parallelism mismatch"));
@@ -101,7 +97,7 @@ public class SavepointLoaderTest {
                assertNotNull(tasks.remove(vertexId));
 
                try {
-                       SavepointLoader.loadAndValidateSavepoint(jobId, tasks, 
store, path);
+                       SavepointLoader.loadAndValidateSavepoint(jobId, tasks, 
path);
                        fail("Did not throw expected Exception");
                } catch (IllegalStateException expected) {
                        assertTrue(expected.getMessage().contains("Cannot map 
old state"));

http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStoreFactoryTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStoreFactoryTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStoreFactoryTest.java
deleted file mode 100644
index 3dfe85e..0000000
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStoreFactoryTest.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.checkpoint.savepoint;
-
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.IllegalConfigurationException;
-import org.apache.flink.core.fs.Path;
-import org.junit.Assert;
-import org.junit.Test;
-
-public class SavepointStoreFactoryTest {
-
-       @Test
-       public void testStateStoreWithDefaultConfig() throws Exception {
-               SavepointStore store = 
SavepointStoreFactory.createFromConfig(new Configuration());
-               Assert.assertTrue(store instanceof HeapSavepointStore);
-       }
-
-       @Test
-       public void testSavepointBackendJobManager() throws Exception {
-               Configuration config = new Configuration();
-               config.setString(SavepointStoreFactory.SAVEPOINT_BACKEND_KEY, 
"jobmanager");
-               SavepointStore store = 
SavepointStoreFactory.createFromConfig(config);
-               Assert.assertTrue(store instanceof HeapSavepointStore);
-       }
-
-       @Test
-       public void testSavepointBackendFileSystem() throws Exception {
-               Configuration config = new Configuration();
-               String rootPath = System.getProperty("java.io.tmpdir");
-               config.setString(ConfigConstants.STATE_BACKEND, "filesystem");
-               config.setString(SavepointStoreFactory.SAVEPOINT_BACKEND_KEY, 
"filesystem");
-               config.setString(SavepointStoreFactory.SAVEPOINT_DIRECTORY_KEY, 
rootPath);
-
-               SavepointStore store = 
SavepointStoreFactory.createFromConfig(config);
-               Assert.assertTrue(store instanceof FsSavepointStore);
-
-               FsSavepointStore stateStore = (FsSavepointStore) store;
-               Assert.assertEquals(new Path(rootPath), 
stateStore.getRootPath());
-       }
-
-       @Test
-       public void 
testSavepointBackendFileSystemButCheckpointBackendJobManager() throws Exception 
{
-               Configuration config = new Configuration();
-               String rootPath = System.getProperty("java.io.tmpdir");
-               // This combination does not make sense, because the 
checkpoints will be
-               // lost after the job manager shuts down.
-               config.setString(ConfigConstants.STATE_BACKEND, "jobmanager");
-               config.setString(SavepointStoreFactory.SAVEPOINT_BACKEND_KEY, 
"filesystem");
-               config.setString(SavepointStoreFactory.SAVEPOINT_DIRECTORY_KEY, 
rootPath);
-
-               SavepointStore store = 
SavepointStoreFactory.createFromConfig(config);
-               Assert.assertTrue(store instanceof FsSavepointStore);
-
-               FsSavepointStore stateStore = (FsSavepointStore) store;
-               Assert.assertEquals(new Path(rootPath), 
stateStore.getRootPath());
-       }
-
-       @Test(expected = IllegalConfigurationException.class)
-       public void testSavepointBackendFileSystemButNoDirectory() throws 
Exception {
-               Configuration config = new Configuration();
-               config.setString(SavepointStoreFactory.SAVEPOINT_BACKEND_KEY, 
"filesystem");
-               SavepointStoreFactory.createFromConfig(config);
-               Assert.fail("Did not throw expected Exception");
-       }
-
-       @Test(expected = IllegalConfigurationException.class)
-       public void testUnexpectedSavepointBackend() throws Exception {
-               Configuration config = new Configuration();
-               config.setString(SavepointStoreFactory.SAVEPOINT_BACKEND_KEY, 
"unexpected");
-               SavepointStoreFactory.createFromConfig(config);
-               Assert.fail("Did not throw expected Exception");
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStoreTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStoreTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStoreTest.java
new file mode 100644
index 0000000..8eed6ea
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStoreTest.java
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.savepoint;
+
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.checkpoint.TaskState;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.mockito.Matchers;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+
+public class SavepointStoreTest {
+
+       @Rule
+       public TemporaryFolder tmp = new TemporaryFolder();
+
+       /**
+        * Tests a store-load-dispose sequence.
+        */
+       @Test
+       public void testStoreLoadDispose() throws Exception {
+               String target = tmp.getRoot().getAbsolutePath();
+
+               assertEquals(0, tmp.getRoot().listFiles().length);
+
+               // Store
+               SavepointV1 stored = new SavepointV1(1929292, 
SavepointV1Test.createTaskStates(4, 24));
+               String path = SavepointStore.storeSavepoint(target, stored);
+               assertEquals(1, tmp.getRoot().listFiles().length);
+
+               // Load
+               Savepoint loaded = SavepointStore.loadSavepoint(path);
+               assertEquals(stored, loaded);
+
+               loaded.dispose();
+
+               // Dispose
+               SavepointStore.removeSavepoint(path);
+
+               assertEquals(0, tmp.getRoot().listFiles().length);
+       }
+
+       /**
+        * Tests loading with unexpected magic number.
+        */
+       @Test
+       public void testUnexpectedSavepoint() throws Exception {
+               // Random file
+               Path filePath = new Path(tmp.getRoot().getPath(), 
UUID.randomUUID().toString());
+               FSDataOutputStream fdos = 
FileSystem.get(filePath.toUri()).create(filePath, false);
+               DataOutputStream dos = new DataOutputStream(fdos);
+               for (int i = 0; i < 10; i++) {
+                       dos.writeLong(ThreadLocalRandom.current().nextLong());
+               }
+
+               try {
+                       SavepointStore.loadSavepoint(filePath.toString());
+                       fail("Did not throw expected Exception");
+               } catch (RuntimeException e) {
+                       assertTrue(e.getMessage().contains("Flink 1.0") && 
e.getMessage().contains("Unexpected magic number"));
+               }
+       }
+
+       /**
+        * Tests addition of a new savepoint version.
+        */
+       @Test
+       @SuppressWarnings("unchecked")
+       public void testMultipleSavepointVersions() throws Exception {
+               Field field = 
SavepointSerializers.class.getDeclaredField("SERIALIZERS");
+               field.setAccessible(true);
+               Map<Integer, SavepointSerializer<?>> serializers = 
(Map<Integer, SavepointSerializer<?>>) field.get(null);
+
+               assertTrue(serializers.size() >= 1);
+
+               String target = tmp.getRoot().getAbsolutePath();
+               assertEquals(0, tmp.getRoot().listFiles().length);
+
+               // New savepoint type for test
+               int version = ThreadLocalRandom.current().nextInt();
+               long checkpointId = ThreadLocalRandom.current().nextLong();
+
+               // Add serializer
+               serializers.put(version, NewSavepointSerializer.INSTANCE);
+
+               TestSavepoint newSavepoint = new TestSavepoint(version, 
checkpointId);
+               String pathNewSavepoint = SavepointStore.storeSavepoint(target, 
newSavepoint);
+               assertEquals(1, tmp.getRoot().listFiles().length);
+
+               // Savepoint v0
+               Savepoint savepoint = new SavepointV1(checkpointId, 
SavepointV1Test.createTaskStates(4, 32));
+               String pathSavepoint = SavepointStore.storeSavepoint(target, 
savepoint);
+               assertEquals(2, tmp.getRoot().listFiles().length);
+
+               // Load
+               Savepoint loaded = 
SavepointStore.loadSavepoint(pathNewSavepoint);
+               assertEquals(newSavepoint, loaded);
+
+               loaded = SavepointStore.loadSavepoint(pathSavepoint);
+               assertEquals(savepoint, loaded);
+       }
+
+       /**
+        * Tests that an exception during store cleans up the created savepoint 
file.
+        */
+       @Test
+       public void testCleanupOnStoreFailure() throws Exception {
+               Field field = 
SavepointSerializers.class.getDeclaredField("SERIALIZERS");
+               field.setAccessible(true);
+               Map<Integer, SavepointSerializer<?>> serializers = 
(Map<Integer, SavepointSerializer<?>>) field.get(null);
+
+               String target = tmp.getRoot().getAbsolutePath();
+
+               final int version = 123123;
+               SavepointSerializer<TestSavepoint> serializer = 
mock(SavepointSerializer.class);
+               doThrow(new RuntimeException("Test Exception")).when(serializer)
+                               .serialize(Matchers.any(TestSavepoint.class), 
any(DataOutputStream.class));
+
+               serializers.put(version, serializer);
+
+               Savepoint savepoint = new TestSavepoint(version, 12123123);
+
+               assertEquals(0, tmp.getRoot().listFiles().length);
+
+               try {
+                       SavepointStore.storeSavepoint(target, savepoint);
+               } catch (Throwable ignored) {
+               }
+
+               assertEquals("Savepoint file not cleaned up on failure", 0, 
tmp.getRoot().listFiles().length);
+       }
+
+       private static class NewSavepointSerializer implements 
SavepointSerializer<TestSavepoint> {
+
+               private static final NewSavepointSerializer INSTANCE = new 
NewSavepointSerializer();
+
+               @Override
+               public void serialize(TestSavepoint savepoint, DataOutputStream 
dos) throws IOException {
+                       dos.writeInt(savepoint.version);
+                       dos.writeLong(savepoint.checkpointId);
+               }
+
+               @Override
+               public TestSavepoint deserialize(DataInputStream dis) throws 
IOException {
+                       int version = dis.readInt();
+                       long checkpointId = dis.readLong();
+                       return new TestSavepoint(version, checkpointId);
+               }
+
+       }
+
+       private static class TestSavepoint implements Savepoint {
+
+               private final int version;
+               private final long checkpointId;
+
+               public TestSavepoint(int version, long checkpointId) {
+                       this.version = version;
+                       this.checkpointId = checkpointId;
+               }
+
+               @Override
+               public int getVersion() {
+                       return version;
+               }
+
+               @Override
+               public long getCheckpointId() {
+                       return checkpointId;
+               }
+
+               @Override
+               public Collection<TaskState> getTaskStates() {
+                       return Collections.EMPTY_LIST;
+               }
+
+               @Override
+               public void dispose() {
+               }
+
+               @Override
+               public boolean equals(Object o) {
+                       if (this == o) {
+                               return true;
+                       }
+                       if (o == null || getClass() != o.getClass()) {
+                               return false;
+                       }
+                       TestSavepoint that = (TestSavepoint) o;
+                       return version == that.version && checkpointId == 
that.checkpointId;
+
+               }
+
+               @Override
+               public int hashCode() {
+                       int result = version;
+                       result = 31 * result + (int) (checkpointId ^ 
(checkpointId >>> 32));
+                       return result;
+               }
+       }
+
+}
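
The new SavepointStoreTest drives the full file-based lifecycle; as a hedged sketch of that sequence (the target directory and checkpoint ID are illustrative, and the empty task-state collection stands in for real state):

    import java.util.Collections;

    import org.apache.flink.runtime.checkpoint.TaskState;
    import org.apache.flink.runtime.checkpoint.savepoint.Savepoint;
    import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore;
    import org.apache.flink.runtime.checkpoint.savepoint.SavepointV1;

    public class SavepointStoreLifecycleSketch {

        static void roundTrip(String targetDirectory) throws Exception {
            // Store: serializes the savepoint into a new file under targetDirectory
            // and returns that file's path.
            SavepointV1 stored = new SavepointV1(1L, Collections.<TaskState>emptyList());
            String path = SavepointStore.storeSavepoint(targetDirectory, stored);

            // Load: the serializer version is read back from the file header.
            Savepoint loaded = SavepointStore.loadSavepoint(path);

            // Dispose state handles, then delete the savepoint file itself.
            loaded.dispose();
            SavepointStore.removeSavepoint(path);
        }
    }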

http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTrackerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTrackerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTrackerTest.java
index 1e95732..2dac87f 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTrackerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTrackerTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.checkpoint.stats;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.runtime.checkpoint.CheckpointProperties;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
 import org.apache.flink.runtime.checkpoint.SubtaskState;
 import org.apache.flink.runtime.checkpoint.TaskState;
@@ -344,7 +345,7 @@ public class SimpleCheckpointStatsTrackerTest {
                        // Add some random delay
                        final long completionTimestamp = triggerTimestamp + 
completionDuration + RAND.nextInt(10);
 
-                       checkpoints[i] = new CompletedCheckpoint(jobId, i, 
triggerTimestamp, completionTimestamp, taskGroupStates, true);
+                       checkpoints[i] = new CompletedCheckpoint(jobId, i, 
triggerTimestamp, completionTimestamp, taskGroupStates, 
CheckpointProperties.forStandardCheckpoint(), null);
                }
 
                return checkpoints;
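
The one-line change above reflects the new CompletedCheckpoint constructor: the trailing boolean is replaced by CheckpointProperties plus an external path. A hedged sketch of constructing a standard, non-externalized checkpoint under that signature (timestamps and IDs below are illustrative):

    import java.util.HashMap;

    import org.apache.flink.api.common.JobID;
    import org.apache.flink.runtime.checkpoint.CheckpointProperties;
    import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
    import org.apache.flink.runtime.checkpoint.TaskState;
    import org.apache.flink.runtime.jobgraph.JobVertexID;

    public class CompletedCheckpointSketch {

        static CompletedCheckpoint standardCheckpoint(JobID jobId, long checkpointId) {
            return new CompletedCheckpoint(
                    jobId,
                    checkpointId,
                    1000L,                                    // trigger timestamp
                    2000L,                                    // completion timestamp
                    new HashMap<JobVertexID, TaskState>(),
                    CheckpointProperties.forStandardCheckpoint(),
                    null);                                    // no externalized location
        }
    }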

http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
index 612fe35..9277029 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
@@ -37,7 +37,6 @@ import 
org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
 import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
-import org.apache.flink.runtime.checkpoint.savepoint.HeapSavepointStore;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
 import 
org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy;
@@ -49,6 +48,7 @@ import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
 import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
 import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
@@ -168,7 +168,6 @@ public class JobManagerHARecoveryTest {
                                        myLeaderElectionService,
                                        mySubmittedJobGraphStore,
                                        checkpointStateFactory,
-                                       new HeapSavepointStore(),
                                        jobRecoveryTimeout,
                                        Option.apply(null));
 
@@ -205,7 +204,8 @@ public class JobManagerHARecoveryTest {
                                        100,
                                        10 * 60 * 1000,
                                        0,
-                                       1));
+                                       1,
+                                       ExternalizedCheckpointSettings.none()));
 
                        
BlockingStatefulInvokable.initializeStaticHelpers(slots);
 
@@ -294,7 +294,6 @@ public class JobManagerHARecoveryTest {
         */
        static class MyCheckpointStore implements CompletedCheckpointStore {
 
-
                private final ArrayDeque<CompletedCheckpoint> checkpoints = new 
ArrayDeque<>(2);
 
                private final ArrayDeque<CompletedCheckpoint> suspended = new 
ArrayDeque<>(2);
@@ -309,7 +308,7 @@ public class JobManagerHARecoveryTest {
                public void addCheckpoint(CompletedCheckpoint checkpoint) 
throws Exception {
                        checkpoints.addLast(checkpoint);
                        if (checkpoints.size() > 1) {
-                               checkpoints.removeFirst().discardState();
+                               checkpoints.removeFirst().subsume();
                        }
                }
 
@@ -319,15 +318,14 @@ public class JobManagerHARecoveryTest {
                }
 
                @Override
-               public void shutdown() throws Exception {
-                       checkpoints.clear();
-                       suspended.clear();
-               }
-
-               @Override
-               public void suspend() throws Exception {
-                       suspended.addAll(checkpoints);
-                       checkpoints.clear();
+               public void shutdown(JobStatus jobStatus) throws Exception {
+                       if (jobStatus.isGloballyTerminalState()) {
+                               checkpoints.clear();
+                               suspended.clear();
+                       } else {
+                               suspended.addAll(checkpoints);
+                               checkpoints.clear();
+                       }
                }
 
                @Override
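
As MyCheckpointStore above shows, the separate shutdown()/suspend() pair is folded into a single shutdown(JobStatus) call; implementations branch on JobStatus.isGloballyTerminalState() to decide between discarding and retaining checkpoints. A hedged caller-side sketch (the helper name is illustrative):

    import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
    import org.apache.flink.runtime.jobgraph.JobStatus;

    public class CheckpointStoreShutdownSketch {

        static void releaseStore(CompletedCheckpointStore store, JobStatus finalStatus) throws Exception {
            // FINISHED/CANCELED/FAILED are globally terminal: implementations discard
            // their checkpoints. A non-terminal status such as SUSPENDED keeps them
            // around so the job can be recovered later.
            store.shutdown(finalStatus);
        }
    }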

http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
index ed4d530..0c45fac 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
@@ -39,8 +39,6 @@ import 
org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 import org.apache.flink.runtime.jobmanager.StandaloneSubmittedJobGraphStore;
 import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
-import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore;
-import org.apache.flink.runtime.checkpoint.savepoint.SavepointStoreFactory;
 import org.apache.flink.runtime.testingUtils.TestingJobManager;
 import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
@@ -183,7 +181,6 @@ public class JobManagerLeaderElectionTest extends 
TestLogger {
                // We don't need recovery in this test
                SubmittedJobGraphStore submittedJobGraphStore = new 
StandaloneSubmittedJobGraphStore();
                CheckpointRecoveryFactory checkpointRecoveryFactory = new 
StandaloneCheckpointRecoveryFactory();
-               SavepointStore savepointStore = 
SavepointStoreFactory.createFromConfig(configuration);
 
                return Props.create(
                                TestingJobManager.class,
@@ -198,7 +195,6 @@ public class JobManagerLeaderElectionTest extends 
TestLogger {
                                leaderElectionService,
                                submittedJobGraphStore,
                                checkpointRecoveryFactory,
-                               savepointStore,
                                AkkaUtils.getDefaultTimeout(),
                                Option.apply(null)
                );

http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
index 548bef0..0569297 100644
--- 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
+++ 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
@@ -18,15 +18,15 @@
 
 package org.apache.flink.runtime.jobmanager
 
-
 import akka.actor.ActorSystem
 import akka.testkit.{ImplicitSender, TestKit}
 import akka.util.Timeout
 import org.apache.flink.api.common.JobID
 import org.apache.flink.runtime.akka.ListeningBehaviour
-import org.apache.flink.runtime.checkpoint.CheckpointCoordinator
+import org.apache.flink.runtime.checkpoint.{CheckpointCoordinator, 
CompletedCheckpoint}
 import org.apache.flink.runtime.client.JobExecutionException
-import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings
+import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture
+import 
org.apache.flink.runtime.jobgraph.tasks.{ExternalizedCheckpointSettings, 
JobSnapshottingSettings}
 import org.apache.flink.runtime.jobgraph.{DistributionPattern, JobGraph, 
JobVertex, ScheduleMode}
 import org.apache.flink.runtime.jobmanager.Tasks._
 import 
org.apache.flink.runtime.jobmanager.scheduler.{NoResourceAvailableException, 
SlotSharingGroup}
@@ -34,6 +34,7 @@ import org.apache.flink.runtime.messages.JobManagerMessages._
 import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages._
 import org.apache.flink.runtime.testingUtils.{ScalaTestingUtils, TestingUtils}
 import org.junit.runner.RunWith
+import org.mockito.Mockito
 import org.mockito.Mockito._
 import org.scalatest.junit.JUnitRunner
 import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}
@@ -752,7 +753,7 @@ class JobManagerITCase(_system: ActorSystem)
           val jobId = new JobID()
 
           // Trigger savepoint for non-existing job
-          jobManager.tell(TriggerSavepoint(jobId), testActor)
+          jobManager.tell(TriggerSavepoint(jobId, Option.apply("any")), 
testActor)
           val response = 
expectMsgType[TriggerSavepointFailure](deadline.timeLeft)
 
           // Verify the response
@@ -784,7 +785,7 @@ class JobManagerITCase(_system: ActorSystem)
           expectMsg(JobSubmitSuccess(jobGraph.getJobID()))
 
           // Trigger savepoint for job with disabled checkpointing
-          jobManager.tell(TriggerSavepoint(jobGraph.getJobID()), testActor)
+          jobManager.tell(TriggerSavepoint(jobGraph.getJobID(), 
Option.apply("any")), testActor)
           val response = 
expectMsgType[TriggerSavepointFailure](deadline.timeLeft)
 
           // Verify the response
@@ -815,7 +816,7 @@ class JobManagerITCase(_system: ActorSystem)
             java.util.Collections.emptyList(),
             java.util.Collections.emptyList(),
             java.util.Collections.emptyList(),
-            60000, 60000, 60000, 1))
+            60000, 60000, 60000, 1, ExternalizedCheckpointSettings.none))
 
           // Submit job...
           jobManager.tell(SubmitJob(jobGraph, ListeningBehaviour.DETACHED), 
testActor)
@@ -829,7 +830,8 @@ class JobManagerITCase(_system: ActorSystem)
           // Mock the checkpoint coordinator
           val checkpointCoordinator = mock(classOf[CheckpointCoordinator])
           doThrow(new Exception("Expected Test Exception"))
-            
.when(checkpointCoordinator).triggerSavepoint(org.mockito.Matchers.anyLong())
+            .when(checkpointCoordinator)
+            .triggerSavepoint(org.mockito.Matchers.anyLong(), 
org.mockito.Matchers.anyString())
 
           // Update the savepoint coordinator field
           val field = 
executionGraph.getClass.getDeclaredField("checkpointCoordinator")
@@ -837,7 +839,7 @@ class JobManagerITCase(_system: ActorSystem)
           field.set(executionGraph, checkpointCoordinator)
 
           // Trigger savepoint for job
-          jobManager.tell(TriggerSavepoint(jobGraph.getJobID()), testActor)
+          jobManager.tell(TriggerSavepoint(jobGraph.getJobID(), 
Option.apply("any")), testActor)
           val response = 
expectMsgType[TriggerSavepointFailure](deadline.timeLeft)
 
           // Verify the response
@@ -851,7 +853,7 @@ class JobManagerITCase(_system: ActorSystem)
       }
     }
 
-    "handle trigger savepoint response after failed savepoint future" in {
+    "handle failed savepoint triggering" in {
       val deadline = TestingUtils.TESTING_DURATION.fromNow
 
       val flinkCluster = TestingUtils.startTestingCluster(1, 1)
@@ -868,7 +870,7 @@ class JobManagerITCase(_system: ActorSystem)
             java.util.Collections.emptyList(),
             java.util.Collections.emptyList(),
             java.util.Collections.emptyList(),
-            60000, 60000, 60000, 1))
+            60000, 60000, 60000, 1, ExternalizedCheckpointSettings.none))
 
           // Submit job...
           jobManager.tell(SubmitJob(jobGraph, ListeningBehaviour.DETACHED), 
testActor)
@@ -877,10 +879,12 @@ class JobManagerITCase(_system: ActorSystem)
           // Mock the checkpoint coordinator
           val checkpointCoordinator = mock(classOf[CheckpointCoordinator])
           doThrow(new Exception("Expected Test Exception"))
-            
.when(checkpointCoordinator).triggerSavepoint(org.mockito.Matchers.anyLong())
-          val savepointPathPromise = scala.concurrent.promise[String]
-          doReturn(savepointPathPromise.future)
-            
.when(checkpointCoordinator).triggerSavepoint(org.mockito.Matchers.anyLong())
+            .when(checkpointCoordinator)
+            .triggerSavepoint(org.mockito.Matchers.anyLong(), 
org.mockito.Matchers.anyString())
+          val savepointPathPromise = new 
FlinkCompletableFuture[CompletedCheckpoint]()
+          doReturn(savepointPathPromise)
+            .when(checkpointCoordinator)
+            .triggerSavepoint(org.mockito.Matchers.anyLong(), 
org.mockito.Matchers.anyString())
 
           // Request the execution graph and set a checkpoint coordinator mock
           jobManager.tell(RequestExecutionGraph(jobGraph.getJobID), testActor)
@@ -893,10 +897,10 @@ class JobManagerITCase(_system: ActorSystem)
           field.set(executionGraph, checkpointCoordinator)
 
           // Trigger savepoint for job
-          jobManager.tell(TriggerSavepoint(jobGraph.getJobID()), testActor)
+          jobManager.tell(TriggerSavepoint(jobGraph.getJobID(), 
Option.apply("any")), testActor)
 
           // Fail the promise
-          savepointPathPromise.failure(new Exception("Expected Test 
Exception"))
+          savepointPathPromise.completeExceptionally(new Exception("Expected 
Test Exception"))
 
           val response = 
expectMsgType[TriggerSavepointFailure](deadline.timeLeft)
 
@@ -928,7 +932,7 @@ class JobManagerITCase(_system: ActorSystem)
             java.util.Collections.emptyList(),
             java.util.Collections.emptyList(),
             java.util.Collections.emptyList(),
-            60000, 60000, 60000, 1))
+            60000, 60000, 60000, 1, ExternalizedCheckpointSettings.none))
 
           // Submit job...
           jobManager.tell(SubmitJob(jobGraph, ListeningBehaviour.DETACHED), 
testActor)
@@ -937,10 +941,13 @@ class JobManagerITCase(_system: ActorSystem)
           // Mock the checkpoint coordinator
           val checkpointCoordinator = mock(classOf[CheckpointCoordinator])
           doThrow(new Exception("Expected Test Exception"))
-            
.when(checkpointCoordinator).triggerSavepoint(org.mockito.Matchers.anyLong())
-          val savepointPathPromise = scala.concurrent.promise[String]
-          doReturn(savepointPathPromise.future)
-            
.when(checkpointCoordinator).triggerSavepoint(org.mockito.Matchers.anyLong())
+            .when(checkpointCoordinator)
+            .triggerSavepoint(org.mockito.Matchers.anyLong(), 
org.mockito.Matchers.anyString())
+
+          val savepointPromise = new 
FlinkCompletableFuture[CompletedCheckpoint]()
+          doReturn(savepointPromise)
+            .when(checkpointCoordinator)
+            .triggerSavepoint(org.mockito.Matchers.anyLong(), 
org.mockito.Matchers.anyString())
 
           // Request the execution graph and set a checkpoint coordinator mock
           jobManager.tell(RequestExecutionGraph(jobGraph.getJobID), testActor)
@@ -953,10 +960,13 @@ class JobManagerITCase(_system: ActorSystem)
           field.set(executionGraph, checkpointCoordinator)
 
           // Trigger savepoint for job
-          jobManager.tell(TriggerSavepoint(jobGraph.getJobID()), testActor)
+          jobManager.tell(TriggerSavepoint(jobGraph.getJobID(), 
Option.apply("any")), testActor)
+
+          val checkpoint = Mockito.mock(classOf[CompletedCheckpoint])
+          when(checkpoint.getExternalPath).thenReturn("Expected test savepoint 
path")
 
           // Succeed the promise
-          savepointPathPromise.success("Expected test savepoint path")
+          savepointPromise.complete(checkpoint)
 
           val response = 
expectMsgType[TriggerSavepointSuccess](deadline.timeLeft)
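
A hedged Java rendering of what this Scala test stubs: triggerSavepoint now takes a checkpoint ID plus a target directory and completes a future with a CompletedCheckpoint, whose getExternalPath() the JobManager reports back in TriggerSavepointSuccess (an exceptional completion becomes TriggerSavepointFailure). The names below mirror the mocks in this test and are not a full implementation.

    import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
    import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
    import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;

    import static org.mockito.Matchers.anyLong;
    import static org.mockito.Matchers.anyString;
    import static org.mockito.Mockito.doReturn;
    import static org.mockito.Mockito.mock;
    import static org.mockito.Mockito.when;

    public class TriggerSavepointSketch {

        static void stubCoordinator() throws Exception {
            CheckpointCoordinator coordinator = mock(CheckpointCoordinator.class);

            FlinkCompletableFuture<CompletedCheckpoint> savepointFuture =
                    new FlinkCompletableFuture<>();
            doReturn(savepointFuture)
                    .when(coordinator)
                    .triggerSavepoint(anyLong(), anyString());

            CompletedCheckpoint checkpoint = mock(CompletedCheckpoint.class);
            when(checkpoint.getExternalPath()).thenReturn("file:///tmp/savepoint-1");

            // Success path: the JobManager answers with the external path.
            savepointFuture.complete(checkpoint);
            // Failure path (alternative): the JobManager answers TriggerSavepointFailure.
            // savepointFuture.completeExceptionally(new Exception("Expected Test Exception"));
        }
    }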
 

http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
index c01a321..50a5559 100644
--- 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
+++ 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
@@ -20,14 +20,13 @@ package org.apache.flink.runtime.testingUtils
 
 import java.util.concurrent.{ExecutorService, TimeUnit, TimeoutException}
 
-import akka.pattern.ask
 import akka.actor.{ActorRef, ActorSystem, Props}
 import akka.pattern.Patterns._
+import akka.pattern.ask
 import akka.testkit.CallingThreadDispatcher
 import org.apache.flink.configuration.{ConfigConstants, Configuration}
 import org.apache.flink.runtime.akka.AkkaUtils
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory
-import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore
 import org.apache.flink.runtime.clusterframework.FlinkResourceManager
 import org.apache.flink.runtime.clusterframework.types.ResourceIDRetrievable
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
@@ -38,10 +37,10 @@ import org.apache.flink.runtime.jobmanager.{JobManager, 
MemoryArchivist, Submitt
 import org.apache.flink.runtime.leaderelection.LeaderElectionService
 import org.apache.flink.runtime.metrics.MetricRegistry
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster
-import org.apache.flink.runtime.testutils.TestingResourceManager
 import org.apache.flink.runtime.taskmanager.TaskManager
 import org.apache.flink.runtime.testingUtils.TestingMessages.Alive
 import 
org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages.NotifyWhenRegisteredAtJobManager
+import org.apache.flink.runtime.testutils.TestingResourceManager
 
 import scala.concurrent.duration.FiniteDuration
 import scala.concurrent.{Await, Future}
@@ -91,7 +90,6 @@ class TestingCluster(
     leaderElectionService: LeaderElectionService,
     submittedJobGraphStore: SubmittedJobGraphStore,
     checkpointRecoveryFactory: CheckpointRecoveryFactory,
-    savepointStore: SavepointStore,
     jobRecoveryTimeout: FiniteDuration,
     metricsRegistry: Option[MetricRegistry]): Props = {
 
@@ -108,7 +106,6 @@ class TestingCluster(
       leaderElectionService,
       submittedJobGraphStore,
       checkpointRecoveryFactory,
-      savepointStore,
       jobRecoveryTimeout,
       metricsRegistry)
 

http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
index 62349db..e9bdb99 100644
--- 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
+++ 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
@@ -23,7 +23,6 @@ import java.util.concurrent.ExecutorService
 import akka.actor.ActorRef
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory
-import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory
 import org.apache.flink.runtime.instance.InstanceManager
@@ -50,12 +49,11 @@ class TestingJobManager(
     leaderElectionService: LeaderElectionService,
     submittedJobGraphs : SubmittedJobGraphStore,
     checkpointRecoveryFactory : CheckpointRecoveryFactory,
-    savepointStore : SavepointStore,
     jobRecoveryTimeout : FiniteDuration,
     metricRegistry : Option[MetricRegistry])
   extends JobManager(
     flinkConfiguration,
-      executorService,
+    executorService,
     instanceManager,
     scheduler,
     libraryCacheManager,
@@ -65,7 +63,6 @@ class TestingJobManager(
     leaderElectionService,
     submittedJobGraphs,
     checkpointRecoveryFactory,
-    savepointStore,
     jobRecoveryTimeout,
     metricRegistry)
   with TestingJobManagerLike {}

http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
index 5ba2790..d775869 100644
--- 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
+++ 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
@@ -22,6 +22,7 @@ import akka.actor.{ActorRef, Cancellable, Terminated}
 import akka.pattern.{ask, pipe}
 import org.apache.flink.api.common.JobID
 import org.apache.flink.runtime.FlinkActor
+import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore
 import org.apache.flink.runtime.execution.ExecutionState
 import org.apache.flink.runtime.jobgraph.JobStatus
 import org.apache.flink.runtime.jobmanager.JobManager
@@ -294,7 +295,7 @@ trait TestingJobManagerLike extends FlinkActor {
 
     case RequestSavepoint(savepointPath) =>
       try {
-        val savepoint = savepointStore.loadSavepoint(savepointPath)
+        val savepoint = SavepointStore.loadSavepoint(savepointPath)
         sender ! ResponseSavepoint(savepoint)
       }
       catch {

http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
index a268c83..0abdd46 100644
--- 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
+++ 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
@@ -20,28 +20,26 @@ package org.apache.flink.runtime.testingUtils
 
 import java.util.UUID
 
-import akka.actor.{Props, Kill, ActorSystem, ActorRef}
+import akka.actor.{ActorRef, ActorSystem, Kill, Props}
 import akka.pattern.ask
 import com.google.common.util.concurrent.MoreExecutors
-
 import com.typesafe.config.ConfigFactory
 import grizzled.slf4j.Logger
 import org.apache.flink.api.common.JobExecutionResult
-
-import org.apache.flink.configuration.{HighAvailabilityOptions, 
ConfigConstants, Configuration}
+import org.apache.flink.configuration.{ConfigConstants, Configuration, 
HighAvailabilityOptions}
+import org.apache.flink.runtime.akka.AkkaUtils
 import org.apache.flink.runtime.client.JobClient
 import org.apache.flink.runtime.clusterframework.FlinkResourceManager
-import org.apache.flink.runtime.jobgraph.JobGraph
 import org.apache.flink.runtime.clusterframework.types.ResourceID
-import org.apache.flink.runtime.jobmanager.{HighAvailabilityMode, 
MemoryArchivist, JobManager}
-import org.apache.flink.runtime.testutils.TestingResourceManager
-import org.apache.flink.runtime.util.LeaderRetrievalUtils
-import org.apache.flink.runtime.{LogMessages, LeaderSessionMessageFilter, 
FlinkActor}
-import org.apache.flink.runtime.akka.AkkaUtils
-import org.apache.flink.runtime.instance.{AkkaActorGateway, ActorGateway}
+import org.apache.flink.runtime.instance.{ActorGateway, AkkaActorGateway}
+import org.apache.flink.runtime.jobgraph.JobGraph
+import org.apache.flink.runtime.jobmanager.{JobManager, MemoryArchivist}
 import 
org.apache.flink.runtime.leaderretrieval.StandaloneLeaderRetrievalService
 import 
org.apache.flink.runtime.messages.TaskManagerMessages.NotifyWhenRegisteredAtJobManager
 import org.apache.flink.runtime.taskmanager.TaskManager
+import org.apache.flink.runtime.testutils.TestingResourceManager
+import org.apache.flink.runtime.util.LeaderRetrievalUtils
+import org.apache.flink.runtime.{FlinkActor, LeaderSessionMessageFilter, 
LogMessages}
 
 import scala.concurrent.duration._
 import scala.concurrent.{Await, ExecutionContext}
@@ -356,7 +354,6 @@ object TestingUtils {
     leaderElectionService,
     submittedJobGraphs,
     checkpointRecoveryFactory,
-    savepointStore,
     jobRecoveryTimeout,
     metricsRegistry) = JobManager.createJobManagerComponents(
       configuration,
