http://git-wip-us.apache.org/repos/asf/flink/blob/47acdead/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointCoordinatorTest.java
deleted file mode 100644
index 6b4f354..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointCoordinatorTest.java
+++ /dev/null
@@ -1,1119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.checkpoint.savepoint;
-
-import org.apache.commons.io.FileUtils;
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
-import org.apache.flink.runtime.checkpoint.PendingCheckpoint;
-import org.apache.flink.runtime.checkpoint.SubtaskState;
-import org.apache.flink.runtime.checkpoint.TaskState;
-import 
org.apache.flink.runtime.checkpoint.stats.DisabledCheckpointStatsTracker;
-import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.executiongraph.Execution;
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
-import org.apache.flink.runtime.executiongraph.ExecutionVertex;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
-import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
-import org.apache.flink.runtime.messages.checkpoint.NotifyCheckpointComplete;
-import org.apache.flink.runtime.messages.checkpoint.TriggerCheckpoint;
-import org.apache.flink.runtime.state.LocalStateHandle;
-import org.apache.flink.runtime.state.StateHandle;
-import org.apache.flink.runtime.testutils.CommonTestUtils;
-import org.apache.flink.util.SerializedValue;
-import org.apache.flink.util.TestLogger;
-import org.junit.Test;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.Promise;
-import scala.concurrent.duration.Deadline;
-import scala.concurrent.duration.FiniteDuration;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.Serializable;
-import java.lang.reflect.Field;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-/**
- * Tests for the savepoint coordinator.
- */
-public class SavepointCoordinatorTest extends TestLogger {
-
-       // ------------------------------------------------------------------------
-       // Trigger and acknowledge
-       // ------------------------------------------------------------------------
-
-       /**
-        * Simple trigger-acknowledge test for a single savepoint.
-        */
-       @Test
-       public void testSimpleTriggerSavepoint() throws Exception {
-               JobID jobId = new JobID();
-               long checkpointTimeout = 60 * 1000;
-               long timestamp = 1272635;
-               ExecutionVertex[] vertices = new ExecutionVertex[] {
-                               mockExecutionVertex(jobId),
-                               mockExecutionVertex(jobId) };
-               MockCheckpointIdCounter checkpointIdCounter = new 
MockCheckpointIdCounter();
-               HeapSavepointStore savepointStore = new HeapSavepointStore();
-
-               SavepointCoordinator coordinator = createSavepointCoordinator(
-                               jobId,
-                               checkpointTimeout,
-                               vertices,
-                               vertices,
-                               vertices,
-                               checkpointIdCounter,
-                               savepointStore);
-
-               // Trigger the savepoint
-               Future<String> savepointPathFuture = 
coordinator.triggerSavepoint(timestamp);
-               assertFalse(savepointPathFuture.isCompleted());
-
-               long checkpointId = checkpointIdCounter.getLastReturnedCount();
-               assertEquals(0, checkpointId);
-
-               // Verify send trigger messages
-               for (ExecutionVertex vertex : vertices) {
-                       verifyTriggerCheckpoint(vertex, checkpointId, 
timestamp);
-               }
-
-               PendingCheckpoint pendingCheckpoint = 
coordinator.getPendingCheckpoints()
-                               .get(checkpointId);
-
-               verifyPendingCheckpoint(pendingCheckpoint, jobId, checkpointId,
-                               timestamp, 0, 2, 0, false, false);
-
-               // Acknowledge tasks
-               for (ExecutionVertex vertex : vertices) {
-                       coordinator.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(
-                                       jobId, 
vertex.getCurrentExecutionAttempt().getAttemptId(),
-                                       checkpointId, 
createSerializedStateHandle(vertex), 0));
-               }
-
-               // The pending checkpoint is completed
-               assertTrue(pendingCheckpoint.isDiscarded());
-               assertEquals(0, coordinator.getSuccessfulCheckpoints().size());
-
-               // Verify send notify complete messages
-               for (ExecutionVertex vertex : vertices) {
-                       verifyNotifyCheckpointComplete(vertex, checkpointId, 
timestamp);
-               }
-
-               // Verify that the future has been completed
-               assertTrue(savepointPathFuture.isCompleted());
-               String savepointPath = Await.result(savepointPathFuture, 
FiniteDuration.Zero());
-
-               // Verify the savepoint
-               Savepoint savepoint = 
savepointStore.loadSavepoint(savepointPath);
-               verifySavepoint(savepoint, checkpointId, vertices);
-
-               // Verify all promises removed
-               assertEquals(0, getSavepointPromises(coordinator).size());
-
-               coordinator.shutdown();
-       }
-
-       /**
-        * This test triggers a checkpoint and then sends a decline checkpoint message from
-        * one of the tasks. The expected behaviour is that said checkpoint is discarded and a new
-        * checkpoint is triggered.
-        */
-       @Test
-       public void testTriggerAndDeclineCheckpointSimple() throws Exception {
-               JobID jobId = new JobID();
-               long checkpointTimeout = 60 * 1000;
-               long timestamp = 1272635;
-               ExecutionVertex[] vertices = new ExecutionVertex[] {
-                               mockExecutionVertex(jobId),
-                               mockExecutionVertex(jobId) };
-               MockCheckpointIdCounter checkpointIdCounter = new 
MockCheckpointIdCounter();
-               SavepointStore savepointStore = new HeapSavepointStore();
-
-               SavepointCoordinator coordinator = createSavepointCoordinator(
-                               jobId,
-                               checkpointTimeout,
-                               vertices,
-                               vertices,
-                               vertices,
-                               checkpointIdCounter,
-                               savepointStore);
-
-               // Trigger the savepoint
-               Future<String> savepointPathFuture = 
coordinator.triggerSavepoint(timestamp);
-               assertFalse(savepointPathFuture.isCompleted());
-
-               long checkpointId = checkpointIdCounter.getLastReturnedCount();
-               assertEquals(0, checkpointId);
-
-               // Verify send trigger messages
-               for (ExecutionVertex vertex : vertices) {
-                       verifyTriggerCheckpoint(vertex, checkpointId, 
timestamp);
-               }
-
-               PendingCheckpoint pendingCheckpoint = 
coordinator.getPendingCheckpoints()
-                               .get(checkpointId);
-
-               verifyPendingCheckpoint(pendingCheckpoint, jobId, checkpointId,
-                               timestamp, 0, 2, 0, false, false);
-
-               // Acknowledge and decline tasks
-               coordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(
-                               jobId, 
vertices[0].getCurrentExecutionAttempt().getAttemptId(),
-                               checkpointId, 
createSerializedStateHandle(vertices[0]), 0));
-
-               coordinator.receiveDeclineMessage(new DeclineCheckpoint(
-                               jobId, 
vertices[1].getCurrentExecutionAttempt().getAttemptId(),
-                               checkpointId, 0));
-
-
-               // The pending checkpoint is completed
-               assertTrue(pendingCheckpoint.isDiscarded());
-               assertEquals(0, coordinator.getSuccessfulCheckpoints().size());
-
-               // Verify that the future has been completed
-               assertTrue(savepointPathFuture.isCompleted());
-
-               try {
-                       Await.result(savepointPathFuture.failed(), 
FiniteDuration.Zero());
-                       fail("Did not throw expected exception");
-               } catch (Throwable ignored) {}
-
-               // Verify all promises removed
-               assertEquals(0, getSavepointPromises(coordinator).size());
-
-               coordinator.shutdown();
-       }
-
-       // ------------------------------------------------------------------------
-       // Rollback
-       // ------------------------------------------------------------------------
-
-       @Test
-       @SuppressWarnings("unchecked")
-       public void testSimpleRollbackSavepoint() throws Exception {
-               JobID jobId = new JobID();
-
-               ExecutionJobVertex[] jobVertices = new ExecutionJobVertex[] {
-                               mockExecutionJobVertex(jobId, new 
JobVertexID(), 4),
-                               mockExecutionJobVertex(jobId, new 
JobVertexID(), 4) };
-
-               ExecutionVertex[] triggerVertices = 
jobVertices[0].getTaskVertices();
-               ExecutionVertex[] ackVertices = new ExecutionVertex[8];
-
-               int i = 0;
-               for (ExecutionJobVertex jobVertex : jobVertices) {
-                       for (ExecutionVertex vertex : 
jobVertex.getTaskVertices()) {
-                               ackVertices[i++] = vertex;
-                       }
-               }
-
-               MockCheckpointIdCounter idCounter = new 
MockCheckpointIdCounter();
-               HeapSavepointStore savepointStore = new HeapSavepointStore();
-
-               SavepointCoordinator coordinator = createSavepointCoordinator(
-                               jobId,
-                               60 * 1000,
-                               triggerVertices,
-                               ackVertices,
-                               new ExecutionVertex[] {},
-                               idCounter,
-                               savepointStore);
-
-               Future<String> savepointPathFuture = 
coordinator.triggerSavepoint(1231273123);
-
-               // Acknowledge all tasks
-               for (ExecutionVertex vertex : ackVertices) {
-                       ExecutionAttemptID attemptId = 
vertex.getCurrentExecutionAttempt().getAttemptId();
-                       coordinator.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(
-                                       jobId, attemptId, 0, 
createSerializedStateHandle(vertex), 0));
-               }
-
-               String savepointPath = Await.result(savepointPathFuture, 
FiniteDuration.Zero());
-               assertNotNull(savepointPath);
-
-               // Rollback
-               
coordinator.restoreSavepoint(createExecutionJobVertexMap(jobVertices), 
savepointPath);
-
-               // Verify all executions have been reset
-               for (ExecutionVertex vertex : ackVertices) {
-                       verify(vertex.getCurrentExecutionAttempt(), 
times(1)).setInitialState(
-                                       any(SerializedValue.class), 
any(Map.class));
-               }
-
-               // Verify all promises removed
-               assertEquals(0, getSavepointPromises(coordinator).size());
-
-               // Verify checkpoint ID counter started
-               assertTrue(idCounter.isStarted());
-
-               coordinator.shutdown();
-       }
-
-       @Test
-       public void testRollbackParallelismMismatch() throws Exception {
-               JobID jobId = new JobID();
-
-               ExecutionJobVertex[] jobVertices = new ExecutionJobVertex[] {
-                               mockExecutionJobVertex(jobId, new 
JobVertexID(), 4),
-                               mockExecutionJobVertex(jobId, new 
JobVertexID(), 4) };
-
-               ExecutionVertex[] triggerVertices = 
jobVertices[0].getTaskVertices();
-               ExecutionVertex[] ackVertices = new ExecutionVertex[8];
-
-               int index = 0;
-               for (ExecutionJobVertex jobVertex : jobVertices) {
-                       for (ExecutionVertex vertex : 
jobVertex.getTaskVertices()) {
-                               ackVertices[index++] = vertex;
-                       }
-               }
-
-               HeapSavepointStore savepointStore = new HeapSavepointStore();
-
-               SavepointCoordinator coordinator = createSavepointCoordinator(
-                               jobId,
-                               60 * 1000,
-                               triggerVertices,
-                               ackVertices,
-                               new ExecutionVertex[] {},
-                               new MockCheckpointIdCounter(),
-                               savepointStore);
-
-               Future<String> savepointPathFuture = 
coordinator.triggerSavepoint(1231273123);
-
-               // Acknowledge all tasks
-               for (ExecutionVertex vertex : ackVertices) {
-                       ExecutionAttemptID attemptId = 
vertex.getCurrentExecutionAttempt().getAttemptId();
-                       coordinator.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(
-                                       jobId, attemptId, 0, 
createSerializedStateHandle(vertex), 0));
-               }
-
-               String savepointPath = Await.result(savepointPathFuture, 
FiniteDuration.Zero());
-               assertNotNull(savepointPath);
-
-               // Change parallelism lower than original (state without 
matching subtask). The
-               // other way around (subtask without matching state) is OK.
-               for (int i = 0; i < jobVertices.length; i++) {
-                       jobVertices[i] = mockExecutionJobVertex(jobId, 
jobVertices[i].getJobVertexId(), 2);
-               }
-
-               try {
-                       // Rollback
-                       coordinator.restoreSavepoint(
-                                       
createExecutionJobVertexMap(jobVertices),
-                                       savepointPath);
-                       fail("Did not throw expected Exception after rollback 
with parallelism mismatch.");
-               }
-               catch (Exception ignored) {
-               }
-
-               // Verify all promises removed
-               assertEquals(0, getSavepointPromises(coordinator).size());
-
-               coordinator.shutdown();
-       }
-
-       @Test
-       public void testRollbackStateStoreFailure() throws Exception {
-               JobID jobId = new JobID();
-               ExecutionJobVertex jobVertex = mockExecutionJobVertex(jobId, 
new JobVertexID(), 4);
-               HeapSavepointStore savepointStore = spy(new 
HeapSavepointStore());
-
-               SavepointCoordinator coordinator = createSavepointCoordinator(
-                               jobId,
-                               60 * 1000,
-                               jobVertex.getTaskVertices(),
-                               jobVertex.getTaskVertices(),
-                               new ExecutionVertex[] {},
-                               new MockCheckpointIdCounter(),
-                               savepointStore);
-
-               Future<String> savepointPathFuture = 
coordinator.triggerSavepoint(1231273123);
-
-               // Acknowledge all tasks
-               for (ExecutionVertex vertex : jobVertex.getTaskVertices()) {
-                       ExecutionAttemptID attemptId = 
vertex.getCurrentExecutionAttempt().getAttemptId();
-                       coordinator.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(
-                                       jobId, attemptId, 0, 
createSerializedStateHandle(vertex), 0));
-               }
-
-               String savepointPath = Await.result(savepointPathFuture, 
FiniteDuration.Zero());
-               assertNotNull(savepointPath);
-
-               // Failure on getState
-               doThrow(new 
RuntimeException("TestException")).when(savepointStore).loadSavepoint(anyString());
-
-               try {
-                       // Rollback
-                       coordinator.restoreSavepoint(
-                                       createExecutionJobVertexMap(jobVertex),
-                                       savepointPath);
-
-                       fail("Did not throw expected Exception after rollback 
with savepoint store failure.");
-               }
-               catch (Exception ignored) {
-               }
-
-               // Verify all promises removed
-               assertEquals(0, getSavepointPromises(coordinator).size());
-
-               coordinator.shutdown();
-       }
-
-       @Test
-       public void testRollbackSetsCheckpointID() throws Exception {
-               SavepointV0 savepoint = new SavepointV0(12312312L, 
Collections.<TaskState>emptyList());
-
-               CheckpointIDCounter checkpointIdCounter = 
mock(CheckpointIDCounter.class);
-
-               SavepointStore savepointStore = mock(SavepointStore.class);
-               
when(savepointStore.loadSavepoint(anyString())).thenReturn(savepoint);
-
-               SavepointCoordinator coordinator = createSavepointCoordinator(
-                               new JobID(),
-                               60 * 1000,
-                               new ExecutionVertex[] {},
-                               new ExecutionVertex[] {},
-                               new ExecutionVertex[] {},
-                               checkpointIdCounter,
-                               savepointStore);
-
-               coordinator.restoreSavepoint(createExecutionJobVertexMap(), 
"any");
-
-               verify(checkpointIdCounter).setCount(eq(12312312L + 1));
-
-               coordinator.shutdown();
-       }
-
-       // ------------------------------------------------------------------------
-       // Savepoint aborts and future notifications
-       // ------------------------------------------------------------------------
-
-       @Test
-       public void testAbortSavepointIfTriggerTasksNotExecuted() throws 
Exception {
-               JobID jobId = new JobID();
-               ExecutionVertex[] triggerVertices = new ExecutionVertex[] {
-                               mock(ExecutionVertex.class),
-                               mock(ExecutionVertex.class) };
-               ExecutionVertex[] ackVertices = new ExecutionVertex[] {
-                               mockExecutionVertex(jobId),
-                               mockExecutionVertex(jobId) };
-
-               SavepointCoordinator coordinator = createSavepointCoordinator(
-                               jobId,
-                               60 * 1000,
-                               triggerVertices,
-                               ackVertices,
-                               new ExecutionVertex[] {},
-                               new MockCheckpointIdCounter(),
-                               new HeapSavepointStore());
-
-               // Trigger savepoint
-               Future<String> savepointPathFuture = 
coordinator.triggerSavepoint(1238123);
-
-               // Abort the savepoint, because the vertices are not running
-               assertTrue(savepointPathFuture.isCompleted());
-
-               try {
-                       Await.result(savepointPathFuture, 
FiniteDuration.Zero());
-                       fail("Did not throw expected Exception after shutdown");
-               }
-               catch (Exception ignored) {
-               }
-
-               // Verify all promises removed
-               assertEquals(0, getSavepointPromises(coordinator).size());
-
-               coordinator.shutdown();
-       }
-
-       @Test
-       public void testAbortSavepointIfTriggerTasksAreFinished() throws 
Exception {
-               JobID jobId = new JobID();
-               ExecutionVertex[] triggerVertices = new ExecutionVertex[] {
-                               mockExecutionVertex(jobId),
-                               mockExecutionVertex(jobId, 
ExecutionState.FINISHED) };
-               ExecutionVertex[] ackVertices = new ExecutionVertex[] {
-                               mockExecutionVertex(jobId),
-                               mockExecutionVertex(jobId) };
-
-               SavepointCoordinator coordinator = createSavepointCoordinator(
-                               jobId,
-                               60 * 1000,
-                               triggerVertices,
-                               ackVertices,
-                               new ExecutionVertex[] {},
-                               new MockCheckpointIdCounter(),
-                               new HeapSavepointStore());
-
-               // Trigger savepoint
-               Future<String> savepointPathFuture = 
coordinator.triggerSavepoint(1238123);
-
-               // Abort the savepoint, because the vertices are not running
-               assertTrue(savepointPathFuture.isCompleted());
-
-               try {
-                       Await.result(savepointPathFuture, 
FiniteDuration.Zero());
-                       fail("Did not throw expected Exception after shutdown");
-               }
-               catch (Exception ignored) {
-               }
-
-               // Verify all promises removed
-               assertEquals(0, getSavepointPromises(coordinator).size());
-
-               coordinator.shutdown();
-       }
-
-       @Test
-       public void testAbortSavepointIfAckTasksAreNotExecuted() throws 
Exception {
-               JobID jobId = new JobID();
-               ExecutionVertex[] triggerVertices = new ExecutionVertex[] {
-                               mockExecutionVertex(jobId),
-                               mockExecutionVertex(jobId) };
-               ExecutionVertex[] ackVertices = new ExecutionVertex[] {
-                               mock(ExecutionVertex.class),
-                               mock(ExecutionVertex.class) };
-
-               SavepointCoordinator coordinator = createSavepointCoordinator(
-                               jobId,
-                               60 * 1000,
-                               triggerVertices,
-                               ackVertices,
-                               new ExecutionVertex[] {},
-                               new MockCheckpointIdCounter(),
-                               new HeapSavepointStore());
-
-               // Trigger savepoint
-               Future<String> savepointPathFuture = 
coordinator.triggerSavepoint(1238123);
-
-               // Abort the savepoint, because the vertices are not running
-               assertTrue(savepointPathFuture.isCompleted());
-
-               try {
-                       Await.result(savepointPathFuture, 
FiniteDuration.Zero());
-                       fail("Did not throw expected Exception after shutdown");
-               }
-               catch (Exception ignored) {
-               }
-
-               // Verify all promises removed
-               assertEquals(0, getSavepointPromises(coordinator).size());
-
-               coordinator.shutdown();
-       }
-
-       @Test
-       public void testAbortOnCheckpointTimeout() throws Exception {
-               JobID jobId = new JobID();
-               ExecutionVertex[] vertices = new ExecutionVertex[] {
-                               mockExecutionVertex(jobId),
-                               mockExecutionVertex(jobId) };
-               ExecutionVertex commitVertex = mockExecutionVertex(jobId);
-               MockCheckpointIdCounter checkpointIdCounter = new 
MockCheckpointIdCounter();
-
-               long checkpointTimeout = 1000;
-               SavepointCoordinator coordinator = createSavepointCoordinator(
-                               jobId,
-                               checkpointTimeout,
-                               vertices,
-                               vertices,
-                               new ExecutionVertex[] { commitVertex },
-                               checkpointIdCounter,
-                               new HeapSavepointStore());
-
-               // Trigger the savepoint
-               Future<String> savepointPathFuture = 
coordinator.triggerSavepoint(12731273);
-               assertFalse(savepointPathFuture.isCompleted());
-
-               long checkpointId = checkpointIdCounter.getLastReturnedCount();
-               PendingCheckpoint pendingCheckpoint = 
coordinator.getPendingCheckpoints()
-                               .get(checkpointId);
-
-               assertNotNull("Checkpoint not pending (test race)", 
pendingCheckpoint);
-               assertFalse("Checkpoint already discarded (test race)", 
pendingCheckpoint.isDiscarded());
-
-               // Wait for savepoint to timeout
-               Deadline deadline = FiniteDuration.apply(60, "s").fromNow();
-               while (deadline.hasTimeLeft()
-                               && !pendingCheckpoint.isDiscarded()
-                               && coordinator.getNumberOfPendingCheckpoints() 
> 0) {
-
-                       Thread.sleep(250);
-               }
-
-               // Verify discarded
-               assertTrue("Savepoint not discarded within timeout", 
pendingCheckpoint.isDiscarded());
-               assertEquals(0, coordinator.getNumberOfPendingCheckpoints());
-               assertEquals(0, 
coordinator.getNumberOfRetainedSuccessfulCheckpoints());
-
-               // No commit for timeout
-               verify(commitVertex, times(0)).sendMessageToCurrentExecution(
-                               any(NotifyCheckpointComplete.class), 
any(ExecutionAttemptID.class));
-
-               assertTrue(savepointPathFuture.isCompleted());
-
-               try {
-                       Await.result(savepointPathFuture, 
FiniteDuration.Zero());
-                       fail("Did not throw expected Exception after timeout");
-               }
-               catch (Exception ignored) {
-               }
-
-               // Verify all promises removed
-               assertEquals(0, getSavepointPromises(coordinator).size());
-
-               coordinator.shutdown();
-       }
-
-       @Test
-       public void testAbortSavepointsOnShutdown() throws Exception {
-               JobID jobId = new JobID();
-               ExecutionVertex[] vertices = new ExecutionVertex[] {
-                               mockExecutionVertex(jobId),
-                               mockExecutionVertex(jobId) };
-
-               SavepointCoordinator coordinator = createSavepointCoordinator(
-                               jobId,
-                               60 * 1000,
-                               vertices,
-                               vertices,
-                               vertices,
-                               new MockCheckpointIdCounter(),
-                               new HeapSavepointStore());
-
-               // Trigger savepoints
-               List<Future<String>> savepointPathFutures = new ArrayList<>();
-               
savepointPathFutures.add(coordinator.triggerSavepoint(12731273));
-               savepointPathFutures.add(coordinator.triggerSavepoint(12731273 
+ 123));
-
-               for (Future<String> future : savepointPathFutures) {
-                       assertFalse(future.isCompleted());
-               }
-
-               coordinator.shutdown();
-
-               // Verify futures failed
-               for (Future<String> future : savepointPathFutures) {
-                       assertTrue(future.isCompleted());
-
-                       try {
-                               Await.result(future, FiniteDuration.Zero());
-                               fail("Did not throw expected Exception after 
shutdown");
-                       }
-                       catch (Exception ignored) {
-                       }
-               }
-
-               // Verify all promises removed
-               assertEquals(0, getSavepointPromises(coordinator).size());
-       }
-
-       @Test
-       public void testAbortSavepointOnStateStoreFailure() throws Exception {
-               JobID jobId = new JobID();
-               ExecutionJobVertex jobVertex = mockExecutionJobVertex(jobId, 
new JobVertexID(), 4);
-               HeapSavepointStore savepointStore = spy(new 
HeapSavepointStore());
-
-               SavepointCoordinator coordinator = createSavepointCoordinator(
-                               jobId,
-                               60 * 1000,
-                               jobVertex.getTaskVertices(),
-                               jobVertex.getTaskVertices(),
-                               new ExecutionVertex[] {},
-                               new MockCheckpointIdCounter(),
-                               savepointStore);
-
-               // Failure on putState
-               doThrow(new RuntimeException("TestException"))
-                               
.when(savepointStore).storeSavepoint(any(Savepoint.class));
-
-               Future<String> savepointPathFuture = 
coordinator.triggerSavepoint(1231273123);
-
-               // Acknowledge all tasks
-               for (ExecutionVertex vertex : jobVertex.getTaskVertices()) {
-                       ExecutionAttemptID attemptId = 
vertex.getCurrentExecutionAttempt().getAttemptId();
-                       coordinator.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(
-                                       jobId, attemptId, 0, 
createSerializedStateHandle(vertex), 0));
-               }
-
-               try {
-                       Await.result(savepointPathFuture, 
FiniteDuration.Zero());
-                       fail("Did not throw expected Exception after rollback 
with savepoint store failure.");
-               }
-               catch (Exception ignored) {
-               }
-
-               // Verify all promises removed
-               assertEquals(0, getSavepointPromises(coordinator).size());
-
-               coordinator.shutdown();
-       }
-
-       @Test
-       public void testAbortSavepointIfSubsumed() throws Exception {
-               JobID jobId = new JobID();
-               long checkpointTimeout = 60 * 1000;
-               long[] timestamps = new long[] { 1272635, 1272635 + 10 };
-               long[] checkpointIds = new long[2];
-               ExecutionVertex[] vertices = new ExecutionVertex[] {
-                               mockExecutionVertex(jobId),
-                               mockExecutionVertex(jobId) };
-               MockCheckpointIdCounter checkpointIdCounter = new 
MockCheckpointIdCounter();
-               HeapSavepointStore savepointStore = new HeapSavepointStore();
-
-               SavepointCoordinator coordinator = createSavepointCoordinator(
-                               jobId,
-                               checkpointTimeout,
-                               vertices,
-                               vertices,
-                               vertices,
-                               checkpointIdCounter,
-                               savepointStore);
-
-               // Trigger the savepoints
-               List<Future<String>> savepointPathFutures = new ArrayList<>();
-
-               
savepointPathFutures.add(coordinator.triggerSavepoint(timestamps[0]));
-               checkpointIds[0] = checkpointIdCounter.getLastReturnedCount();
-
-               
savepointPathFutures.add(coordinator.triggerSavepoint(timestamps[1]));
-               checkpointIds[1] = checkpointIdCounter.getLastReturnedCount();
-
-               for (Future<String> future : savepointPathFutures) {
-                       assertFalse(future.isCompleted());
-               }
-
-               // Verify send trigger messages
-               for (ExecutionVertex vertex : vertices) {
-                       verifyTriggerCheckpoint(vertex, checkpointIds[0], 
timestamps[0]);
-                       verifyTriggerCheckpoint(vertex, checkpointIds[1], 
timestamps[1]);
-               }
-
-               PendingCheckpoint[] pendingCheckpoints = new 
PendingCheckpoint[] {
-                               
coordinator.getPendingCheckpoints().get(checkpointIds[0]),
-                               
coordinator.getPendingCheckpoints().get(checkpointIds[1]) };
-
-               verifyPendingCheckpoint(pendingCheckpoints[0], jobId, 
checkpointIds[0],
-                               timestamps[0], 0, 2, 0, false, false);
-
-               verifyPendingCheckpoint(pendingCheckpoints[1], jobId, 
checkpointIds[1],
-                               timestamps[1], 0, 2, 0, false, false);
-
-               // Acknowledge second checkpoint...
-               for (ExecutionVertex vertex : vertices) {
-                       coordinator.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(
-                                       jobId, 
vertex.getCurrentExecutionAttempt().getAttemptId(),
-                                       checkpointIds[1], 
createSerializedStateHandle(vertex), 0));
-               }
-
-               // ...and one task of first checkpoint
-               coordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(
-                               jobId, 
vertices[0].getCurrentExecutionAttempt().getAttemptId(),
-                               checkpointIds[0], 
createSerializedStateHandle(vertices[0]), 0));
-
-               // The second pending checkpoint is completed and subsumes the 
first one
-               assertTrue(pendingCheckpoints[0].isDiscarded());
-               assertTrue(pendingCheckpoints[1].isDiscarded());
-               assertEquals(0, coordinator.getSuccessfulCheckpoints().size());
-
-               // Verify send notify complete messages for second checkpoint
-               for (ExecutionVertex vertex : vertices) {
-                       verifyNotifyCheckpointComplete(vertex, 
checkpointIds[1], timestamps[1]);
-               }
-
-               Savepoint[] savepoints = new Savepoint[2];
-               String[] savepointPaths = new String[2];
-
-               // Verify that the futures have both been completed
-               assertTrue(savepointPathFutures.get(0).isCompleted());
-
-               try {
-                       savepointPaths[0] = 
Await.result(savepointPathFutures.get(0), FiniteDuration.Zero());
-                       fail("Did not throw expected exception");
-               }
-               catch (Exception ignored) {
-               }
-
-               // Verify the second savepoint
-               assertTrue(savepointPathFutures.get(1).isCompleted());
-               savepointPaths[1] = Await.result(savepointPathFutures.get(1), 
FiniteDuration.Zero());
-               savepoints[1] = savepointStore.loadSavepoint(savepointPaths[1]);
-               verifySavepoint(savepoints[1], checkpointIds[1], vertices);
-
-               // Verify all promises removed
-               assertEquals(0, getSavepointPromises(coordinator).size());
-
-               coordinator.shutdown();
-       }
-
-       @Test
-       public void 
testShutdownDoesNotCleanUpCompletedCheckpointsWithFileSystemStore() throws 
Exception {
-               JobID jobId = new JobID();
-               long checkpointTimeout = 60 * 1000;
-               long timestamp = 1272635;
-               ExecutionVertex[] vertices = new ExecutionVertex[] {
-                               mockExecutionVertex(jobId),
-                               mockExecutionVertex(jobId) };
-               MockCheckpointIdCounter checkpointIdCounter = new 
MockCheckpointIdCounter();
-
-               // Temporary directory for file state backend
-               final File tmpDir = CommonTestUtils.createTempDirectory();
-
-               try {
-                       FsSavepointStore savepointStore = new 
FsSavepointStore(tmpDir.toURI().toString(), "sp-");
-
-                       SavepointCoordinator coordinator = 
createSavepointCoordinator(
-                                       jobId,
-                                       checkpointTimeout,
-                                       vertices,
-                                       vertices,
-                                       vertices,
-                                       checkpointIdCounter,
-                                       savepointStore);
-
-                       // Trigger the savepoint
-                       Future<String> savepointPathFuture = 
coordinator.triggerSavepoint(timestamp);
-                       assertFalse(savepointPathFuture.isCompleted());
-
-                       long checkpointId = 
checkpointIdCounter.getLastReturnedCount();
-                       assertEquals(0, checkpointId);
-
-                       // Verify send trigger messages
-                       for (ExecutionVertex vertex : vertices) {
-                               verifyTriggerCheckpoint(vertex, checkpointId, 
timestamp);
-                       }
-
-                       PendingCheckpoint pendingCheckpoint = 
coordinator.getPendingCheckpoints()
-                                       .get(checkpointId);
-
-                       verifyPendingCheckpoint(pendingCheckpoint, jobId, 
checkpointId,
-                                       timestamp, 0, 2, 0, false, false);
-
-                       // Acknowledge tasks
-                       for (ExecutionVertex vertex : vertices) {
-                               coordinator.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(
-                                               jobId, 
vertex.getCurrentExecutionAttempt().getAttemptId(),
-                                               checkpointId, 
createSerializedStateHandle(vertex), 0));
-                       }
-
-                       // The pending checkpoint is completed
-                       assertTrue(pendingCheckpoint.isDiscarded());
-                       assertEquals(0, 
coordinator.getSuccessfulCheckpoints().size());
-
-                       // Verify send notify complete messages
-                       for (ExecutionVertex vertex : vertices) {
-                               verifyNotifyCheckpointComplete(vertex, 
checkpointId, timestamp);
-                       }
-
-                       // Verify that the future has been completed
-                       assertTrue(savepointPathFuture.isCompleted());
-                       String savepointPath = 
Await.result(savepointPathFuture, FiniteDuration.Zero());
-
-                       // Verify all promises removed
-                       assertEquals(0, 
getSavepointPromises(coordinator).size());
-
-                       coordinator.shutdown();
-
-                       // Verify the savepoint is still available
-                       Savepoint savepoint = 
savepointStore.loadSavepoint(savepointPath);
-                       verifySavepoint(savepoint, checkpointId, vertices);
-               }
-               finally {
-                       FileUtils.deleteDirectory(tmpDir);
-               }
-       }
-
-       // 
------------------------------------------------------------------------
-       // Test helpers
-       // 
------------------------------------------------------------------------
-
-       private static SavepointCoordinator createSavepointCoordinator(
-                       JobID jobId,
-                       long checkpointTimeout,
-                       ExecutionVertex[] triggerVertices,
-                       ExecutionVertex[] ackVertices,
-                       ExecutionVertex[] commitVertices,
-                       CheckpointIDCounter checkpointIdCounter,
-                       SavepointStore savepointStore) throws Exception {
-
-               ClassLoader classLoader = 
Thread.currentThread().getContextClassLoader();
-
-               return new SavepointCoordinator(
-                               jobId,
-                               checkpointTimeout,
-                               checkpointTimeout,
-                               42,
-                               triggerVertices,
-                               ackVertices,
-                               commitVertices,
-                               classLoader,
-                               checkpointIdCounter,
-                               savepointStore,
-                               new DisabledCheckpointStatsTracker());
-       }
-
-       private static Map<JobVertexID, ExecutionJobVertex> 
createExecutionJobVertexMap(
-                       ExecutionJobVertex... jobVertices) {
-
-               Map<JobVertexID, ExecutionJobVertex> jobVertexMap = new 
HashMap<>();
-
-               for (ExecutionJobVertex jobVertex : jobVertices) {
-                       jobVertexMap.put(jobVertex.getJobVertexId(), jobVertex);
-               }
-
-               return jobVertexMap;
-       }
-
-       private static SerializedValue<StateHandle<?>> 
createSerializedStateHandle(
-                       ExecutionVertex vertex) throws IOException {
-
-               return new SerializedValue<StateHandle<?>>(new 
LocalStateHandle<Serializable>(
-                               
vertex.getCurrentExecutionAttempt().getAttemptId()));
-       }
-
-       @SuppressWarnings("unchecked")
-       private Map<Long, Promise<String>> getSavepointPromises(
-                       SavepointCoordinator coordinator)
-                       throws NoSuchFieldException, IllegalAccessException {
-
-               Field field = 
SavepointCoordinator.class.getDeclaredField("savepointPromises");
-               field.setAccessible(true);
-               return (Map<Long, Promise<String>>) field.get(coordinator);
-       }
-
-       // ---- Verification 
------------------------------------------------------
-
-       private static void verifyTriggerCheckpoint(
-                       ExecutionVertex mockExecutionVertex,
-                       long expectedCheckpointId,
-                       long expectedTimestamp) {
-
-               ExecutionAttemptID attemptId = mockExecutionVertex
-                               .getCurrentExecutionAttempt().getAttemptId();
-
-               TriggerCheckpoint expectedMsg = new TriggerCheckpoint(
-                               mockExecutionVertex.getJobId(),
-                               attemptId,
-                               expectedCheckpointId,
-                               expectedTimestamp);
-
-               verify(mockExecutionVertex).sendMessageToCurrentExecution(
-                               eq(expectedMsg), eq(attemptId));
-       }
-
-       private static void verifyNotifyCheckpointComplete(
-                       ExecutionVertex mockExecutionVertex,
-                       long expectedCheckpointId,
-                       long expectedTimestamp) {
-
-               ExecutionAttemptID attemptId = mockExecutionVertex
-                               .getCurrentExecutionAttempt().getAttemptId();
-
-               NotifyCheckpointComplete expectedMsg = new 
NotifyCheckpointComplete(
-                               mockExecutionVertex.getJobId(),
-                               attemptId,
-                               expectedCheckpointId,
-                               expectedTimestamp);
-
-               verify(mockExecutionVertex).sendMessageToCurrentExecution(
-                               eq(expectedMsg), eq(attemptId));
-       }
-
-       private static void verifyPendingCheckpoint(
-                       PendingCheckpoint checkpoint,
-                       JobID expectedJobId,
-                       long expectedCheckpointId,
-                       long expectedTimestamp,
-                       int expectedNumberOfAcknowledgedTasks,
-                       int expectedNumberOfNonAcknowledgedTasks,
-                       int expectedNumberOfCollectedStates,
-                       boolean expectedIsDiscarded,
-                       boolean expectedIsFullyAcknowledged) {
-
-               assertNotNull(checkpoint);
-               assertEquals(expectedJobId, checkpoint.getJobId());
-               assertEquals(expectedCheckpointId, 
checkpoint.getCheckpointId());
-               assertEquals(expectedTimestamp, 
checkpoint.getCheckpointTimestamp());
-               assertEquals(expectedNumberOfAcknowledgedTasks, 
checkpoint.getNumberOfAcknowledgedTasks());
-               assertEquals(expectedNumberOfNonAcknowledgedTasks, 
checkpoint.getNumberOfNonAcknowledgedTasks());
-
-               int actualNumberOfCollectedStates = 0;
-
-               for (TaskState taskState : checkpoint.getTaskStates().values()) 
{
-                       actualNumberOfCollectedStates += 
taskState.getNumberCollectedStates();
-               }
-
-               assertEquals(expectedNumberOfCollectedStates, 
actualNumberOfCollectedStates);
-               assertEquals(expectedIsDiscarded, checkpoint.isDiscarded());
-               assertEquals(expectedIsFullyAcknowledged, 
checkpoint.isFullyAcknowledged());
-       }
-
-       private static void verifySavepoint(
-                       Savepoint savepoint,
-                       long expectedCheckpointId,
-                       ExecutionVertex[] expectedVertices) throws Exception {
-
-               assertEquals(expectedCheckpointId, savepoint.getCheckpointId());
-
-               for (TaskState taskState : savepoint.getTaskStates()) {
-                       JobVertexID jobVertexId = taskState.getJobVertexID();
-
-                       // Find matching execution vertex
-                       ExecutionVertex vertex = null;
-                       for (ExecutionVertex executionVertex : 
expectedVertices) {
-                               if 
(executionVertex.getJobvertexId().equals(jobVertexId)) {
-                                       vertex = executionVertex;
-                                       break;
-                               }
-                       }
-
-                       if (vertex == null) {
-                               fail("Did not find matching vertex");
-                       } else {
-                               SubtaskState subtaskState = 
taskState.getState(vertex.getParallelSubtaskIndex());
-                               ExecutionAttemptID vertexAttemptId = 
vertex.getCurrentExecutionAttempt().getAttemptId();
-
-                               ExecutionAttemptID stateAttemptId = 
(ExecutionAttemptID) subtaskState.getState()
-                                               
.deserializeValue(Thread.currentThread().getContextClassLoader())
-                                               
.getState(Thread.currentThread().getContextClassLoader());
-
-                               assertEquals(vertexAttemptId, stateAttemptId);
-                       }
-               }
-       }
-
-       // ---- Mocking 
-----------------------------------------------------------
-
-       private static ExecutionJobVertex mockExecutionJobVertex(
-                       JobID jobId,
-                       JobVertexID jobVertexId,
-                       int parallelism) {
-
-               ExecutionJobVertex jobVertex = mock(ExecutionJobVertex.class);
-               when(jobVertex.getJobId()).thenReturn(jobId);
-               when(jobVertex.getJobVertexId()).thenReturn(jobVertexId);
-               when(jobVertex.getParallelism()).thenReturn(parallelism);
-
-               ExecutionVertex[] vertices = new ExecutionVertex[parallelism];
-
-               for (int i = 0; i < vertices.length; i++) {
-                       vertices[i] = mockExecutionVertex(jobId, jobVertexId, 
i, parallelism, ExecutionState.RUNNING);
-               }
-
-               when(jobVertex.getTaskVertices()).thenReturn(vertices);
-
-               return jobVertex;
-       }
-
-       private static ExecutionVertex mockExecutionVertex(JobID jobId) {
-               return mockExecutionVertex(jobId, ExecutionState.RUNNING);
-       }
-
-       private static ExecutionVertex mockExecutionVertex(
-                       JobID jobId,
-                       ExecutionState state) {
-
-               return mockExecutionVertex(jobId, new JobVertexID(), 0, 1, 
state);
-       }
-
-       private static ExecutionVertex mockExecutionVertex(
-                       JobID jobId,
-                       JobVertexID jobVertexId,
-                       int subtaskIndex,
-                       int parallelism,
-                       ExecutionState executionState) {
-
-               Execution exec = mock(Execution.class);
-               when(exec.getAttemptId()).thenReturn(new ExecutionAttemptID());
-               when(exec.getState()).thenReturn(executionState);
-
-               ExecutionVertex vertex = mock(ExecutionVertex.class);
-               when(vertex.getJobId()).thenReturn(jobId);
-               when(vertex.getJobvertexId()).thenReturn(jobVertexId);
-               when(vertex.getParallelSubtaskIndex()).thenReturn(subtaskIndex);
-               when(vertex.getCurrentExecutionAttempt()).thenReturn(exec);
-               
when(vertex.getTotalNumberOfParallelSubtasks()).thenReturn(parallelism);
-
-               return vertex;
-       }
-
-       private static class MockCheckpointIdCounter implements 
CheckpointIDCounter {
-
-               private boolean started;
-               private long count;
-               private long lastReturnedCount;
-
-               @Override
-               public void start() throws Exception {
-                       started = true;
-               }
-
-               @Override
-               public void shutdown() throws Exception {
-                       started = false;
-               }
-
-               @Override
-               public void suspend() throws Exception {
-                       started = false;
-               }
-
-               @Override
-               public long getAndIncrement() throws Exception {
-                       lastReturnedCount = count;
-                       return count++;
-               }
-
-               @Override
-               public void setCount(long newCount) {
-                       count = newCount;
-               }
-
-               long getLastReturnedCount() {
-                       return lastReturnedCount;
-               }
-
-               public boolean isStarted() {
-                       return started;
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/47acdead/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoaderTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoaderTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoaderTest.java
new file mode 100644
index 0000000..6a85195
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoaderTest.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.savepoint;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
+import org.apache.flink.runtime.checkpoint.TaskState;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class SavepointLoaderTest {
+
+       /**
+        * Tests loading and validation of savepoints with correct setup,
+        * parallelism mismatch, and a missing task.
+        */
+       @Test
+       public void testLoadAndValidateSavepoint() throws Exception {
+               int parallelism = 128128;
+               JobVertexID vertexId = new JobVertexID();
+
+               TaskState state = mock(TaskState.class);
+               when(state.getParallelism()).thenReturn(parallelism);
+               when(state.getJobVertexID()).thenReturn(vertexId);
+
+               Map<JobVertexID, TaskState> taskStates = new HashMap<>();
+               taskStates.put(vertexId, state);
+
+               CompletedCheckpoint stored = new CompletedCheckpoint(
+                               new JobID(),
+                               Integer.MAX_VALUE + 123123L,
+                               10200202,
+                               1020292988,
+                               taskStates,
+                               true);
+
+               // Store savepoint
+               SavepointV0 savepoint = new 
SavepointV0(stored.getCheckpointID(), taskStates.values());
+               SavepointStore store = new HeapSavepointStore();
+               String path = store.storeSavepoint(savepoint);
+
+               JobID jobId = new JobID();
+
+               ExecutionJobVertex vertex = mock(ExecutionJobVertex.class);
+               when(vertex.getParallelism()).thenReturn(parallelism);
+
+               Map<JobVertexID, ExecutionJobVertex> tasks = new HashMap<>();
+               tasks.put(vertexId, vertex);
+
+               // 1) Load and validate: everything correct
+               CompletedCheckpoint loaded = 
SavepointLoader.loadAndValidateSavepoint(jobId, tasks, store, path);
+
+               assertEquals(jobId, loaded.getJobId());
+               assertEquals(stored.getCheckpointID(), 
loaded.getCheckpointID());
+
+               // The loaded checkpoint should not discard state when it is 
discarded
+               loaded.discard(ClassLoader.getSystemClassLoader());
+               verify(state, times(0)).discard(any(ClassLoader.class));
+
+               // 2) Load and validate: parallelism mismatch
+               when(vertex.getParallelism()).thenReturn(222);
+
+               try {
+                       SavepointLoader.loadAndValidateSavepoint(jobId, tasks, 
store, path);
+                       fail("Did not throw expected Exception");
+               } catch (IllegalStateException expected) {
+                       assertTrue(expected.getMessage().contains("Parallelism 
mismatch"));
+               }
+
+               // 3) Load and validate: missing vertex (this should be relaxed)
+               assertNotNull(tasks.remove(vertexId));
+
+               try {
+                       SavepointLoader.loadAndValidateSavepoint(jobId, tasks, 
store, path);
+                       fail("Did not throw expected Exception");
+               } catch (IllegalStateException expected) {
+                       assertTrue(expected.getMessage().contains("Cannot map 
old state"));
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/47acdead/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTrackerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTrackerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTrackerTest.java
index 9265ab1..12bbf82 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTrackerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTrackerTest.java
@@ -22,8 +22,8 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
 import org.apache.flink.runtime.checkpoint.SubtaskState;
-import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.checkpoint.TaskState;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.state.StateHandle;
 import org.apache.flink.util.SerializedValue;
@@ -340,8 +340,7 @@ public class SimpleCheckpointStatsTrackerTest {
                        // Add some random delay
                        final long completionTimestamp = triggerTimestamp + 
completionDuration + RAND.nextInt(10);
 
-                       checkpoints[i] = new CompletedCheckpoint(
-                                       jobId, i, triggerTimestamp, 
completionTimestamp, taskGroupStates);
+                       checkpoints[i] = new CompletedCheckpoint(jobId, i, 
triggerTimestamp, completionTimestamp, taskGroupStates, true);
                }
 
                return checkpoints;

http://git-wip-us.apache.org/repos/asf/flink/blob/47acdead/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
index a576a58..548bef0 100644
--- 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
+++ 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
@@ -22,7 +22,7 @@ package org.apache.flink.runtime.jobmanager
 import akka.actor.ActorSystem
 import akka.testkit.{ImplicitSender, TestKit}
 import akka.util.Timeout
-import org.apache.flink.api.common.{ExecutionConfig, JobID}
+import org.apache.flink.api.common.JobID
 import org.apache.flink.runtime.akka.ListeningBehaviour
 import org.apache.flink.runtime.checkpoint.CheckpointCoordinator
 import org.apache.flink.runtime.client.JobExecutionException
@@ -31,10 +31,8 @@ import 
org.apache.flink.runtime.jobgraph.{DistributionPattern, JobGraph, JobVert
 import org.apache.flink.runtime.jobmanager.Tasks._
 import 
org.apache.flink.runtime.jobmanager.scheduler.{NoResourceAvailableException, 
SlotSharingGroup}
 import org.apache.flink.runtime.messages.JobManagerMessages._
-import org.apache.flink.runtime.checkpoint.savepoint.SavepointCoordinator
 import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages._
 import org.apache.flink.runtime.testingUtils.{ScalaTestingUtils, TestingUtils}
-import org.apache.flink.runtime.testutils.JobManagerActorTestUtils
 import org.junit.runner.RunWith
 import org.mockito.Mockito._
 import org.scalatest.junit.JUnitRunner
@@ -829,14 +827,14 @@ class JobManagerITCase(_system: ActorSystem)
             deadline.timeLeft).executionGraph
 
           // Mock the checkpoint coordinator
-          val savepointCoordinator = mock(classOf[SavepointCoordinator])
+          val checkpointCoordinator = mock(classOf[CheckpointCoordinator])
           doThrow(new Exception("Expected Test Exception"))
-            
.when(savepointCoordinator).triggerSavepoint(org.mockito.Matchers.anyLong())
+            
.when(checkpointCoordinator).triggerSavepoint(org.mockito.Matchers.anyLong())
 
           // Update the savepoint coordinator field
-          val field = 
executionGraph.getClass.getDeclaredField("savepointCoordinator")
+          val field = 
executionGraph.getClass.getDeclaredField("checkpointCoordinator")
           field.setAccessible(true)
-          field.set(executionGraph, savepointCoordinator)
+          field.set(executionGraph, checkpointCoordinator)
 
           // Trigger savepoint for job
           jobManager.tell(TriggerSavepoint(jobGraph.getJobID()), testActor)
@@ -877,10 +875,12 @@ class JobManagerITCase(_system: ActorSystem)
           expectMsg(JobSubmitSuccess(jobGraph.getJobID()))
 
           // Mock the checkpoint coordinator
-          val savepointCoordinator = mock(classOf[SavepointCoordinator])
+          val checkpointCoordinator = mock(classOf[CheckpointCoordinator])
+          doThrow(new Exception("Expected Test Exception"))
+            
.when(checkpointCoordinator).triggerSavepoint(org.mockito.Matchers.anyLong())
           val savepointPathPromise = scala.concurrent.promise[String]
           doReturn(savepointPathPromise.future)
-            
.when(savepointCoordinator).triggerSavepoint(org.mockito.Matchers.anyLong())
+            
.when(checkpointCoordinator).triggerSavepoint(org.mockito.Matchers.anyLong())
 
           // Request the execution graph and set a checkpoint coordinator mock
           jobManager.tell(RequestExecutionGraph(jobGraph.getJobID), testActor)
@@ -888,9 +888,9 @@ class JobManagerITCase(_system: ActorSystem)
             deadline.timeLeft).executionGraph
 
           // Update the savepoint coordinator field
-          val field = 
executionGraph.getClass.getDeclaredField("savepointCoordinator")
+          val field = 
executionGraph.getClass.getDeclaredField("checkpointCoordinator")
           field.setAccessible(true)
-          field.set(executionGraph, savepointCoordinator)
+          field.set(executionGraph, checkpointCoordinator)
 
           // Trigger savepoint for job
           jobManager.tell(TriggerSavepoint(jobGraph.getJobID()), testActor)
@@ -935,10 +935,12 @@ class JobManagerITCase(_system: ActorSystem)
           expectMsg(JobSubmitSuccess(jobGraph.getJobID()))
 
           // Mock the checkpoint coordinator
-          val savepointCoordinator = mock(classOf[SavepointCoordinator])
+          val checkpointCoordinator = mock(classOf[CheckpointCoordinator])
+          doThrow(new Exception("Expected Test Exception"))
+            
.when(checkpointCoordinator).triggerSavepoint(org.mockito.Matchers.anyLong())
           val savepointPathPromise = scala.concurrent.promise[String]
           doReturn(savepointPathPromise.future)
-            
.when(savepointCoordinator).triggerSavepoint(org.mockito.Matchers.anyLong())
+            
.when(checkpointCoordinator).triggerSavepoint(org.mockito.Matchers.anyLong())
 
           // Request the execution graph and set a checkpoint coordinator mock
           jobManager.tell(RequestExecutionGraph(jobGraph.getJobID), testActor)
@@ -946,9 +948,9 @@ class JobManagerITCase(_system: ActorSystem)
             deadline.timeLeft).executionGraph
 
           // Update the savepoint coordinator field
-          val field = 
executionGraph.getClass.getDeclaredField("savepointCoordinator")
+          val field = 
executionGraph.getClass.getDeclaredField("checkpointCoordinator")
           field.setAccessible(true)
-          field.set(executionGraph, savepointCoordinator)
+          field.set(executionGraph, checkpointCoordinator)
 
           // Trigger savepoint for job
           jobManager.tell(TriggerSavepoint(jobGraph.getJobID()), testActor)

http://git-wip-us.apache.org/repos/asf/flink/blob/47acdead/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
index 0ed28ad..4fc310c 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
@@ -403,148 +403,6 @@ public class SavepointITCase extends TestLogger {
        }
 
        /**
-        * Tests that removed checkpoint files which are part of a savepoint 
throw
-        * a proper Exception on submission.
-        */
-       @Test
-       @RetryOnFailure(times = 2)
-       public void testCheckpointHasBeenRemoved() throws Exception {
-               // Config
-               int numTaskManagers = 2;
-               int numSlotsPerTaskManager = 2;
-               int parallelism = numTaskManagers * numSlotsPerTaskManager;
-
-               // Test deadline
-               final Deadline deadline = new FiniteDuration(5, 
TimeUnit.MINUTES).fromNow();
-
-               // The number of checkpoints to complete before triggering the 
savepoint
-               final int numberOfCompletedCheckpoints = 10;
-
-               // Temporary directory for file state backend
-               final File tmpDir = CommonTestUtils.createTempDirectory();
-
-               LOG.info("Created temporary directory: " + tmpDir + ".");
-
-               ForkableFlinkMiniCluster flink = null;
-
-               try {
-                       // Flink configuration
-                       final Configuration config = new Configuration();
-                       
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTaskManagers);
-                       
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 
numSlotsPerTaskManager);
-
-                       final File checkpointDir = new File(tmpDir, 
"checkpoints");
-                       final File savepointDir = new File(tmpDir, 
"savepoints");
-
-                       if (!checkpointDir.mkdir() || !savepointDir.mkdirs()) {
-                               fail("Test setup failed: failed to create 
temporary directories.");
-                       }
-
-                       LOG.info("Created temporary checkpoint directory: " + 
checkpointDir + ".");
-                       LOG.info("Created temporary savepoint directory: " + 
savepointDir + ".");
-
-                       config.setString(ConfigConstants.STATE_BACKEND, 
"filesystem");
-                       
config.setString(SavepointStoreFactory.SAVEPOINT_BACKEND_KEY, "filesystem");
-
-                       
config.setString(FsStateBackendFactory.CHECKPOINT_DIRECTORY_URI_CONF_KEY,
-                                       checkpointDir.toURI().toString());
-                       
config.setString(SavepointStoreFactory.SAVEPOINT_DIRECTORY_KEY,
-                                       savepointDir.toURI().toString());
-
-                       LOG.info("Flink configuration: " + config + ".");
-
-                       // Start Flink
-                       flink = new ForkableFlinkMiniCluster(config);
-                       LOG.info("Starting Flink cluster.");
-                       flink.start();
-
-                       // Retrieve the job manager
-                       LOG.info("Retrieving JobManager.");
-                       ActorGateway jobManager = Await.result(
-                                       flink.leaderGateway().future(),
-                                       deadline.timeLeft());
-                       LOG.info("JobManager: " + jobManager + ".");
-
-                       // Submit the job
-                       final JobGraph jobGraph = createJobGraph(parallelism, 
0, 1000, 1000);
-                       final JobID jobId = jobGraph.getJobID();
-
-                       // Wait for the source to be notified about the 
expected number
-                       // of completed checkpoints
-                       InfiniteTestSource.CheckpointCompleteLatch = new 
CountDownLatch(
-                                       numberOfCompletedCheckpoints);
-
-                       LOG.info("Submitting job " + jobGraph.getJobID() + " in 
detached mode.");
-
-                       flink.submitJobDetached(jobGraph);
-
-                       LOG.info("Waiting for " + numberOfCompletedCheckpoints +
-                                       " checkpoint complete notifications.");
-
-                       // Wait...
-                       InfiniteTestSource.CheckpointCompleteLatch.await();
-
-                       LOG.info("Received all " + numberOfCompletedCheckpoints 
+
-                                       " checkpoint complete notifications.");
-
-                       // ...and then trigger the savepoint
-                       LOG.info("Triggering a savepoint.");
-
-                       Future<Object> savepointPathFuture = jobManager.ask(
-                                       new TriggerSavepoint(jobId), 
deadline.timeLeft());
-
-                       final String savepointPath = ((TriggerSavepointSuccess) 
Await
-                                       .result(savepointPathFuture, 
deadline.timeLeft())).savepointPath();
-                       LOG.info("Retrieved savepoint path: " + savepointPath + 
".");
-
-                       // Retrieve the savepoint from the testing job manager
-                       LOG.info("Requesting the savepoint.");
-                       Future<Object> savepointFuture = jobManager.ask(
-                                       new RequestSavepoint(savepointPath),
-                                       deadline.timeLeft());
-
-                       Await.ready(savepointFuture, deadline.timeLeft());
-                       LOG.info("Retrieved savepoint: " + savepointPath + ".");
-
-                       // Shut down the Flink cluster (thereby canceling the 
job)
-                       LOG.info("Shutting down Flink cluster.");
-                       flink.shutdown();
-
-                       // Remove the checkpoint files
-                       try {
-                               FileUtils.deleteDirectory(checkpointDir);
-                       } catch (FileNotFoundException ignored) {
-                       }
-
-                       // Restart the cluster
-                       LOG.info("Restarting Flink cluster.");
-                       flink.start();
-
-                       // Set the savepoint path
-                       jobGraph.setSavepointPath(savepointPath);
-
-                       LOG.info("Resubmitting job " + jobGraph.getJobID() + " 
with " +
-                                       "savepoint path " + savepointPath + " 
in detached mode.");
-
-                       try {
-                               flink.submitJobAndWait(jobGraph, false, 
deadline.timeLeft());
-                               fail("Did not throw expected Exception because 
of missing checkpoint files");
-                       }
-                       catch (Exception ignored) {
-                       }
-               }
-               finally {
-                       if (flink != null) {
-                               flink.shutdown();
-                       }
-
-                       if (tmpDir != null) {
-                               FileUtils.deleteDirectory(tmpDir);
-                       }
-               }
-       }
-
-       /**
         * Tests that a job manager backed savepoint is removed when the 
checkpoint
         * coordinator is shut down, because the associated checkpoints files 
will
         * linger around otherwise.
@@ -645,40 +503,42 @@ public class SavepointITCase extends TestLogger {
                                        savepointFuture, 
deadline.timeLeft())).savepoint();
                        LOG.info("Retrieved savepoint: " + savepointPath + ".");
 
-                       // Cancel the job
-                       LOG.info("Cancelling job " + jobId + ".");
-                       Future<Object> cancelRespFuture = jobManager.ask(
-                                       new CancelJob(jobId), 
deadline.timeLeft());
-                       assertTrue(Await.result(cancelRespFuture, 
deadline.timeLeft())
-                                       instanceof CancellationSuccess);
-
-                       LOG.info("Waiting for job " + jobId + " to be 
removed.");
-                       Future<Object> removedRespFuture = jobManager.ask(
-                                       new NotifyWhenJobRemoved(jobId), 
deadline.timeLeft());
-                       assertTrue((Boolean) Await.result(removedRespFuture, 
deadline.timeLeft()));
-
                        // Check that all checkpoint files have been removed
                        for (TaskState stateForTaskGroup : 
savepoint.getTaskStates()) {
                                for (SubtaskState subtaskState : 
stateForTaskGroup.getStates()) {
                                        StreamTaskStateList taskStateList = 
(StreamTaskStateList) subtaskState.getState()
-                                               
.deserializeValue(ClassLoader.getSystemClassLoader());
+                                                       
.deserializeValue(ClassLoader.getSystemClassLoader());
 
                                        for (StreamTaskState taskState : 
taskStateList.getState(
-                                               
ClassLoader.getSystemClassLoader())) {
+                                                       
ClassLoader.getSystemClassLoader())) {
 
                                                AbstractFileStateHandle fsState 
= (AbstractFileStateHandle) taskState.getFunctionState();
                                                checkpointFiles.add(new 
File(fsState.getFilePath().toUri()));
                                        }
                                }
                        }
+
+                       // Cancel the job
+                       LOG.info("Cancelling job " + jobId + ".");
+                       Future<Object> cancelRespFuture = jobManager.ask(
+                                       new CancelJob(jobId), 
deadline.timeLeft());
+                       assertTrue(Await.result(cancelRespFuture, 
deadline.timeLeft())
+                                       instanceof CancellationSuccess);
+
+                       LOG.info("Waiting for job " + jobId + " to be 
removed.");
+                       Future<Object> removedRespFuture = jobManager.ask(
+                                       new NotifyWhenJobRemoved(jobId), 
deadline.timeLeft());
+                       assertTrue((Boolean) Await.result(removedRespFuture, 
deadline.timeLeft()));
                }
                finally {
                        if (flink != null) {
                                flink.shutdown();
                        }
 
+                       Thread.sleep(1000);
+
                        // At least one checkpoint file
-                       assertTrue(checkpointFiles.size() > 0);
+                       assertTrue(checkpointFiles.toString(), 
checkpointFiles.size() > 0);
 
                        // The checkpoint associated with the savepoint should 
have been
                        // discarded after shutdown
@@ -753,121 +613,6 @@ public class SavepointITCase extends TestLogger {
                }
        }
 
-       /**
-        * Tests that a restore failure is retried with the savepoint state.
-        */
-       @Test
-       public void testRestoreFailure() throws Exception {
-               // Config
-               int numTaskManagers = 1;
-               int numSlotsPerTaskManager = 1;
-               int numExecutionRetries = 2;
-               int retryDelay = 500;
-               int checkpointingInterval = 100000000;
-
-               // Test deadline
-               final Deadline deadline = new FiniteDuration(3, 
TimeUnit.MINUTES).fromNow();
-
-               ForkableFlinkMiniCluster flink = null;
-
-               try {
-                       // The job
-                       StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-                       env.setParallelism(1);
-                       env.enableCheckpointing(checkpointingInterval);
-                       env.setNumberOfExecutionRetries(numExecutionRetries);
-                       env.getConfig().setExecutionRetryDelay(retryDelay);
-
-                       DataStream<Integer> stream = env
-                                       .addSource(new 
RestoreStateCountingAndFailingSource());
-
-                       // Source configuration
-                       
RestoreStateCountingAndFailingSource.failOnRestoreStateCall = false;
-                       
RestoreStateCountingAndFailingSource.numRestoreStateCalls = 0;
-                       
RestoreStateCountingAndFailingSource.checkpointCompleteLatch = new 
CountDownLatch(1);
-                       RestoreStateCountingAndFailingSource.emitted= 0;
-
-                       stream.addSink(new DiscardingSink<Integer>());
-
-                       JobGraph jobGraph = env.getStreamGraph().getJobGraph();
-
-                       // Flink configuration
-                       final Configuration config = new Configuration();
-                       
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTaskManagers);
-                       
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 
numSlotsPerTaskManager);
-                       LOG.info("Flink configuration: " + config + ".");
-
-                       // Start Flink
-                       flink = new ForkableFlinkMiniCluster(config);
-                       LOG.info("Starting Flink cluster.");
-                       flink.start();
-
-                       // Retrieve the job manager
-                       LOG.info("Retrieving JobManager.");
-                       ActorGateway jobManager = 
flink.getLeaderGateway(deadline.timeLeft());
-                       LOG.info("JobManager: " + jobManager + ".");
-
-                       // Submit the job and wait for some checkpoints to 
complete
-                       flink.submitJobDetached(jobGraph);
-
-                       while (deadline.hasTimeLeft() && 
RestoreStateCountingAndFailingSource.emitted < 100) {
-                               Thread.sleep(100);
-                       }
-
-                       assertTrue("No progress", 
RestoreStateCountingAndFailingSource.emitted >= 100);
-
-                       // Trigger the savepoint
-                       Future<Object> savepointPathFuture = jobManager.ask(
-                                       new 
TriggerSavepoint(jobGraph.getJobID()), deadline.timeLeft());
-
-                       Object resp = Await.result(savepointPathFuture, 
deadline.timeLeft());
-
-                       String savepointPath = null;
-                       if (resp instanceof TriggerSavepointSuccess) {
-                               savepointPath = ((TriggerSavepointSuccess) 
resp).savepointPath();
-                               LOG.info("Retrieved savepoint path: " + 
savepointPath + ".");
-                       } else if (resp instanceof TriggerSavepointFailure) {
-                               fail("Received TriggerSavepointFailure: " + 
((TriggerSavepointFailure) resp).cause().getMessage());
-                       } else {
-                               fail("Unexpected response of type  " + 
resp.getClass() + " " + resp);
-                       }
-
-                       // Completed checkpoint
-                       
RestoreStateCountingAndFailingSource.checkpointCompleteLatch.await();
-
-                       // Cancel the job
-                       Future<?> cancelFuture = jobManager.ask(new CancelJob(
-                                       jobGraph.getJobID()), 
deadline.timeLeft());
-                       Await.ready(cancelFuture, deadline.timeLeft());
-
-                       // Wait for the job to be removed
-                       Future<?> removedFuture = jobManager.ask(new 
NotifyWhenJobRemoved(
-                                       jobGraph.getJobID()), 
deadline.timeLeft());
-                       Await.ready(removedFuture, deadline.timeLeft());
-
-                       // Set source to fail on restore calls and try to 
recover from savepoint
-                       
RestoreStateCountingAndFailingSource.failOnRestoreStateCall = true;
-                       jobGraph.setSavepointPath(savepointPath);
-
-                       try {
-                               flink.submitJobAndWait(jobGraph, false, 
deadline.timeLeft());
-                               // If the savepoint state is not restored, we 
will wait here
-                               // until the deadline times out.
-                               fail("Did not throw expected Exception");
-                       } catch (Exception ignored) {
-                       } finally {
-                               // Expecting one restore for the initial 
submission from
-                               // savepoint and one for the execution retries
-                               assertEquals(1 + numExecutionRetries, 
RestoreStateCountingAndFailingSource.numRestoreStateCalls);
-                       }
-               }
-               finally {
-                       if (flink != null) {
-                               flink.shutdown();
-                       }
-               }
-       }
-
        // 
------------------------------------------------------------------------
        // Test program
        // 
------------------------------------------------------------------------
@@ -982,6 +727,10 @@ public class SavepointITCase extends TestLogger {
                        while (running) {
                                ctx.collect(1);
                                emitted++;
+
+                               if (failOnRestoreStateCall) {
+                                       throw new RuntimeException("Restore 
test failure");
+                               }
                        }
                }
 
@@ -998,10 +747,6 @@ public class SavepointITCase extends TestLogger {
                @Override
                public void restoreState(Serializable state) throws Exception {
                        numRestoreStateCalls++;
-
-                       if (failOnRestoreStateCall) {
-                               throw new RuntimeException("Restore test 
failure");
-                       }
                }
 
                @Override

Reply via email to