http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index c2ada3b..d8e46fa 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -313,8 +313,8 @@ public class CheckpointCoordinatorTest {
                        assertFalse(checkpoint.isFullyAcknowledged());
 
                        // check that the vertices received the trigger 
checkpoint message
-                       
verify(vertex1.getCurrentExecutionAttempt()).triggerCheckpoint(checkpointId, 
timestamp);
-                       
verify(vertex2.getCurrentExecutionAttempt()).triggerCheckpoint(checkpointId, 
timestamp);
+                       
verify(vertex1.getCurrentExecutionAttempt()).triggerCheckpoint(checkpointId, 
timestamp, CheckpointOptions.forFullCheckpoint());
+                       
verify(vertex2.getCurrentExecutionAttempt()).triggerCheckpoint(checkpointId, 
timestamp, CheckpointOptions.forFullCheckpoint());
 
                        CheckpointMetaData checkpointMetaData = new 
CheckpointMetaData(checkpointId, 0L);
 
@@ -428,14 +428,14 @@ public class CheckpointCoordinatorTest {
 
                        // check that the vertices received the trigger 
checkpoint message
                        {
-                               verify(vertex1.getCurrentExecutionAttempt(), 
times(1)).triggerCheckpoint(eq(checkpoint1Id), eq(timestamp));
-                               verify(vertex2.getCurrentExecutionAttempt(), 
times(1)).triggerCheckpoint(eq(checkpoint1Id), eq(timestamp));
+                               verify(vertex1.getCurrentExecutionAttempt(), 
times(1)).triggerCheckpoint(eq(checkpoint1Id), eq(timestamp), 
any(CheckpointOptions.class));
+                               verify(vertex2.getCurrentExecutionAttempt(), 
times(1)).triggerCheckpoint(eq(checkpoint1Id), eq(timestamp), 
any(CheckpointOptions.class));
                        }
 
                        // check that the vertices received the trigger 
checkpoint message for the second checkpoint
                        {
-                               verify(vertex1.getCurrentExecutionAttempt(), 
times(1)).triggerCheckpoint(eq(checkpoint2Id), eq(timestamp + 2));
-                               verify(vertex2.getCurrentExecutionAttempt(), 
times(1)).triggerCheckpoint(eq(checkpoint2Id), eq(timestamp + 2));
+                               verify(vertex1.getCurrentExecutionAttempt(), 
times(1)).triggerCheckpoint(eq(checkpoint2Id), eq(timestamp + 2), 
any(CheckpointOptions.class));
+                               verify(vertex2.getCurrentExecutionAttempt(), 
times(1)).triggerCheckpoint(eq(checkpoint2Id), eq(timestamp + 2), 
any(CheckpointOptions.class));
                        }
 
                        // decline checkpoint from one of the tasks, this 
should cancel the checkpoint
@@ -529,8 +529,8 @@ public class CheckpointCoordinatorTest {
 
                        // check that the vertices received the trigger 
checkpoint message
                        {
-                               verify(vertex1.getCurrentExecutionAttempt(), 
times(1)).triggerCheckpoint(eq(checkpointId), eq(timestamp));
-                               verify(vertex2.getCurrentExecutionAttempt(), 
times(1)).triggerCheckpoint(eq(checkpointId), eq(timestamp));
+                               verify(vertex1.getCurrentExecutionAttempt(), 
times(1)).triggerCheckpoint(eq(checkpointId), eq(timestamp), 
any(CheckpointOptions.class));
+                               verify(vertex2.getCurrentExecutionAttempt(), 
times(1)).triggerCheckpoint(eq(checkpointId), eq(timestamp), 
any(CheckpointOptions.class));
                        }
 
                        // acknowledge from one of the tasks
@@ -558,8 +558,8 @@ public class CheckpointCoordinatorTest {
 
                        // validate that the relevant tasks got a confirmation 
message
                        {
-                               verify(vertex1.getCurrentExecutionAttempt(), 
times(1)).triggerCheckpoint(eq(checkpointId), eq(timestamp));
-                               verify(vertex2.getCurrentExecutionAttempt(), 
times(1)).triggerCheckpoint(eq(checkpointId), eq(timestamp));
+                               verify(vertex1.getCurrentExecutionAttempt(), 
times(1)).triggerCheckpoint(eq(checkpointId), eq(timestamp), 
any(CheckpointOptions.class));
+                               verify(vertex2.getCurrentExecutionAttempt(), 
times(1)).triggerCheckpoint(eq(checkpointId), eq(timestamp), 
any(CheckpointOptions.class));
                        }
 
                        CompletedCheckpoint success = 
coord.getSuccessfulCheckpoints().get(0);
@@ -589,8 +589,8 @@ public class CheckpointCoordinatorTest {
 
                        // validate that the relevant tasks got a confirmation 
message
                        {
-                               verify(vertex1.getCurrentExecutionAttempt(), 
times(1)).triggerCheckpoint(eq(checkpointIdNew), eq(timestampNew));
-                               verify(vertex2.getCurrentExecutionAttempt(), 
times(1)).triggerCheckpoint(eq(checkpointIdNew), eq(timestampNew));
+                               verify(vertex1.getCurrentExecutionAttempt(), 
times(1)).triggerCheckpoint(eq(checkpointIdNew), eq(timestampNew), 
any(CheckpointOptions.class));
+                               verify(vertex2.getCurrentExecutionAttempt(), 
times(1)).triggerCheckpoint(eq(checkpointIdNew), eq(timestampNew), 
any(CheckpointOptions.class));
 
                                verify(vertex1.getCurrentExecutionAttempt(), 
times(1)).notifyCheckpointComplete(eq(checkpointIdNew), eq(timestampNew));
                                verify(vertex2.getCurrentExecutionAttempt(), 
times(1)).notifyCheckpointComplete(eq(checkpointIdNew), eq(timestampNew));
@@ -660,8 +660,8 @@ public class CheckpointCoordinatorTest {
                        long checkpointId1 = pending1.getCheckpointId();
 
                        // trigger messages should have been sent
-                       verify(triggerVertex1.getCurrentExecutionAttempt(), 
times(1)).triggerCheckpoint(eq(checkpointId1), eq(timestamp1));
-                       verify(triggerVertex2.getCurrentExecutionAttempt(), 
times(1)).triggerCheckpoint(eq(checkpointId1), eq(timestamp1));
+                       verify(triggerVertex1.getCurrentExecutionAttempt(), 
times(1)).triggerCheckpoint(eq(checkpointId1), eq(timestamp1), 
any(CheckpointOptions.class));
+                       verify(triggerVertex2.getCurrentExecutionAttempt(), 
times(1)).triggerCheckpoint(eq(checkpointId1), eq(timestamp1), 
any(CheckpointOptions.class));
 
                        CheckpointMetaData checkpointMetaData1 = new 
CheckpointMetaData(checkpointId1, 0L);
 
@@ -687,8 +687,8 @@ public class CheckpointCoordinatorTest {
                        CheckpointMetaData checkpointMetaData2 = new 
CheckpointMetaData(checkpointId2, 0L);
 
                        // trigger messages should have been sent
-                       verify(triggerVertex1.getCurrentExecutionAttempt(), 
times(1)).triggerCheckpoint(eq(checkpointId2), eq(timestamp2));
-                       verify(triggerVertex2.getCurrentExecutionAttempt(), 
times(1)).triggerCheckpoint(eq(checkpointId2), eq(timestamp2));
+                       verify(triggerVertex1.getCurrentExecutionAttempt(), 
times(1)).triggerCheckpoint(eq(checkpointId2), eq(timestamp2), 
any(CheckpointOptions.class));
+                       verify(triggerVertex2.getCurrentExecutionAttempt(), 
times(1)).triggerCheckpoint(eq(checkpointId2), eq(timestamp2), 
any(CheckpointOptions.class));
 
                        // we acknowledge the remaining two tasks from the first
                        // checkpoint and two tasks from the second checkpoint
@@ -794,8 +794,8 @@ public class CheckpointCoordinatorTest {
                        long checkpointId1 = pending1.getCheckpointId();
 
                        // trigger messages should have been sent
-                       verify(triggerVertex1.getCurrentExecutionAttempt(), 
times(1)).triggerCheckpoint(eq(checkpointId1), eq(timestamp1));
-                       verify(triggerVertex2.getCurrentExecutionAttempt(), 
times(1)).triggerCheckpoint(eq(checkpointId1), eq(timestamp1));
+                       verify(triggerVertex1.getCurrentExecutionAttempt(), 
times(1)).triggerCheckpoint(eq(checkpointId1), eq(timestamp1), 
any(CheckpointOptions.class));
+                       verify(triggerVertex2.getCurrentExecutionAttempt(), 
times(1)).triggerCheckpoint(eq(checkpointId1), eq(timestamp1), 
any(CheckpointOptions.class));
 
                        CheckpointMetaData checkpointMetaData1 = new 
CheckpointMetaData(checkpointId1, 0L);
 
@@ -819,8 +819,8 @@ public class CheckpointCoordinatorTest {
                        long checkpointId2 = pending2.getCheckpointId();
 
                        // trigger messages should have been sent
-                       verify(triggerVertex1.getCurrentExecutionAttempt(), 
times(1)).triggerCheckpoint(eq(checkpointId2), eq(timestamp2));
-                       verify(triggerVertex2.getCurrentExecutionAttempt(), 
times(1)).triggerCheckpoint(eq(checkpointId2), eq(timestamp2));
+                       verify(triggerVertex1.getCurrentExecutionAttempt(), 
times(1)).triggerCheckpoint(eq(checkpointId2), eq(timestamp2), 
any(CheckpointOptions.class));
+                       verify(triggerVertex2.getCurrentExecutionAttempt(), 
times(1)).triggerCheckpoint(eq(checkpointId2), eq(timestamp2), 
any(CheckpointOptions.class));
 
                        // we acknowledge one more task from the first 
checkpoint and the second
                        // checkpoint completely. The second checkpoint should 
then subsume the first checkpoint
@@ -1142,7 +1142,7 @@ public class CheckpointCoordinatorTest {
                                        numCalls.incrementAndGet();
                                        return null;
                                }
-                       }).when(execution).triggerCheckpoint(anyLong(), 
anyLong());
+                       }).when(execution).triggerCheckpoint(anyLong(), 
anyLong(), any(CheckpointOptions.class));
 
                        CheckpointCoordinator coord = new CheckpointCoordinator(
                                jid,
@@ -1232,7 +1232,7 @@ public class CheckpointCoordinatorTest {
                                triggerCalls.add((Long) 
invocation.getArguments()[0]);
                                return null;
                        }
-               }).when(executionAttempt).triggerCheckpoint(anyLong(), 
anyLong());
+               }).when(executionAttempt).triggerCheckpoint(anyLong(), 
anyLong(), any(CheckpointOptions.class));
 
                final long delay = 50;
 
@@ -1398,7 +1398,6 @@ public class CheckpointCoordinatorTest {
                assertFalse(savepointFuture.isDone());
 
                long checkpointIdNew = 
coord.getPendingCheckpoints().entrySet().iterator().next().getKey();
-               CheckpointMetaData checkpointMetaDataNew = new 
CheckpointMetaData(checkpointIdNew, 0L);
                coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, 
attemptID1, checkpointIdNew));
                coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, 
attemptID2, checkpointIdNew));
 
@@ -1414,8 +1413,8 @@ public class CheckpointCoordinatorTest {
 
                // validate that the relevant tasks got a confirmation message
                {
-                       verify(vertex1.getCurrentExecutionAttempt(), 
times(1)).triggerCheckpoint(eq(checkpointIdNew), eq(timestampNew));
-                       verify(vertex2.getCurrentExecutionAttempt(), 
times(1)).triggerCheckpoint(eq(checkpointIdNew), eq(timestampNew));
+                       verify(vertex1.getCurrentExecutionAttempt(), 
times(1)).triggerCheckpoint(eq(checkpointIdNew), eq(timestampNew), 
any(CheckpointOptions.class));
+                       verify(vertex2.getCurrentExecutionAttempt(), 
times(1)).triggerCheckpoint(eq(checkpointIdNew), eq(timestampNew), 
any(CheckpointOptions.class));
 
                        verify(vertex1.getCurrentExecutionAttempt(), 
times(1)).notifyCheckpointComplete(eq(checkpointIdNew), eq(timestampNew));
                        verify(vertex2.getCurrentExecutionAttempt(), 
times(1)).notifyCheckpointComplete(eq(checkpointIdNew), eq(timestampNew));
@@ -1537,7 +1536,7 @@ public class CheckpointCoordinatorTest {
                                        numCalls.incrementAndGet();
                                        return null;
                                }
-                       }).when(execution).triggerCheckpoint(anyLong(), 
anyLong());
+                       }).when(execution).triggerCheckpoint(anyLong(), 
anyLong(), any(CheckpointOptions.class));
 
                        doAnswer(new Answer<Void>() {
                                @Override
@@ -1578,7 +1577,7 @@ public class CheckpointCoordinatorTest {
                        assertEquals(maxConcurrentAttempts, numCalls.get());
 
                        verify(triggerVertex.getCurrentExecutionAttempt(), 
times(maxConcurrentAttempts))
-                                       .triggerCheckpoint(anyLong(), 
anyLong());
+                                       .triggerCheckpoint(anyLong(), 
anyLong(), any(CheckpointOptions.class));
 
                        // now, once we acknowledge one checkpoint, it should 
trigger the next one
                        coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jid, ackAttemptID, 1L));

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointOptionsTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointOptionsTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointOptionsTest.java
new file mode 100644
index 0000000..6788338
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointOptionsTest.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+import org.apache.flink.runtime.checkpoint.CheckpointOptions.CheckpointType;
+import org.junit.Test;
+
+public class CheckpointOptionsTest {
+
+       @Test
+       public void testFullCheckpoint() throws Exception {
+               CheckpointOptions options = 
CheckpointOptions.forFullCheckpoint();
+               assertEquals(CheckpointType.FULL_CHECKPOINT, 
options.getCheckpointType());
+               assertNull(options.getTargetLocation());
+       }
+
+       @Test
+       public void testSavepoint() throws Exception {
+               String location = 
"asdasdadasdasdja7931481398123123123kjhasdkajsd";
+               CheckpointOptions options = 
CheckpointOptions.forSavepoint(location);
+               assertEquals(CheckpointType.SAVEPOINT, 
options.getCheckpointType());
+               assertEquals(location, options.getTargetLocation());
+       }
+
+       @Test(expected = NullPointerException.class)
+       public void testSavepointNullCheck() throws Exception {
+               CheckpointOptions.forSavepoint(null);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsHistoryTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsHistoryTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsHistoryTest.java
index 3c373f1..95a31d4 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsHistoryTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsHistoryTest.java
@@ -184,6 +184,7 @@ public class CheckpointStatsHistoryTest {
                
when(completed.getStatus()).thenReturn(CheckpointStatsStatus.COMPLETED);
                
when(completed.getProperties()).thenReturn(CheckpointProperties.forStandardCheckpoint());
                when(completed.getCheckpointId()).thenReturn(checkpointId);
+               
when(completed.getProperties()).thenReturn(CheckpointProperties.forStandardCheckpoint());
                return completed;
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/MigrationV0ToV1Test.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/MigrationV0ToV1Test.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/MigrationV0ToV1Test.java
index 512768d..6ab8620 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/MigrationV0ToV1Test.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/MigrationV0ToV1Test.java
@@ -183,7 +183,7 @@ public class MigrationV0ToV1Test {
 
                } finally {
                        // Dispose
-                       SavepointStore.removeSavepoint(path.toString());
+                       SavepointStore.removeSavepointFile(path.toString());
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoaderTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoaderTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoaderTest.java
index 6471d6f..c66b29d 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoaderTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoaderTest.java
@@ -64,12 +64,12 @@ public class SavepointLoaderTest {
                Map<JobVertexID, TaskState> taskStates = new HashMap<>();
                taskStates.put(vertexId, state);
 
+               JobID jobId = new JobID();
+
                // Store savepoint
                SavepointV1 savepoint = new SavepointV1(checkpointId, 
taskStates.values());
                String path = 
SavepointStore.storeSavepoint(tmp.getAbsolutePath(), savepoint);
 
-               JobID jobId = new JobID();
-
                ExecutionJobVertex vertex = mock(ExecutionJobVertex.class);
                when(vertex.getParallelism()).thenReturn(parallelism);
                when(vertex.getMaxParallelism()).thenReturn(parallelism);

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStoreTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStoreTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStoreTest.java
index 3398341..dc19e47 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStoreTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStoreTest.java
@@ -18,6 +18,9 @@
 
 package org.apache.flink.runtime.checkpoint.savepoint;
 
+import java.io.File;
+import java.util.Arrays;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
@@ -38,6 +41,7 @@ import java.util.UUID;
 import java.util.concurrent.ThreadLocalRandom;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
@@ -54,14 +58,22 @@ public class SavepointStoreTest {
         */
        @Test
        public void testStoreLoadDispose() throws Exception {
-               String target = tmp.getRoot().getAbsolutePath();
+               String root = tmp.getRoot().getAbsolutePath();
+               File rootFile = new File(root);
 
-               assertEquals(0, tmp.getRoot().listFiles().length);
+               File[] list = rootFile.listFiles();
+
+               assertNotNull(list);
+               assertEquals(0, list.length);
 
                // Store
+               String savepointDirectory = 
SavepointStore.createSavepointDirectory(root, new JobID());
                SavepointV1 stored = new SavepointV1(1929292, 
SavepointV1Test.createTaskStates(4, 24));
-               String path = SavepointStore.storeSavepoint(target, stored);
-               assertEquals(1, tmp.getRoot().listFiles().length);
+               String path = SavepointStore.storeSavepoint(savepointDirectory, 
stored);
+
+               list = rootFile.listFiles();
+               assertNotNull(list);
+               assertEquals(1, list.length);
 
                // Load
                Savepoint loaded = SavepointStore.loadSavepoint(path, 
Thread.currentThread().getContextClassLoader());
@@ -70,9 +82,11 @@ public class SavepointStoreTest {
                loaded.dispose();
 
                // Dispose
-               SavepointStore.removeSavepoint(path);
+               SavepointStore.deleteSavepointDirectory(path);
 
-               assertEquals(0, tmp.getRoot().listFiles().length);
+               list = rootFile.listFiles();
+               assertNotNull(list);
+               assertEquals(0, list.length);
        }
 
        /**
@@ -108,8 +122,8 @@ public class SavepointStoreTest {
 
                assertTrue(serializers.size() >= 1);
 
-               String target = tmp.getRoot().getAbsolutePath();
-               assertEquals(0, tmp.getRoot().listFiles().length);
+               String root = tmp.getRoot().getAbsolutePath();
+               File rootFile = new File(root);
 
                // New savepoint type for test
                int version = ThreadLocalRandom.current().nextInt();
@@ -118,14 +132,24 @@ public class SavepointStoreTest {
                // Add serializer
                serializers.put(version, NewSavepointSerializer.INSTANCE);
 
+               String savepointDirectory1 = 
SavepointStore.createSavepointDirectory(root, new JobID());
                TestSavepoint newSavepoint = new TestSavepoint(version, 
checkpointId);
-               String pathNewSavepoint = SavepointStore.storeSavepoint(target, 
newSavepoint);
-               assertEquals(1, tmp.getRoot().listFiles().length);
+               String pathNewSavepoint = 
SavepointStore.storeSavepoint(savepointDirectory1, newSavepoint);
+
+               File[] list = rootFile.listFiles();
+
+               assertNotNull(list);
+               assertEquals(1, list.length);
 
                // Savepoint v0
+               String savepointDirectory2 = 
SavepointStore.createSavepointDirectory(root, new JobID());
                Savepoint savepoint = new SavepointV1(checkpointId, 
SavepointV1Test.createTaskStates(4, 32));
-               String pathSavepoint = SavepointStore.storeSavepoint(target, 
savepoint);
-               assertEquals(2, tmp.getRoot().listFiles().length);
+               String pathSavepoint = 
SavepointStore.storeSavepoint(savepointDirectory2, savepoint);
+
+               list = rootFile.listFiles();
+
+               assertNotNull(list);
+               assertEquals(2, list.length);
 
                // Load
                Savepoint loaded = 
SavepointStore.loadSavepoint(pathNewSavepoint, 
Thread.currentThread().getContextClassLoader());

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/CheckpointBarrierTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/CheckpointBarrierTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/CheckpointBarrierTest.java
new file mode 100644
index 0000000..dd5b0b6
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/CheckpointBarrierTest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.api;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.util.DataInputDeserializer;
+import org.apache.flink.runtime.util.DataOutputSerializer;
+import org.junit.Test;
+
+public class CheckpointBarrierTest {
+
+       /**
+        * Test serialization of the checkpoint barrier.
+        */
+       @Test
+       public void testSerialization() throws Exception {
+               long id = Integer.MAX_VALUE + 123123L;
+               long timestamp = Integer.MAX_VALUE + 1228L;
+
+               CheckpointOptions checkpoint = 
CheckpointOptions.forFullCheckpoint();
+               testSerialization(id, timestamp, checkpoint);
+
+               CheckpointOptions savepoint = 
CheckpointOptions.forSavepoint("1289031838919123");
+               testSerialization(id, timestamp, savepoint);
+       }
+
+       private void testSerialization(long id, long timestamp, 
CheckpointOptions options) throws IOException {
+               CheckpointBarrier barrier = new CheckpointBarrier(id, 
timestamp, options);
+
+               DataOutputSerializer out = new DataOutputSerializer(1024);
+               barrier.write(out);
+
+               DataInputDeserializer in = new 
DataInputDeserializer(out.wrapAsByteBuffer());
+               CheckpointBarrier deserialized = new CheckpointBarrier();
+               deserialized.read(in);
+
+               assertEquals(id, deserialized.getId());
+               assertEquals(timestamp, deserialized.getTimestamp());
+               assertEquals(options.getCheckpointType(), 
deserialized.getCheckpointOptions().getCheckpointType());
+               assertEquals(options.getTargetLocation(), 
deserialized.getCheckpointOptions().getTargetLocation());
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java
index 271d0d2..e674eb7 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java
@@ -18,6 +18,14 @@
 
 package org.apache.flink.runtime.io.network.api.serialization;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.event.AbstractEvent;
 import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
@@ -25,25 +33,42 @@ import 
org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
 import org.apache.flink.runtime.io.network.api.EndOfSuperstepEvent;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.util.TestTaskEvent;
-
 import org.junit.Test;
 
-import java.io.IOException;
-import java.nio.ByteBuffer;
+public class EventSerializerTest {
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
+       @Test
+       public void testCheckpointBarrierSerialization() throws Exception {
+               long id = Integer.MAX_VALUE + 123123L;
+               long timestamp = Integer.MAX_VALUE + 1228L;
 
-public class EventSerializerTest {
+               CheckpointOptions checkpoint = 
CheckpointOptions.forFullCheckpoint();
+               testCheckpointBarrierSerialization(id, timestamp, checkpoint);
+
+               CheckpointOptions savepoint = 
CheckpointOptions.forSavepoint("1289031838919123");
+               testCheckpointBarrierSerialization(id, timestamp, savepoint);
+       }
+
+       private void testCheckpointBarrierSerialization(long id, long 
timestamp, CheckpointOptions options) throws IOException {
+               ClassLoader cl = Thread.currentThread().getContextClassLoader();
+
+               CheckpointBarrier barrier = new CheckpointBarrier(id, 
timestamp, options);
+               ByteBuffer serialized = 
EventSerializer.toSerializedEvent(barrier);
+               CheckpointBarrier deserialized = (CheckpointBarrier) 
EventSerializer.fromSerializedEvent(serialized, cl);
+               assertFalse(serialized.hasRemaining());
+
+               assertEquals(id, deserialized.getId());
+               assertEquals(timestamp, deserialized.getTimestamp());
+               assertEquals(options.getCheckpointType(), 
deserialized.getCheckpointOptions().getCheckpointType());
+               assertEquals(options.getTargetLocation(), 
deserialized.getCheckpointOptions().getTargetLocation());
+       }
 
        @Test
        public void testSerializeDeserializeEvent() throws Exception {
                AbstractEvent[] events = {
                                EndOfPartitionEvent.INSTANCE,
                                EndOfSuperstepEvent.INSTANCE,
-                               new CheckpointBarrier(1678L, 4623784L),
+                               new CheckpointBarrier(1678L, 4623784L, 
CheckpointOptions.forFullCheckpoint()),
                                new TestTaskEvent(Math.random(), 12361231273L),
                                new CancelCheckpointMarker(287087987329842L)
                };
@@ -94,7 +119,7 @@ public class EventSerializerTest {
                AbstractEvent[] events = {
                        EndOfPartitionEvent.INSTANCE,
                        EndOfSuperstepEvent.INSTANCE,
-                       new CheckpointBarrier(1678L, 4623784L),
+                       new CheckpointBarrier(1678L, 4623784L, 
CheckpointOptions.forFullCheckpoint()),
                        new TestTaskEvent(Math.random(), 12361231273L),
                        new CancelCheckpointMarker(287087987329842L)
                };

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
index 63175ed..900b5c3 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.core.memory.MemoryType;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.event.AbstractEvent;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
@@ -327,7 +328,7 @@ public class RecordWriterTest {
 
                ResultPartitionWriter partitionWriter = 
createCollectingPartitionWriter(queues, bufferProvider);
                RecordWriter<ByteArrayIO> writer = new 
RecordWriter<>(partitionWriter, new RoundRobin<ByteArrayIO>());
-               CheckpointBarrier barrier = new 
CheckpointBarrier(Integer.MAX_VALUE + 919192L, Integer.MAX_VALUE + 18828228L);
+               CheckpointBarrier barrier = new 
CheckpointBarrier(Integer.MAX_VALUE + 919192L, Integer.MAX_VALUE + 18828228L, 
CheckpointOptions.forFullCheckpoint());
 
                // No records emitted yet, broadcast should not request a buffer
                writer.broadcastEvent(barrier);
@@ -363,7 +364,7 @@ public class RecordWriterTest {
 
                ResultPartitionWriter partitionWriter = 
createCollectingPartitionWriter(queues, bufferProvider);
                RecordWriter<ByteArrayIO> writer = new 
RecordWriter<>(partitionWriter, new RoundRobin<ByteArrayIO>());
-               CheckpointBarrier barrier = new 
CheckpointBarrier(Integer.MAX_VALUE + 1292L, Integer.MAX_VALUE + 199L);
+               CheckpointBarrier barrier = new 
CheckpointBarrier(Integer.MAX_VALUE + 1292L, Integer.MAX_VALUE + 199L, 
CheckpointOptions.forFullCheckpoint());
 
                // Emit records on some channels first (requesting buffers), 
then
                // broadcast the event. The record buffers should be emitted 
first, then

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
index de54d1f..5a38be2 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
@@ -40,6 +40,7 @@ import org.apache.flink.runtime.blob.BlobService;
 import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
@@ -601,7 +602,7 @@ public class JobManagerHARecoveryTest {
                }
 
                @Override
-               public boolean triggerCheckpoint(CheckpointMetaData 
checkpointMetaData) throws Exception {
+               public boolean triggerCheckpoint(CheckpointMetaData 
checkpointMetaData, CheckpointOptions checkpointOptions) throws Exception {
                        ByteStreamStateHandle byteStreamStateHandle = new 
TestByteStreamStateHandleDeepCompare(
                                        String.valueOf(UUID.randomUUID()),
                                        
InstantiationUtil.serializeObject(checkpointMetaData.getCheckpointId()));
@@ -619,7 +620,7 @@ public class JobManagerHARecoveryTest {
                }
 
                @Override
-               public void triggerCheckpointOnBarrier(CheckpointMetaData 
checkpointMetaData, CheckpointMetrics checkpointMetrics) throws Exception {
+               public void triggerCheckpointOnBarrier(CheckpointMetaData 
checkpointMetaData, CheckpointOptions checkpointOptions, CheckpointMetrics 
checkpointMetrics) throws Exception {
                        throw new UnsupportedOperationException("should not be 
called!");
                }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java
index db45231..bc420cc 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.testutils.CommonTestUtils;
 import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTest;
 import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.checkpoint.SubtaskState;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -49,7 +50,7 @@ public class CheckpointMessagesTest {
                        NotifyCheckpointComplete cc = new 
NotifyCheckpointComplete(new JobID(), new ExecutionAttemptID(), 
45287698767345L, 467L);
                        testSerializabilityEqualsHashCode(cc);
 
-                       TriggerCheckpoint tc = new TriggerCheckpoint(new 
JobID(), new ExecutionAttemptID(), 347652734L, 7576752L);
+                       TriggerCheckpoint tc = new TriggerCheckpoint(new 
JobID(), new ExecutionAttemptID(), 347652734L, 7576752L, 
CheckpointOptions.forFullCheckpoint());
                        testSerializabilityEqualsHashCode(tc);
 
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
index 5bd085f..94df524 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.state;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.junit.Test;
@@ -165,7 +166,7 @@ public class OperatorStateBackendTest {
                listState3.add(20);
 
                CheckpointStreamFactory streamFactory = 
abstractStateBackend.createStreamFactory(new JobID(), "testOperator");
-               OperatorStateHandle stateHandle = 
operatorStateBackend.snapshot(1, 1, streamFactory).get();
+               OperatorStateHandle stateHandle = 
operatorStateBackend.snapshot(1, 1, streamFactory, 
CheckpointOptions.forFullCheckpoint()).get();
 
                try {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
index 3b0350d..f2416b9 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
@@ -41,6 +41,7 @@ import 
org.apache.flink.api.common.typeutils.base.LongSerializer;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.core.testutils.CheckedThread;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.checkpoint.StateAssignmentOperation;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
@@ -191,7 +192,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
                assertEquals("1", getSerializedValue(kvState, 1, keySerializer, 
VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
 
                // draw a snapshot
-               KeyGroupsStateHandle snapshot1 = 
runSnapshot(backend.snapshot(682375462378L, 2, streamFactory));
+               KeyGroupsStateHandle snapshot1 = 
runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, 
CheckpointOptions.forFullCheckpoint()));
 
                // make some more modifications
                backend.setCurrentKey(1);
@@ -202,7 +203,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
                state.update("u3");
 
                // draw another snapshot
-               KeyGroupsStateHandle snapshot2 = 
runSnapshot(backend.snapshot(682375462379L, 4, streamFactory));
+               KeyGroupsStateHandle snapshot2 = 
runSnapshot(backend.snapshot(682375462379L, 4, streamFactory, 
CheckpointOptions.forFullCheckpoint()));
 
                // validate the original state
                backend.setCurrentKey(1);
@@ -403,7 +404,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
                assertEquals(13, (int) state2.value());
 
                // draw a snapshot
-               KeyGroupsStateHandle snapshot1 = 
runSnapshot(backend.snapshot(682375462378L, 2, streamFactory));
+               KeyGroupsStateHandle snapshot1 = 
runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, 
CheckpointOptions.forFullCheckpoint()));
 
                backend.dispose();
                backend = restoreKeyedBackend(
@@ -476,7 +477,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
                assertEquals(42L, (long) state.value());
 
                // draw a snapshot
-               KeyGroupsStateHandle snapshot1 = 
runSnapshot(backend.snapshot(682375462378L, 2, streamFactory));
+               KeyGroupsStateHandle snapshot1 = 
runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, 
CheckpointOptions.forFullCheckpoint()));
 
                backend.dispose();
                backend = restoreKeyedBackend(IntSerializer.INSTANCE, 
snapshot1);
@@ -521,7 +522,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
                        assertEquals("1", 
joiner.join(getSerializedList(kvState, 1, keySerializer, 
VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer)));
 
                        // draw a snapshot
-                       KeyGroupsStateHandle snapshot1 = 
runSnapshot(backend.snapshot(682375462378L, 2, streamFactory));
+                       KeyGroupsStateHandle snapshot1 = 
runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, 
CheckpointOptions.forFullCheckpoint()));
 
                        // make some more modifications
                        backend.setCurrentKey(1);
@@ -532,7 +533,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
                        state.add("u3");
 
                        // draw another snapshot
-                       KeyGroupsStateHandle snapshot2 = 
runSnapshot(backend.snapshot(682375462379L, 4, streamFactory));
+                       KeyGroupsStateHandle snapshot2 = 
runSnapshot(backend.snapshot(682375462379L, 4, streamFactory, 
CheckpointOptions.forFullCheckpoint()));
 
                        // validate the original state
                        backend.setCurrentKey(1);
@@ -620,7 +621,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
                        assertEquals("1", getSerializedValue(kvState, 1, 
keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
 
                        // draw a snapshot
-                       KeyGroupsStateHandle snapshot1 = 
runSnapshot(backend.snapshot(682375462378L, 2, streamFactory));
+                       KeyGroupsStateHandle snapshot1 = 
runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, 
CheckpointOptions.forFullCheckpoint()));
 
                        // make some more modifications
                        backend.setCurrentKey(1);
@@ -631,7 +632,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
                        state.add("u3");
 
                        // draw another snapshot
-                       KeyGroupsStateHandle snapshot2 = 
runSnapshot(backend.snapshot(682375462379L, 4, streamFactory));
+                       KeyGroupsStateHandle snapshot2 = 
runSnapshot(backend.snapshot(682375462379L, 4, streamFactory, 
CheckpointOptions.forFullCheckpoint()));
 
                        // validate the original state
                        backend.setCurrentKey(1);
@@ -722,7 +723,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
                        assertEquals("Fold-Initial:,1", 
getSerializedValue(kvState, 1, keySerializer, VoidNamespace.INSTANCE, 
namespaceSerializer, valueSerializer));
 
                        // draw a snapshot
-                       KeyGroupsStateHandle snapshot1 = 
runSnapshot(backend.snapshot(682375462378L, 2, streamFactory));
+                       KeyGroupsStateHandle snapshot1 = 
runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, 
CheckpointOptions.forFullCheckpoint()));
 
                        // make some more modifications
                        backend.setCurrentKey(1);
@@ -734,7 +735,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
                        state.add(103);
 
                        // draw another snapshot
-                       KeyGroupsStateHandle snapshot2 = 
runSnapshot(backend.snapshot(682375462379L, 4, streamFactory));
+                       KeyGroupsStateHandle snapshot2 = 
runSnapshot(backend.snapshot(682375462379L, 4, streamFactory, 
CheckpointOptions.forFullCheckpoint()));
 
                        // validate the original state
                        backend.setCurrentKey(1);
@@ -829,7 +830,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
                                        getSerializedMap(kvState, 1, 
keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, 
userValueSerializer));
 
                        // draw a snapshot
-                       KeyGroupsStateHandle snapshot1 = 
runSnapshot(backend.snapshot(682375462378L, 2, streamFactory));
+                       KeyGroupsStateHandle snapshot1 = 
runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, 
CheckpointOptions.forFullCheckpoint()));
 
                        // make some more modifications
                        backend.setCurrentKey(1);
@@ -841,7 +842,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
                        state.putAll(new HashMap<Integer, String>() {{ 
put(1031, "1031"); put(1032, "1032"); }});
 
                        // draw another snapshot
-                       KeyGroupsStateHandle snapshot2 = 
runSnapshot(backend.snapshot(682375462379L, 4, streamFactory));
+                       KeyGroupsStateHandle snapshot2 = 
runSnapshot(backend.snapshot(682375462379L, 4, streamFactory, 
CheckpointOptions.forFullCheckpoint()));
 
                        // validate the original state
                        backend.setCurrentKey(1);
@@ -1163,7 +1164,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
                state.update("ShouldBeInSecondHalf");
 
 
-               KeyGroupsStateHandle snapshot = runSnapshot(backend.snapshot(0, 
0, streamFactory));
+               KeyGroupsStateHandle snapshot = runSnapshot(backend.snapshot(0, 
0, streamFactory, CheckpointOptions.forFullCheckpoint()));
 
                List<KeyGroupsStateHandle> firstHalfKeyGroupStates = 
StateAssignmentOperation.getKeyGroupsStateHandles(
                                Collections.singletonList(snapshot),
@@ -1230,7 +1231,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
                        state.update("2");
 
                        // draw a snapshot
-                       KeyGroupsStateHandle snapshot1 = 
runSnapshot(backend.snapshot(682375462378L, 2, streamFactory));
+                       KeyGroupsStateHandle snapshot1 = 
runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, 
CheckpointOptions.forFullCheckpoint()));
 
                        backend.dispose();
                        // restore the first snapshot and validate it
@@ -1281,7 +1282,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
                        state.add("2");
 
                        // draw a snapshot
-                       KeyGroupsStateHandle snapshot1 = 
runSnapshot(backend.snapshot(682375462378L, 2, streamFactory));
+                       KeyGroupsStateHandle snapshot1 = 
runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, 
CheckpointOptions.forFullCheckpoint()));
 
                        backend.dispose();
                        // restore the first snapshot and validate it
@@ -1334,7 +1335,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
                        state.add("2");
 
                        // draw a snapshot
-                       KeyGroupsStateHandle snapshot1 = 
runSnapshot(backend.snapshot(682375462378L, 2, streamFactory));
+                       KeyGroupsStateHandle snapshot1 = 
runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, 
CheckpointOptions.forFullCheckpoint()));
 
                        backend.dispose();
                        // restore the first snapshot and validate it
@@ -1385,7 +1386,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
                        state.put("2", "Second");
 
                        // draw a snapshot
-                       KeyGroupsStateHandle snapshot1 = 
runSnapshot(backend.snapshot(682375462378L, 2, streamFactory));
+                       KeyGroupsStateHandle snapshot1 = 
runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, 
CheckpointOptions.forFullCheckpoint()));
 
                        backend.dispose();
                        // restore the first snapshot and validate it
@@ -1661,7 +1662,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
                                eq(env.getJobID()), eq(env.getJobVertexId()), 
eq(expectedKeyGroupRange), eq("banana"), any(KvStateID.class));
 
 
-               KeyGroupsStateHandle snapshot = 
runSnapshot(backend.snapshot(682375462379L, 4, streamFactory));
+               KeyGroupsStateHandle snapshot = 
runSnapshot(backend.snapshot(682375462379L, 4, streamFactory, 
CheckpointOptions.forFullCheckpoint()));
 
                backend.dispose();
 
@@ -1692,7 +1693,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
                        ListStateDescriptor<String> kvId = new 
ListStateDescriptor<>("id", String.class);
 
                        // draw a snapshot
-                       KeyGroupsStateHandle snapshot = 
runSnapshot(backend.snapshot(682375462379L, 1, streamFactory));
+                       KeyGroupsStateHandle snapshot = 
runSnapshot(backend.snapshot(682375462379L, 1, streamFactory, 
CheckpointOptions.forFullCheckpoint()));
                        assertNull(snapshot);
                        backend.dispose();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsSavepointStreamFactoryTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsSavepointStreamFactoryTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsSavepointStreamFactoryTest.java
new file mode 100644
index 0000000..a29d29c
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsSavepointStreamFactoryTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.filesystem;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import java.io.File;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.core.fs.Path;
+import 
org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.FsCheckpointStateOutputStream;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class FsSavepointStreamFactoryTest {
+
+       @Rule
+       public TemporaryFolder folder = new TemporaryFolder();
+
+       /**
+        * Tests that the factory creates all files in the given directory 
without
+        * creating any sub directories.
+        */
+       @Test
+       public void testSavepointStreamDirectoryLayout() throws Exception {
+               File testRoot = folder.newFolder();
+               JobID jobId = new JobID();
+
+               FsSavepointStreamFactory savepointStreamFactory = new 
FsSavepointStreamFactory(
+                               new Path(testRoot.getAbsolutePath()),
+                               jobId,
+                               0);
+
+               File[] listed = testRoot.listFiles();
+               assertNotNull(listed);
+               assertEquals(0, listed.length);
+
+               FsCheckpointStateOutputStream stream = savepointStreamFactory
+                       .createCheckpointStateOutputStream(1273, 19231);
+
+               stream.write(1);
+
+               FileStateHandle handle = (FileStateHandle) 
stream.closeAndGetHandle();
+
+               listed = testRoot.listFiles();
+               assertNotNull(listed);
+               assertEquals(1, listed.length);
+               assertEquals(handle.getFilePath().getPath(), 
listed[0].getPath());
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
index 187163d..89ae5da 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
@@ -26,6 +26,7 @@ import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
@@ -91,7 +92,7 @@ public class TaskAsyncCallTest {
                        awaitLatch.await();
                        
                        for (int i = 1; i <= NUM_CALLS; i++) {
-                               task.triggerCheckpointBarrier(i, 156865867234L);
+                               task.triggerCheckpointBarrier(i, 156865867234L, 
CheckpointOptions.forFullCheckpoint());
                        }
                        
                        triggerLatch.await();
@@ -121,7 +122,7 @@ public class TaskAsyncCallTest {
                        awaitLatch.await();
 
                        for (int i = 1; i <= NUM_CALLS; i++) {
-                               task.triggerCheckpointBarrier(i, 156865867234L);
+                               task.triggerCheckpointBarrier(i, 156865867234L, 
CheckpointOptions.forFullCheckpoint());
                                task.notifyCheckpointComplete(i);
                        }
 
@@ -226,7 +227,7 @@ public class TaskAsyncCallTest {
                public void setInitialState(TaskStateHandles taskStateHandles) 
throws Exception {}
 
                @Override
-               public boolean triggerCheckpoint(CheckpointMetaData 
checkpointMetaData) {
+               public boolean triggerCheckpoint(CheckpointMetaData 
checkpointMetaData, CheckpointOptions checkpointOptions) {
                        lastCheckpointId++;
                        if (checkpointMetaData.getCheckpointId() == 
lastCheckpointId) {
                                if (lastCheckpointId == NUM_CALLS) {
@@ -243,7 +244,7 @@ public class TaskAsyncCallTest {
                }
 
                @Override
-               public void triggerCheckpointOnBarrier(CheckpointMetaData 
checkpointMetaData, CheckpointMetrics checkpointMetrics) throws Exception {
+               public void triggerCheckpointOnBarrier(CheckpointMetaData 
checkpointMetaData, CheckpointOptions checkpointOptions, CheckpointMetrics 
checkpointMetrics) throws Exception {
                        throw new UnsupportedOperationException("Should not be 
called");
                }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index 144247f..05fda28 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.streaming.api.operators;
 
+import java.io.IOException;
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;
 import org.apache.flink.annotation.PublicEvolving;
@@ -36,6 +37,8 @@ import 
org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions.CheckpointType;
 import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
 import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
@@ -340,17 +343,19 @@ public abstract class AbstractStreamOperator<OUT>
        }
 
        @Override
-       public final OperatorSnapshotResult snapshotState(long checkpointId, 
long timestamp) throws Exception {
+       public final OperatorSnapshotResult snapshotState(long checkpointId, 
long timestamp, CheckpointOptions checkpointOptions) throws Exception {
 
                KeyGroupRange keyGroupRange = null != keyedStateBackend ?
                                keyedStateBackend.getKeyGroupRange() : 
KeyGroupRange.EMPTY_KEY_GROUP_RANGE;
 
                OperatorSnapshotResult snapshotInProgress = new 
OperatorSnapshotResult();
 
+               CheckpointStreamFactory factory = 
getCheckpointStreamFactory(checkpointOptions);
+
                try (StateSnapshotContextSynchronousImpl snapshotContext = new 
StateSnapshotContextSynchronousImpl(
                                checkpointId,
                                timestamp,
-                               checkpointStreamFactory,
+                               factory,
                                keyGroupRange,
                                getContainingTask().getCancelables())) {
 
@@ -361,12 +366,12 @@ public abstract class AbstractStreamOperator<OUT>
 
                        if (null != operatorStateBackend) {
                                
snapshotInProgress.setOperatorStateManagedFuture(
-                                       
operatorStateBackend.snapshot(checkpointId, timestamp, 
checkpointStreamFactory));
+                                       
operatorStateBackend.snapshot(checkpointId, timestamp, factory, 
checkpointOptions));
                        }
 
                        if (null != keyedStateBackend) {
                                snapshotInProgress.setKeyedStateManagedFuture(
-                                       
keyedStateBackend.snapshot(checkpointId, timestamp, checkpointStreamFactory));
+                                       
keyedStateBackend.snapshot(checkpointId, timestamp, factory, 
checkpointOptions));
                        }
                } catch (Exception snapshotException) {
                        try {
@@ -431,11 +436,12 @@ public abstract class AbstractStreamOperator<OUT>
        @SuppressWarnings("deprecation")
        @Deprecated
        @Override
-       public StreamStateHandle snapshotLegacyOperatorState(long checkpointId, 
long timestamp) throws Exception {
+       public StreamStateHandle snapshotLegacyOperatorState(long checkpointId, 
long timestamp, CheckpointOptions checkpointOptions) throws Exception {
                if (this instanceof StreamCheckpointedOperator) {
+                       CheckpointStreamFactory factory = 
getCheckpointStreamFactory(checkpointOptions);
 
                        final 
CheckpointStreamFactory.CheckpointStateOutputStream outStream =
-                                       
checkpointStreamFactory.createCheckpointStateOutputStream(checkpointId, 
timestamp);
+                               
factory.createCheckpointStateOutputStream(checkpointId, timestamp);
 
                        
getContainingTask().getCancelables().registerClosable(outStream);
 
@@ -495,6 +501,31 @@ public abstract class AbstractStreamOperator<OUT>
        @Override
        public void notifyOfCompletedCheckpoint(long checkpointId) throws 
Exception {}
 
+       /**
+        * Returns a checkpoint stream factory for the provided options.
+        *
+        * <p>For {@link CheckpointType#FULL_CHECKPOINT} this returns the shared
+        * factory of this operator.
+        *
+        * <p>For {@link CheckpointType#SAVEPOINT} it creates a custom factory 
per
+        * savepoint.
+        *
+        * @param checkpointOptions Options for the checkpoint
+        * @return Checkpoint stream factory for the checkpoints
+        * @throws IOException Failures while creating a new stream factory are 
forwarded
+        */
+       @VisibleForTesting
+       CheckpointStreamFactory getCheckpointStreamFactory(CheckpointOptions 
checkpointOptions) throws IOException {
+               CheckpointType checkpointType = 
checkpointOptions.getCheckpointType();
+               if (checkpointType == CheckpointType.FULL_CHECKPOINT) {
+                       return checkpointStreamFactory;
+               } else if (checkpointType == CheckpointType.SAVEPOINT) {
+                       return container.createSavepointStreamFactory(this, 
checkpointOptions.getTargetLocation());
+               } else {
+                       throw new IllegalStateException("Unknown checkpoint 
type " + checkpointType);
+               }
+       }
+
        // 
------------------------------------------------------------------------
        //  Properties and Services
        // 
------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResult.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResult.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResult.java
index 5a6c37b..83697ae 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResult.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResult.java
@@ -26,7 +26,7 @@ import org.apache.flink.util.ExceptionUtils;
 import java.util.concurrent.RunnableFuture;
 
 /**
- * Result of {@link AbstractStreamOperator#snapshotState}.
+ * Result of {@link StreamOperator#snapshotState}.
  */
 public class OperatorSnapshotResult {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
index d8e4d08..006e910 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
@@ -19,6 +19,7 @@ package org.apache.flink.streaming.api.operators;
 
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -98,7 +99,10 @@ public interface StreamOperator<OUT> extends Serializable {
         * 
         * @throws Exception exception that happened during snapshotting.
         */
-       OperatorSnapshotResult snapshotState(long checkpointId, long timestamp) 
throws Exception;
+       OperatorSnapshotResult snapshotState(
+               long checkpointId,
+               long timestamp,
+               CheckpointOptions checkpointOptions) throws Exception;
 
        /**
         * Takes a snapshot of the legacy operator state defined via {@link 
StreamCheckpointedOperator}.
@@ -110,7 +114,10 @@ public interface StreamOperator<OUT> extends Serializable {
         */
        @SuppressWarnings("deprecation")
        @Deprecated
-       StreamStateHandle snapshotLegacyOperatorState(long checkpointId, long 
timestamp) throws Exception;
+       StreamStateHandle snapshotLegacyOperatorState(
+               long checkpointId,
+               long timestamp,
+               CheckpointOptions checkpointOptions) throws Exception;
 
        /**
         * Provides state handles to restore the operator state.
@@ -142,4 +149,5 @@ public interface StreamOperator<OUT> extends Serializable {
        void setChainingStrategy(ChainingStrategy strategy);
 
        MetricGroup getMetricGroup();
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
index 611bd44..2da8389 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
@@ -368,7 +368,10 @@ public class BarrierBuffer implements 
CheckpointBarrierHandler {
                                        
.setBytesBufferedInAlignment(bytesBuffered)
                                        
.setAlignmentDurationNanos(latestAlignmentDurationNanos);
 
-                       
toNotifyOnCheckpoint.triggerCheckpointOnBarrier(checkpointMetaData, 
checkpointMetrics);
+                       toNotifyOnCheckpoint.triggerCheckpointOnBarrier(
+                               checkpointMetaData,
+                               checkpointBarrier.getCheckpointOptions(),
+                               checkpointMetrics);
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
index 77608c6..8b1b65b 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.runtime.io;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import 
org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineOnCancellationBarrierException;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
@@ -132,7 +133,7 @@ public class BarrierTracker implements 
CheckpointBarrierHandler {
 
                // fast path for single channel trackers
                if (totalNumberOfInputChannels == 1) {
-                       notifyCheckpoint(barrierId, 
receivedBarrier.getTimestamp());
+                       notifyCheckpoint(barrierId, 
receivedBarrier.getTimestamp(), receivedBarrier.getCheckpointOptions());
                        return;
                }
 
@@ -170,7 +171,7 @@ public class BarrierTracker implements 
CheckpointBarrierHandler {
                                                LOG.debug("Received all 
barriers for checkpoint {}", barrierId);
                                        }
 
-                                       
notifyCheckpoint(receivedBarrier.getId(), receivedBarrier.getTimestamp());
+                                       
notifyCheckpoint(receivedBarrier.getId(), receivedBarrier.getTimestamp(), 
receivedBarrier.getCheckpointOptions());
                                }
                        }
                }
@@ -248,14 +249,14 @@ public class BarrierTracker implements 
CheckpointBarrierHandler {
                }
        }
 
-       private void notifyCheckpoint(long checkpointId, long timestamp) throws 
Exception {
+       private void notifyCheckpoint(long checkpointId, long timestamp, 
CheckpointOptions checkpointOptions) throws Exception {
                if (toNotifyOnCheckpoint != null) {
                        CheckpointMetaData checkpointMetaData = new 
CheckpointMetaData(checkpointId, timestamp);
                        CheckpointMetrics checkpointMetrics = new 
CheckpointMetrics()
                                .setBytesBufferedInAlignment(0L)
                                .setAlignmentDurationNanos(0L);
 
-                       
toNotifyOnCheckpoint.triggerCheckpointOnBarrier(checkpointMetaData, 
checkpointMetrics);
+                       
toNotifyOnCheckpoint.triggerCheckpointOnBarrier(checkpointMetaData, 
checkpointOptions, checkpointMetrics);
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
index 4f07182..dd93592 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
@@ -21,6 +21,7 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.metrics.Counter;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
@@ -164,9 +165,9 @@ public class OperatorChain<OUT, OP extends 
StreamOperator<OUT>> implements Strea
                }
        }
 
-       public void broadcastCheckpointBarrier(long id, long timestamp) throws 
IOException {
+       public void broadcastCheckpointBarrier(long id, long timestamp, 
CheckpointOptions checkpointOptions) throws IOException {
                try {
-                       CheckpointBarrier barrier = new CheckpointBarrier(id, 
timestamp);
+                       CheckpointBarrier barrier = new CheckpointBarrier(id, 
timestamp, checkpointOptions);
                        for (RecordWriterOutput<?> streamOutput : 
streamOutputs) {
                                streamOutput.broadcastEvent(barrier);
                        }

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 62cfb8f..938ffd2 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -26,6 +26,7 @@ import 
org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.checkpoint.SubtaskState;
 import org.apache.flink.runtime.execution.CancelTaskException;
 import org.apache.flink.runtime.execution.Environment;
@@ -163,7 +164,6 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>>
 
        private TaskStateHandles restoreStateHandles;
 
-
        /** The currently active background materialization threads */
        private final CloseableRegistry cancelables = new CloseableRegistry();
 
@@ -520,14 +520,14 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>>
        }
 
        @Override
-       public boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData) 
throws Exception {
+       public boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData, 
CheckpointOptions checkpointOptions) throws Exception {
                try {
                        // No alignment if we inject a checkpoint
                        CheckpointMetrics checkpointMetrics = new 
CheckpointMetrics()
                                        .setBytesBufferedInAlignment(0L)
                                        .setAlignmentDurationNanos(0L);
 
-                       return performCheckpoint(checkpointMetaData, 
checkpointMetrics);
+                       return performCheckpoint(checkpointMetaData, 
checkpointOptions, checkpointMetrics);
                }
                catch (Exception e) {
                        // propagate exceptions only if the task is still in 
"running" state
@@ -543,9 +543,13 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>>
        }
 
        @Override
-       public void triggerCheckpointOnBarrier(CheckpointMetaData 
checkpointMetaData, CheckpointMetrics checkpointMetrics) throws Exception {
+       public void triggerCheckpointOnBarrier(
+                       CheckpointMetaData checkpointMetaData,
+                       CheckpointOptions checkpointOptions,
+                       CheckpointMetrics checkpointMetrics) throws Exception {
+
                try {
-                       performCheckpoint(checkpointMetaData, 
checkpointMetrics);
+                       performCheckpoint(checkpointMetaData, 
checkpointOptions, checkpointMetrics);
                }
                catch (CancelTaskException e) {
                        throw new Exception("Operator " + getName() + " was 
cancelled while performing checkpoint " +
@@ -570,8 +574,13 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>>
                }
        }
 
-       private boolean performCheckpoint(CheckpointMetaData 
checkpointMetaData, CheckpointMetrics checkpointMetrics) throws Exception {
-               LOG.debug("Starting checkpoint {} on task {}", 
checkpointMetaData.getCheckpointId(), getName());
+       private boolean performCheckpoint(
+                       CheckpointMetaData checkpointMetaData,
+                       CheckpointOptions checkpointOptions,
+                       CheckpointMetrics checkpointMetrics) throws Exception {
+
+               LOG.debug("Starting checkpoint ({}) {} on task {}",
+                       checkpointMetaData.getCheckpointId(), 
checkpointOptions.getCheckpointType(), getName());
 
                synchronized (lock) {
                        if (isRunning) {
@@ -582,9 +591,11 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>>
                                // Given this, we immediately emit the 
checkpoint barriers, so the downstream operators
                                // can start their checkpoint work as soon as 
possible
                                operatorChain.broadcastCheckpointBarrier(
-                                               
checkpointMetaData.getCheckpointId(), checkpointMetaData.getTimestamp());
+                                               
checkpointMetaData.getCheckpointId(),
+                                               
checkpointMetaData.getTimestamp(),
+                                               checkpointOptions);
 
-                               checkpointState(checkpointMetaData, 
checkpointMetrics);
+                               checkpointState(checkpointMetaData, 
checkpointOptions, checkpointMetrics);
                                return true;
                        }
                        else {
@@ -637,8 +648,17 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>>
                }
        }
 
-       private void checkpointState(CheckpointMetaData checkpointMetaData, 
CheckpointMetrics checkpointMetrics) throws Exception {
-               CheckpointingOperation checkpointingOperation = new 
CheckpointingOperation(this, checkpointMetaData, checkpointMetrics);
+       private void checkpointState(
+                       CheckpointMetaData checkpointMetaData,
+                       CheckpointOptions checkpointOptions,
+                       CheckpointMetrics checkpointMetrics) throws Exception {
+
+               CheckpointingOperation checkpointingOperation = new 
CheckpointingOperation(
+                       this,
+                       checkpointMetaData,
+                       checkpointOptions,
+                       checkpointMetrics);
+
                checkpointingOperation.executeCheckpointing();
        }
 
@@ -814,7 +834,13 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>>
                return stateBackend.createStreamFactory(
                                getEnvironment().getJobID(),
                                createOperatorIdentifier(operator, 
configuration.getVertexID()));
+       }
 
+       public CheckpointStreamFactory 
createSavepointStreamFactory(StreamOperator<?> operator, String targetLocation) 
throws IOException {
+               return stateBackend.createSavepointStreamFactory(
+                       getEnvironment().getJobID(),
+                       createOperatorIdentifier(operator, 
configuration.getVertexID()),
+                       targetLocation);
        }
 
        private String createOperatorIdentifier(StreamOperator<?> operator, int 
vertexId) {
@@ -1048,6 +1074,7 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>>
                private final StreamTask<?, ?> owner;
 
                private final CheckpointMetaData checkpointMetaData;
+               private final CheckpointOptions checkpointOptions;
                private final CheckpointMetrics checkpointMetrics;
 
                private final StreamOperator<?>[] allOperators;
@@ -1060,9 +1087,15 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>>
                private final List<StreamStateHandle> nonPartitionedStates;
                private final List<OperatorSnapshotResult> 
snapshotInProgressList;
 
-               public CheckpointingOperation(StreamTask<?, ?> owner, 
CheckpointMetaData checkpointMetaData, CheckpointMetrics checkpointMetrics) {
+               public CheckpointingOperation(
+                               StreamTask<?, ?> owner,
+                               CheckpointMetaData checkpointMetaData,
+                               CheckpointOptions checkpointOptions,
+                               CheckpointMetrics checkpointMetrics) {
+
                        this.owner = Preconditions.checkNotNull(owner);
                        this.checkpointMetaData = 
Preconditions.checkNotNull(checkpointMetaData);
+                       this.checkpointOptions = 
Preconditions.checkNotNull(checkpointOptions);
                        this.checkpointMetrics = 
Preconditions.checkNotNull(checkpointMetrics);
                        this.allOperators = 
owner.operatorChain.getAllOperators();
                        this.nonPartitionedStates = new 
ArrayList<>(allOperators.length);
@@ -1137,14 +1170,16 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>>
                @SuppressWarnings("deprecation")
                private void checkpointStreamOperator(StreamOperator<?> op) 
throws Exception {
                        if (null != op) {
-                               // first call the legacy checkpoint code paths 
+                               // first call the legacy checkpoint code paths
                                
nonPartitionedStates.add(op.snapshotLegacyOperatorState(
                                                
checkpointMetaData.getCheckpointId(),
-                                               
checkpointMetaData.getTimestamp()));
+                                               
checkpointMetaData.getTimestamp(),
+                                               checkpointOptions));
 
                                OperatorSnapshotResult snapshotInProgress = 
op.snapshotState(
                                                
checkpointMetaData.getCheckpointId(),
-                                               
checkpointMetaData.getTimestamp());
+                                               
checkpointMetaData.getTimestamp(),
+                                               checkpointOptions);
 
                                snapshotInProgressList.add(snapshotInProgress);
                        } else {

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/checkpoint/ListCheckpointedTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/checkpoint/ListCheckpointedTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/checkpoint/ListCheckpointedTest.java
index 6751617..51b9d9a 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/checkpoint/ListCheckpointedTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/checkpoint/ListCheckpointedTest.java
@@ -100,4 +100,4 @@ public class ListCheckpointedTest {
                        return restored;
                }
        }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
index 274611a..8507200 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
@@ -17,12 +17,34 @@
  */
 package org.apache.flink.streaming.api.operators;
 
+import static junit.framework.TestCase.assertTrue;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.contains;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.verify;
+import static org.powermock.api.mockito.PowerMockito.doReturn;
+import static org.powermock.api.mockito.PowerMockito.mock;
+import static org.powermock.api.mockito.PowerMockito.spy;
+import static org.powermock.api.mockito.PowerMockito.when;
+import static org.powermock.api.mockito.PowerMockito.whenNew;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.RunnableFuture;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
 import org.apache.flink.runtime.state.KeyGroupRange;
@@ -45,27 +67,6 @@ import org.mockito.internal.util.reflection.Whitebox;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Random;
-import java.util.concurrent.RunnableFuture;
-
-import static junit.framework.TestCase.assertTrue;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.contains;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-import static org.mockito.Matchers.anyBoolean;
-import static org.mockito.Matchers.anyLong;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.verify;
-import static org.powermock.api.mockito.PowerMockito.doReturn;
-import static org.powermock.api.mockito.PowerMockito.mock;
-import static org.powermock.api.mockito.PowerMockito.spy;
-import static org.powermock.api.mockito.PowerMockito.when;
-import static org.powermock.api.mockito.PowerMockito.whenNew;
-
 /**
  * Tests for the facilities provided by {@link AbstractStreamOperator}. This 
mostly
  * tests timers and state and whether they are correctly checkpointed/restored
@@ -495,10 +496,10 @@ public class AbstractStreamOperatorTest {
                
when(containingTask.getCancelables()).thenReturn(closeableRegistry);
 
                AbstractStreamOperator<Void> operator = 
mock(AbstractStreamOperator.class);
-               when(operator.snapshotState(anyLong(), 
anyLong())).thenCallRealMethod();
+               when(operator.snapshotState(anyLong(), anyLong(), 
any(CheckpointOptions.class))).thenCallRealMethod();
                doReturn(containingTask).when(operator).getContainingTask();
 
-               operator.snapshotState(checkpointId, timestamp);
+               operator.snapshotState(checkpointId, timestamp, 
CheckpointOptions.forFullCheckpoint());
 
                verify(context).close();
        }
@@ -524,14 +525,14 @@ public class AbstractStreamOperatorTest {
                
when(containingTask.getCancelables()).thenReturn(closeableRegistry);
 
                AbstractStreamOperator<Void> operator = 
mock(AbstractStreamOperator.class);
-               when(operator.snapshotState(anyLong(), 
anyLong())).thenCallRealMethod();
+               when(operator.snapshotState(anyLong(), anyLong(), 
any(CheckpointOptions.class))).thenCallRealMethod();
                doReturn(containingTask).when(operator).getContainingTask();
 
                // lets fail when calling the actual snapshotState method
                
doThrow(failingException).when(operator).snapshotState(eq(context));
 
                try {
-                       operator.snapshotState(checkpointId, timestamp);
+                       operator.snapshotState(checkpointId, timestamp, 
CheckpointOptions.forFullCheckpoint());
                        fail("Exception expected.");
                } catch (Exception e) {
                        assertEquals(failingException, e.getCause());
@@ -571,23 +572,29 @@ public class AbstractStreamOperatorTest {
                
when(containingTask.getCancelables()).thenReturn(closeableRegistry);
 
                AbstractStreamOperator<Void> operator = 
mock(AbstractStreamOperator.class);
-               when(operator.snapshotState(anyLong(), 
anyLong())).thenCallRealMethod();
+               when(operator.snapshotState(anyLong(), anyLong(), 
any(CheckpointOptions.class))).thenCallRealMethod();
+
+               // The amount of mocking in this test makes it necessary to 
make the
+               // getCheckpointStreamFactory method visible for the test and to
+               // overwrite its behaviour.
+               
when(operator.getCheckpointStreamFactory(any(CheckpointOptions.class))).thenReturn(streamFactory);
+
                doReturn(containingTask).when(operator).getContainingTask();
 
                RunnableFuture<OperatorStateHandle> 
futureManagedOperatorStateHandle = mock(RunnableFuture.class);
 
                OperatorStateBackend operatorStateBackend = 
mock(OperatorStateBackend.class);
-               when(operatorStateBackend.snapshot(eq(checkpointId), 
eq(timestamp), eq(streamFactory))).thenReturn(futureManagedOperatorStateHandle);
+               when(operatorStateBackend.snapshot(eq(checkpointId), 
eq(timestamp), eq(streamFactory), 
any(CheckpointOptions.class))).thenReturn(futureManagedOperatorStateHandle);
 
                AbstractKeyedStateBackend<?> keyedStateBackend = 
mock(AbstractKeyedStateBackend.class);
-               when(keyedStateBackend.snapshot(eq(checkpointId), 
eq(timestamp), eq(streamFactory))).thenThrow(failingException);
+               when(keyedStateBackend.snapshot(eq(checkpointId), 
eq(timestamp), eq(streamFactory), 
eq(CheckpointOptions.forFullCheckpoint()))).thenThrow(failingException);
 
                Whitebox.setInternalState(operator, "operatorStateBackend", 
operatorStateBackend);
                Whitebox.setInternalState(operator, "keyedStateBackend", 
keyedStateBackend);
                Whitebox.setInternalState(operator, "checkpointStreamFactory", 
streamFactory);
 
                try {
-                       operator.snapshotState(checkpointId, timestamp);
+                       operator.snapshotState(checkpointId, timestamp, 
CheckpointOptions.forFullCheckpoint());
                        fail("Exception expected.");
                } catch (Exception e) {
                        assertEquals(failingException, e.getCause());

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java
index c4ddea8..d331171 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.runtime.state.StateSnapshotContext;
@@ -90,8 +91,8 @@ public class AbstractUdfStreamOperatorLifecycleTest {
                        "setup[class 
org.apache.flink.streaming.runtime.tasks.StreamTask, class " +
                        "org.apache.flink.streaming.api.graph.StreamConfig, 
interface " +
                        "org.apache.flink.streaming.api.operators.Output], " +
-                       "snapshotLegacyOperatorState[long, long], " +
-                       "snapshotState[long, long]]";
+                       "snapshotLegacyOperatorState[long, long, class 
org.apache.flink.runtime.checkpoint.CheckpointOptions], " +
+                       "snapshotState[long, long, class 
org.apache.flink.runtime.checkpoint.CheckpointOptions]]";
 
        private static final String ALL_METHODS_RICH_FUNCTION = "[close[], 
getIterationRuntimeContext[], getRuntimeContext[]" +
                        ", open[class 
org.apache.flink.configuration.Configuration], setRuntimeContext[interface " +
@@ -240,7 +241,8 @@ public class AbstractUdfStreamOperatorLifecycleTest {
                                                try {
                                                        runStarted.await();
                                                        if 
(getContainingTask().isCanceled() || getContainingTask().triggerCheckpoint(
-                                                                       new 
CheckpointMetaData(0, System.currentTimeMillis()))) {
+                                                                       new 
CheckpointMetaData(0, System.currentTimeMillis()),
+                                                                       
CheckpointOptions.forFullCheckpoint())) {
                                                                
LifecycleTrackingStreamSource.runFinish.trigger();
                                                        }
                                                } catch (Exception e) {
@@ -260,9 +262,9 @@ public class AbstractUdfStreamOperatorLifecycleTest {
                }
 
                @Override
-               public StreamStateHandle snapshotLegacyOperatorState(long 
checkpointId, long timestamp) throws Exception {
+               public StreamStateHandle snapshotLegacyOperatorState(long 
checkpointId, long timestamp, CheckpointOptions checkpointOptions) throws 
Exception {
                        
ACTUAL_ORDER_TRACKING.add("OPERATOR::snapshotLegacyOperatorState");
-                       return super.snapshotLegacyOperatorState(checkpointId, 
timestamp);
+                       return super.snapshotLegacyOperatorState(checkpointId, 
timestamp, checkpointOptions);
                }
 
                @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/WrappingFunctionSnapshotRestoreTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/WrappingFunctionSnapshotRestoreTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/WrappingFunctionSnapshotRestoreTest.java
index b1689f9..ab4258f 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/WrappingFunctionSnapshotRestoreTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/WrappingFunctionSnapshotRestoreTest.java
@@ -184,4 +184,4 @@ public class WrappingFunctionSnapshotRestoreTest {
                        return value;
                }
        }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
index 907f8f1..c4867ff 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
@@ -28,6 +28,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.checkpoint.SubtaskState;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
@@ -522,7 +523,7 @@ public class AsyncWaitOperatorTest extends TestLogger {
 
                final CheckpointMetaData checkpointMetaData = new 
CheckpointMetaData(checkpointId, checkpointTimestamp);
 
-               task.triggerCheckpoint(checkpointMetaData);
+               task.triggerCheckpoint(checkpointMetaData, 
CheckpointOptions.forFullCheckpoint());
 
                env.getCheckpointLatch().await();
 
@@ -557,7 +558,7 @@ public class AsyncWaitOperatorTest extends TestLogger {
                restoredTaskHarness.processElement(new StreamRecord<>(7, 
initialTime + 7));
 
                // trigger the checkpoint while processing stream elements
-               restoredTask.triggerCheckpoint(new 
CheckpointMetaData(checkpointId, checkpointTimestamp));
+               restoredTask.triggerCheckpoint(new 
CheckpointMetaData(checkpointId, checkpointTimestamp), 
CheckpointOptions.forFullCheckpoint());
 
                restoredTaskHarness.processElement(new StreamRecord<>(8, 
initialTime + 8));
 

Reply via email to