http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java
index 46f228a..e407443 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.streaming.runtime.io;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import 
org.apache.flink.runtime.checkpoint.decline.AlignmentLimitExceededException;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
@@ -155,7 +156,9 @@ public class BarrierBufferAlignmentLimitTest {
                check(sequence[21], buffer.getNextNonBlocked());
 
                // no call for a completed checkpoint must have happened
-               verify(toNotify, 
times(0)).triggerCheckpointOnBarrier(any(CheckpointMetaData.class),
+               verify(toNotify, times(0)).triggerCheckpointOnBarrier(
+                       any(CheckpointMetaData.class),
+                       any(CheckpointOptions.class),
                        any(CheckpointMetrics.class));
 
                assertNull(buffer.getNextNonBlocked());
@@ -242,7 +245,8 @@ public class BarrierBufferAlignmentLimitTest {
                // checkpoint 4 completed - check and validate buffered replay
                check(sequence[9], buffer.getNextNonBlocked());
                validateAlignmentTime(startTs, buffer);
-               verify(toNotify, 
times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(4L)), 
any(CheckpointMetrics.class));
+               verify(toNotify, times(1)).triggerCheckpointOnBarrier(
+                       argThat(new CheckpointMatcher(4L)), 
any(CheckpointOptions.class), any(CheckpointMetrics.class));
 
                check(sequence[10], buffer.getNextNonBlocked());
                check(sequence[15], buffer.getNextNonBlocked());
@@ -254,7 +258,8 @@ public class BarrierBufferAlignmentLimitTest {
                check(sequence[21], buffer.getNextNonBlocked());
 
                // only checkpoint 4 was successfully completed, not checkpoint 
3
-               verify(toNotify, 
times(0)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(3L)), 
any(CheckpointMetrics.class));
+               verify(toNotify, times(0)).triggerCheckpointOnBarrier(
+                       argThat(new CheckpointMatcher(3L)), 
any(CheckpointOptions.class), any(CheckpointMetrics.class));
 
                assertNull(buffer.getNextNonBlocked());
                assertNull(buffer.getNextNonBlocked());
@@ -284,7 +289,7 @@ public class BarrierBufferAlignmentLimitTest {
        }
 
        private static BufferOrEvent createBarrier(long id, int channel) {
-               return new BufferOrEvent(new CheckpointBarrier(id, 
System.currentTimeMillis()), channel);
+               return new BufferOrEvent(new CheckpointBarrier(id, 
System.currentTimeMillis(), CheckpointOptions.forFullCheckpoint()), channel);
        }
 
        private static void check(BufferOrEvent expected, BufferOrEvent 
present) {

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
index 0cf866a..6e088f6 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
@@ -18,6 +18,7 @@
 package org.apache.flink.streaming.runtime.io;
 
 import org.apache.flink.core.memory.MemoryType;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.event.TaskEvent;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
@@ -151,7 +152,7 @@ public class BarrierBufferMassiveRandomTest {
 
                        if (barrierGens[currentChannel].isNextBarrier()) {
                                return new BufferOrEvent(
-                                               new 
CheckpointBarrier(++currentBarriers[currentChannel], 
System.currentTimeMillis()),
+                                               new 
CheckpointBarrier(++currentBarriers[currentChannel], 
System.currentTimeMillis(), CheckpointOptions.forFullCheckpoint()),
                                                        currentChannel);
                        } else {
                                Buffer buffer = 
bufferPools[currentChannel].requestBuffer();

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
index 869d1fe..d6056d2 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import 
org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineOnCancellationBarrierException;
 import 
org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineSubsumedException;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
@@ -566,7 +567,7 @@ public class BarrierBufferTest {
                        // checkpoint done - replay buffered
                        check(sequence[5], buffer.getNextNonBlocked());
                        validateAlignmentTime(startTs, buffer);
-                       verify(toNotify).triggerCheckpointOnBarrier(argThat(new 
CheckpointMatcher(1L)), any(CheckpointMetrics.class));
+                       verify(toNotify).triggerCheckpointOnBarrier(argThat(new 
CheckpointMatcher(1L)), any(CheckpointOptions.class), 
any(CheckpointMetrics.class));
                        check(sequence[6], buffer.getNextNonBlocked());
 
                        check(sequence[9], buffer.getNextNonBlocked());
@@ -1008,14 +1009,14 @@ public class BarrierBufferTest {
 
                check(sequence[0], buffer.getNextNonBlocked());
                check(sequence[2], buffer.getNextNonBlocked());
-               verify(toNotify, 
times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(1L)), 
any(CheckpointMetrics.class));
+               verify(toNotify, 
times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(1L)), 
any(CheckpointOptions.class), any(CheckpointMetrics.class));
                assertEquals(0L, buffer.getAlignmentDurationNanos());
 
                check(sequence[6], buffer.getNextNonBlocked());
                assertEquals(5L, buffer.getCurrentCheckpointId());
-               verify(toNotify, 
times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(2L)), 
any(CheckpointMetrics.class));
+               verify(toNotify, 
times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(2L)), 
any(CheckpointOptions.class), any(CheckpointMetrics.class));
                verify(toNotify, times(1)).abortCheckpointOnBarrier(eq(4L), 
any(CheckpointDeclineOnCancellationBarrierException.class));
-               verify(toNotify, 
times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(5L)), 
any(CheckpointMetrics.class));
+               verify(toNotify, 
times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(5L)), 
any(CheckpointOptions.class), any(CheckpointMetrics.class));
                assertEquals(0L, buffer.getAlignmentDurationNanos());
 
                check(sequence[8], buffer.getNextNonBlocked());
@@ -1078,7 +1079,7 @@ public class BarrierBufferTest {
                check(sequence[2], buffer.getNextNonBlocked());
                startTs = System.nanoTime();
                check(sequence[5], buffer.getNextNonBlocked());
-               verify(toNotify, 
times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(1L)), 
any(CheckpointMetrics.class));
+               verify(toNotify, 
times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(1L)), 
any(CheckpointOptions.class), any(CheckpointMetrics.class));
                validateAlignmentTime(startTs, buffer);
 
                check(sequence[6], buffer.getNextNonBlocked());
@@ -1097,7 +1098,7 @@ public class BarrierBufferTest {
                check(sequence[16], buffer.getNextNonBlocked());
                startTs = System.nanoTime();
                check(sequence[20], buffer.getNextNonBlocked());
-               verify(toNotify, 
times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(3L)), 
any(CheckpointMetrics.class));
+               verify(toNotify, 
times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(3L)), 
any(CheckpointOptions.class), any(CheckpointMetrics.class));
                validateAlignmentTime(startTs, buffer);
                check(sequence[21], buffer.getNextNonBlocked());
 
@@ -1114,7 +1115,7 @@ public class BarrierBufferTest {
                // a simple successful checkpoint
                startTs = System.nanoTime();
                check(sequence[32], buffer.getNextNonBlocked());
-               verify(toNotify, 
times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(5L)), 
any(CheckpointMetrics.class));
+               verify(toNotify, 
times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(5L)), 
any(CheckpointOptions.class), any(CheckpointMetrics.class));
                validateAlignmentTime(startTs, buffer);
                check(sequence[33], buffer.getNextNonBlocked());
 
@@ -1175,7 +1176,7 @@ public class BarrierBufferTest {
 
                // finished first checkpoint
                check(sequence[3], buffer.getNextNonBlocked());
-               verify(toNotify, 
times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(1L)), 
any(CheckpointMetrics.class));
+               verify(toNotify, 
times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(1L)), 
any(CheckpointOptions.class), any(CheckpointMetrics.class));
                validateAlignmentTime(startTs, buffer);
 
                check(sequence[5], buffer.getNextNonBlocked());
@@ -1198,7 +1199,7 @@ public class BarrierBufferTest {
                assertEquals(0L, buffer.getAlignmentDurationNanos());
 
                // no further checkpoint (abort) notifications
-               verify(toNotify, 
times(1)).triggerCheckpointOnBarrier(any(CheckpointMetaData.class), 
any(CheckpointMetrics.class));
+               verify(toNotify, 
times(1)).triggerCheckpointOnBarrier(any(CheckpointMetaData.class), 
any(CheckpointOptions.class), any(CheckpointMetrics.class));
                verify(toNotify, times(1)).abortCheckpointOnBarrier(anyLong(), 
any(CheckpointDeclineOnCancellationBarrierException.class));
 
                // all done
@@ -1280,7 +1281,7 @@ public class BarrierBufferTest {
                // checkpoint done
                check(sequence[7], buffer.getNextNonBlocked());
                validateAlignmentTime(startTs, buffer);
-               verify(toNotify).triggerCheckpointOnBarrier(argThat(new 
CheckpointMatcher(2L)), any(CheckpointMetrics.class));
+               verify(toNotify).triggerCheckpointOnBarrier(argThat(new 
CheckpointMatcher(2L)), any(CheckpointOptions.class), 
any(CheckpointMetrics.class));
 
                // queued data
                check(sequence[10], buffer.getNextNonBlocked());
@@ -1299,7 +1300,7 @@ public class BarrierBufferTest {
                checkNoTempFilesRemain();
 
                // check overall notifications
-               verify(toNotify, 
times(1)).triggerCheckpointOnBarrier(any(CheckpointMetaData.class), 
any(CheckpointMetrics.class));
+               verify(toNotify, 
times(1)).triggerCheckpointOnBarrier(any(CheckpointMetaData.class), 
any(CheckpointOptions.class), any(CheckpointMetrics.class));
                verify(toNotify, times(1)).abortCheckpointOnBarrier(anyLong(), 
any(Throwable.class));
        }
 
@@ -1364,7 +1365,7 @@ public class BarrierBufferTest {
                // checkpoint finished
                check(sequence[7], buffer.getNextNonBlocked());
                validateAlignmentTime(startTs, buffer);
-               verify(toNotify, 
times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(5L)), 
any(CheckpointMetrics.class));
+               verify(toNotify, 
times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(5L)), 
any(CheckpointOptions.class), any(CheckpointMetrics.class));
                check(sequence[11], buffer.getNextNonBlocked());
 
                // remaining data
@@ -1380,7 +1381,7 @@ public class BarrierBufferTest {
                checkNoTempFilesRemain();
 
                // check overall notifications
-               verify(toNotify, 
times(1)).triggerCheckpointOnBarrier(any(CheckpointMetaData.class), 
any(CheckpointMetrics.class));
+               verify(toNotify, 
times(1)).triggerCheckpointOnBarrier(any(CheckpointMetaData.class), 
any(CheckpointOptions.class), any(CheckpointMetrics.class));
                verify(toNotify, times(1)).abortCheckpointOnBarrier(anyLong(), 
any(Throwable.class));
        }
 
@@ -1389,7 +1390,7 @@ public class BarrierBufferTest {
        // 
------------------------------------------------------------------------
 
        private static BufferOrEvent createBarrier(long checkpointId, int 
channel) {
-               return new BufferOrEvent(new CheckpointBarrier(checkpointId, 
System.currentTimeMillis()), channel);
+               return new BufferOrEvent(new CheckpointBarrier(checkpointId, 
System.currentTimeMillis(), CheckpointOptions.forFullCheckpoint()), channel);
        }
 
        private static BufferOrEvent createCancellationBarrier(long 
checkpointId, int channel) {
@@ -1487,12 +1488,12 @@ public class BarrierBufferTest {
                }
 
                @Override
-               public boolean triggerCheckpoint(CheckpointMetaData 
checkpointMetaData) throws Exception {
+               public boolean triggerCheckpoint(CheckpointMetaData 
checkpointMetaData, CheckpointOptions checkpointOptions) throws Exception {
                        throw new UnsupportedOperationException("should never 
be called");
                }
 
                @Override
-               public void triggerCheckpointOnBarrier(CheckpointMetaData 
checkpointMetaData, CheckpointMetrics checkpointMetrics) throws Exception {
+               public void triggerCheckpointOnBarrier(CheckpointMetaData 
checkpointMetaData, CheckpointOptions checkpointOptions, CheckpointMetrics 
checkpointMetrics) throws Exception {
                        assertTrue("wrong checkpoint id",
                                        nextExpectedCheckpointId == -1L || 
                                        nextExpectedCheckpointId == 
checkpointMetaData.getCheckpointId());

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
index da322f6..05f7da6 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.streaming.runtime.io;
 import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
@@ -470,7 +471,7 @@ public class BarrierTrackerTest {
        // 
------------------------------------------------------------------------
 
        private static BufferOrEvent createBarrier(long id, int channel) {
-               return new BufferOrEvent(new CheckpointBarrier(id, 
System.currentTimeMillis()), channel);
+               return new BufferOrEvent(new CheckpointBarrier(id, 
System.currentTimeMillis(), CheckpointOptions.forFullCheckpoint()), channel);
        }
 
        private static BufferOrEvent createCancellationBarrier(long id, int 
channel) {
@@ -502,12 +503,12 @@ public class BarrierTrackerTest {
                }
 
                @Override
-               public boolean triggerCheckpoint(CheckpointMetaData 
checkpointMetaData) throws Exception {
+               public boolean triggerCheckpoint(CheckpointMetaData 
checkpointMetaData, CheckpointOptions checkpointOptions) throws Exception {
                        throw new UnsupportedOperationException("should never 
be called");
                }
 
                @Override
-               public void triggerCheckpointOnBarrier(CheckpointMetaData 
checkpointMetaData, CheckpointMetrics checkpointMetrics) throws Exception {
+               public void triggerCheckpointOnBarrier(CheckpointMetaData 
checkpointMetaData, CheckpointOptions checkpointOptions, CheckpointMetrics 
checkpointMetrics) throws Exception {
                        assertTrue("More checkpoints than expected", i < 
checkpointIDs.length);
 
                        final long expectedId = checkpointIDs[i++];

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java
index 5c0f0cf..51294ce 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java
@@ -28,6 +28,7 @@ import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
@@ -174,6 +175,11 @@ public class BlockingCheckpointsTest {
                }
 
                @Override
+               public CheckpointStreamFactory 
createSavepointStreamFactory(JobID jobId, String operatorIdentifier, String 
targetLocation) throws IOException {
+                       throw new UnsupportedOperationException();
+               }
+
+               @Override
                public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
                                Environment env, JobID jobID, String 
operatorIdentifier,
                                TypeSerializer<K> keySerializer, int 
numberOfKeyGroups,
@@ -276,7 +282,7 @@ public class BlockingCheckpointsTest {
 
                @Override
                protected void run() throws Exception {
-                       triggerCheckpointOnBarrier(new CheckpointMetaData(11L, 
System.currentTimeMillis()), new CheckpointMetrics());
+                       triggerCheckpointOnBarrier(new CheckpointMetaData(11L, 
System.currentTimeMillis()), CheckpointOptions.forFullCheckpoint(), new 
CheckpointMetrics());
                }
 
                @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
index 69c2c88..e22bf86 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
@@ -33,6 +33,7 @@ import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.checkpoint.SubtaskState;
 import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
@@ -408,7 +409,7 @@ public class OneInputStreamTaskTest extends TestLogger {
                testHarness.invoke();
                testHarness.waitForTaskRunning();
 
-               testHarness.processEvent(new CheckpointBarrier(0, 0), 0, 0);
+               testHarness.processEvent(new CheckpointBarrier(0, 0, 
CheckpointOptions.forFullCheckpoint()), 0, 0);
 
                // These elements should be buffered until we receive barriers 
from
                // all inputs
@@ -427,14 +428,14 @@ public class OneInputStreamTaskTest extends TestLogger {
                // we should not yet see the barrier, only the two elements 
from non-blocked input
                TestHarnessUtil.assertOutputEquals("Output was not correct.", 
expectedOutput, testHarness.getOutput());
 
-               testHarness.processEvent(new CheckpointBarrier(0, 0), 0, 1);
-               testHarness.processEvent(new CheckpointBarrier(0, 0), 1, 0);
-               testHarness.processEvent(new CheckpointBarrier(0, 0), 1, 1);
+               testHarness.processEvent(new CheckpointBarrier(0, 0, 
CheckpointOptions.forFullCheckpoint()), 0, 1);
+               testHarness.processEvent(new CheckpointBarrier(0, 0, 
CheckpointOptions.forFullCheckpoint()), 1, 0);
+               testHarness.processEvent(new CheckpointBarrier(0, 0, 
CheckpointOptions.forFullCheckpoint()), 1, 1);
 
                testHarness.waitForInputProcessing();
 
                // now we should see the barrier and after that the buffered 
elements
-               expectedOutput.add(new CheckpointBarrier(0, 0));
+               expectedOutput.add(new CheckpointBarrier(0, 0, 
CheckpointOptions.forFullCheckpoint()));
                expectedOutput.add(new StreamRecord<String>("Hello-0-0", 
initialTime));
                expectedOutput.add(new StreamRecord<String>("Ciao-0-0", 
initialTime));
 
@@ -467,7 +468,7 @@ public class OneInputStreamTaskTest extends TestLogger {
                testHarness.invoke();
                testHarness.waitForTaskRunning();
 
-               testHarness.processEvent(new CheckpointBarrier(0, 0), 0, 0);
+               testHarness.processEvent(new CheckpointBarrier(0, 0, 
CheckpointOptions.forFullCheckpoint()), 0, 0);
 
                // These elements should be buffered until we receive barriers 
from
                // all inputs
@@ -488,15 +489,15 @@ public class OneInputStreamTaskTest extends TestLogger {
 
                // Now give a later barrier to all inputs, this should unblock 
the first channel,
                // thereby allowing the two blocked elements through
-               testHarness.processEvent(new CheckpointBarrier(1, 1), 0, 0);
-               testHarness.processEvent(new CheckpointBarrier(1, 1), 0, 1);
-               testHarness.processEvent(new CheckpointBarrier(1, 1), 1, 0);
-               testHarness.processEvent(new CheckpointBarrier(1, 1), 1, 1);
+               testHarness.processEvent(new CheckpointBarrier(1, 1, 
CheckpointOptions.forFullCheckpoint()), 0, 0);
+               testHarness.processEvent(new CheckpointBarrier(1, 1, 
CheckpointOptions.forFullCheckpoint()), 0, 1);
+               testHarness.processEvent(new CheckpointBarrier(1, 1, 
CheckpointOptions.forFullCheckpoint()), 1, 0);
+               testHarness.processEvent(new CheckpointBarrier(1, 1, 
CheckpointOptions.forFullCheckpoint()), 1, 1);
 
                expectedOutput.add(new CancelCheckpointMarker(0));
                expectedOutput.add(new StreamRecord<String>("Hello-0-0", 
initialTime));
                expectedOutput.add(new StreamRecord<String>("Ciao-0-0", 
initialTime));
-               expectedOutput.add(new CheckpointBarrier(1, 1));
+               expectedOutput.add(new CheckpointBarrier(1, 1, 
CheckpointOptions.forFullCheckpoint()));
 
                testHarness.waitForInputProcessing();
 
@@ -504,9 +505,9 @@ public class OneInputStreamTaskTest extends TestLogger {
 
 
                // Then give the earlier barrier, these should be ignored
-               testHarness.processEvent(new CheckpointBarrier(0, 0), 0, 1);
-               testHarness.processEvent(new CheckpointBarrier(0, 0), 1, 0);
-               testHarness.processEvent(new CheckpointBarrier(0, 0), 1, 1);
+               testHarness.processEvent(new CheckpointBarrier(0, 0, 
CheckpointOptions.forFullCheckpoint()), 0, 1);
+               testHarness.processEvent(new CheckpointBarrier(0, 0, 
CheckpointOptions.forFullCheckpoint()), 1, 0);
+               testHarness.processEvent(new CheckpointBarrier(0, 0, 
CheckpointOptions.forFullCheckpoint()), 1, 1);
 
                testHarness.waitForInputProcessing();
 
@@ -557,7 +558,7 @@ public class OneInputStreamTaskTest extends TestLogger {
 
                CheckpointMetaData checkpointMetaData = new 
CheckpointMetaData(checkpointId, checkpointTimestamp);
 
-               while(!streamTask.triggerCheckpoint(checkpointMetaData));
+               while(!streamTask.triggerCheckpoint(checkpointMetaData, 
CheckpointOptions.forFullCheckpoint()));
 
                // since no state was set, there shouldn't be restore calls
                assertEquals(0, TestingStreamOperator.numberRestoreCalls);

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
index 0773699..1a6fa8f 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
 import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
@@ -231,7 +232,7 @@ public class SourceStreamTaskTest {
                        for (int i = 0; i < numCheckpoints; i++) {
                                long currentCheckpointId = 
checkpointId.getAndIncrement();
                                CheckpointMetaData checkpointMetaData = new 
CheckpointMetaData(currentCheckpointId, 0L);
-                               
sourceTask.triggerCheckpoint(checkpointMetaData);
+                               
sourceTask.triggerCheckpoint(checkpointMetaData, 
CheckpointOptions.forFullCheckpoint());
                                Thread.sleep(checkpointInterval);
                        }
                        return true;

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java
index c2d4aaa..53f77ca 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.runtime.tasks;
 
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import 
org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineOnCancellationBarrierException;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
@@ -56,7 +57,8 @@ public class StreamTaskCancellationBarrierTest {
                testHarness.invoke();
 
                // tell the task to commence a checkpoint
-               boolean result = task.triggerCheckpoint(new 
CheckpointMetaData(41L, System.currentTimeMillis()));
+               boolean result = task.triggerCheckpoint(new 
CheckpointMetaData(41L, System.currentTimeMillis()),
+                       CheckpointOptions.forFullCheckpoint());
                assertFalse("task triggered checkpoint though not ready", 
result);
 
                // a cancellation barrier should be downstream

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index 1e74c3e..3d01fdd 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -33,6 +33,7 @@ import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.checkpoint.SubtaskState;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
@@ -305,18 +306,18 @@ public class StreamTaskTest extends TestLogger {
 
                final Exception testException = new Exception("Test exception");
 
-               when(streamOperator1.snapshotState(anyLong(), 
anyLong())).thenReturn(operatorSnapshotResult1);
-               when(streamOperator2.snapshotState(anyLong(), 
anyLong())).thenReturn(operatorSnapshotResult2);
-               when(streamOperator3.snapshotState(anyLong(), 
anyLong())).thenThrow(testException);
+               when(streamOperator1.snapshotState(anyLong(), anyLong(), 
any(CheckpointOptions.class))).thenReturn(operatorSnapshotResult1);
+               when(streamOperator2.snapshotState(anyLong(), anyLong(), 
any(CheckpointOptions.class))).thenReturn(operatorSnapshotResult2);
+               when(streamOperator3.snapshotState(anyLong(), anyLong(), 
any(CheckpointOptions.class))).thenThrow(testException);
 
                // mock the returned legacy snapshots
                StreamStateHandle streamStateHandle1 = 
mock(StreamStateHandle.class);
                StreamStateHandle streamStateHandle2 = 
mock(StreamStateHandle.class);
                StreamStateHandle streamStateHandle3 = 
mock(StreamStateHandle.class);
 
-               when(streamOperator1.snapshotLegacyOperatorState(anyLong(), 
anyLong())).thenReturn(streamStateHandle1);
-               when(streamOperator2.snapshotLegacyOperatorState(anyLong(), 
anyLong())).thenReturn(streamStateHandle2);
-               when(streamOperator3.snapshotLegacyOperatorState(anyLong(), 
anyLong())).thenReturn(streamStateHandle3);
+               when(streamOperator1.snapshotLegacyOperatorState(anyLong(), 
anyLong(), any(CheckpointOptions.class))).thenReturn(streamStateHandle1);
+               when(streamOperator2.snapshotLegacyOperatorState(anyLong(), 
anyLong(), any(CheckpointOptions.class))).thenReturn(streamStateHandle2);
+               when(streamOperator3.snapshotLegacyOperatorState(anyLong(), 
anyLong(), any(CheckpointOptions.class))).thenReturn(streamStateHandle3);
 
                // set up the task
 
@@ -332,7 +333,7 @@ public class StreamTaskTest extends TestLogger {
                Whitebox.setInternalState(streamTask, "configuration", new 
StreamConfig(new Configuration()));
 
                try {
-                       streamTask.triggerCheckpoint(checkpointMetaData);
+                       streamTask.triggerCheckpoint(checkpointMetaData, 
CheckpointOptions.forFullCheckpoint());
                        fail("Expected test exception here.");
                } catch (Exception e) {
                        assertEquals(testException, e.getCause());
@@ -380,18 +381,18 @@ public class StreamTaskTest extends TestLogger {
 
                
when(operatorSnapshotResult3.getOperatorStateRawFuture()).thenReturn(failingFuture);
 
-               when(streamOperator1.snapshotState(anyLong(), 
anyLong())).thenReturn(operatorSnapshotResult1);
-               when(streamOperator2.snapshotState(anyLong(), 
anyLong())).thenReturn(operatorSnapshotResult2);
-               when(streamOperator3.snapshotState(anyLong(), 
anyLong())).thenReturn(operatorSnapshotResult3);
+               when(streamOperator1.snapshotState(anyLong(), anyLong(), 
any(CheckpointOptions.class))).thenReturn(operatorSnapshotResult1);
+               when(streamOperator2.snapshotState(anyLong(), anyLong(), 
any(CheckpointOptions.class))).thenReturn(operatorSnapshotResult2);
+               when(streamOperator3.snapshotState(anyLong(), anyLong(), 
any(CheckpointOptions.class))).thenReturn(operatorSnapshotResult3);
 
                // mock the legacy state snapshot
                StreamStateHandle streamStateHandle1 = 
mock(StreamStateHandle.class);
                StreamStateHandle streamStateHandle2 = 
mock(StreamStateHandle.class);
                StreamStateHandle streamStateHandle3 = 
mock(StreamStateHandle.class);
 
-               when(streamOperator1.snapshotLegacyOperatorState(anyLong(), 
anyLong())).thenReturn(streamStateHandle1);
-               when(streamOperator2.snapshotLegacyOperatorState(anyLong(), 
anyLong())).thenReturn(streamStateHandle2);
-               when(streamOperator3.snapshotLegacyOperatorState(anyLong(), 
anyLong())).thenReturn(streamStateHandle3);
+               when(streamOperator1.snapshotLegacyOperatorState(anyLong(), 
anyLong(), any(CheckpointOptions.class))).thenReturn(streamStateHandle1);
+               when(streamOperator2.snapshotLegacyOperatorState(anyLong(), 
anyLong(), any(CheckpointOptions.class))).thenReturn(streamStateHandle2);
+               when(streamOperator3.snapshotLegacyOperatorState(anyLong(), 
anyLong(), any(CheckpointOptions.class))).thenReturn(streamStateHandle3);
 
                StreamOperator<?>[] streamOperators = {streamOperator1, 
streamOperator2, streamOperator3};
 
@@ -405,7 +406,7 @@ public class StreamTaskTest extends TestLogger {
                Whitebox.setInternalState(streamTask, 
"asyncOperationsThreadPool", new DirectExecutorService());
                Whitebox.setInternalState(streamTask, "configuration", new 
StreamConfig(new Configuration()));
 
-               streamTask.triggerCheckpoint(checkpointMetaData);
+               streamTask.triggerCheckpoint(checkpointMetaData, 
CheckpointOptions.forFullCheckpoint());
 
                verify(streamTask).handleAsyncException(anyString(), 
any(Throwable.class));
 
@@ -468,7 +469,7 @@ public class StreamTaskTest extends TestLogger {
                        new DoneFuture<>(managedOperatorStateHandle),
                        new DoneFuture<>(rawOperatorStateHandle));
 
-               when(streamOperator.snapshotState(anyLong(), 
anyLong())).thenReturn(operatorSnapshotResult);
+               when(streamOperator.snapshotState(anyLong(), anyLong(), 
any(CheckpointOptions.class))).thenReturn(operatorSnapshotResult);
 
                StreamOperator<?>[] streamOperators = {streamOperator};
 
@@ -495,7 +496,7 @@ public class StreamTaskTest extends TestLogger {
                Whitebox.setInternalState(streamTask, "configuration", new 
StreamConfig(new Configuration()));
                Whitebox.setInternalState(streamTask, "stateBackend", 
mockStateBackend);
 
-               streamTask.triggerCheckpoint(checkpointMetaData);
+               streamTask.triggerCheckpoint(checkpointMetaData, 
CheckpointOptions.forFullCheckpoint());
 
                acknowledgeCheckpointLatch.await();
 
@@ -584,7 +585,7 @@ public class StreamTaskTest extends TestLogger {
                        new DoneFuture<>(managedOperatorStateHandle),
                        new DoneFuture<>(rawOperatorStateHandle));
 
-               when(streamOperator.snapshotState(anyLong(), 
anyLong())).thenReturn(operatorSnapshotResult);
+               when(streamOperator.snapshotState(anyLong(), anyLong(), 
any(CheckpointOptions.class))).thenReturn(operatorSnapshotResult);
 
                StreamOperator<?>[] streamOperators = {streamOperator};
 
@@ -613,7 +614,7 @@ public class StreamTaskTest extends TestLogger {
                Whitebox.setInternalState(streamTask, "configuration", new 
StreamConfig(new Configuration()));
                Whitebox.setInternalState(streamTask, "stateBackend", 
mockStateBackend);
 
-               streamTask.triggerCheckpoint(checkpointMetaData);
+               streamTask.triggerCheckpoint(checkpointMetaData, 
CheckpointOptions.forFullCheckpoint());
 
                createSubtask.await();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
index c0a1638..d465619 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.runtime.tasks;
 
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.streaming.api.functions.co.CoMapFunction;
@@ -225,7 +226,7 @@ public class TwoInputStreamTaskTest {
                testHarness.invoke();
                testHarness.waitForTaskRunning();
 
-               testHarness.processEvent(new CheckpointBarrier(0, 0), 0, 0);
+               testHarness.processEvent(new CheckpointBarrier(0, 0, 
CheckpointOptions.forFullCheckpoint()), 0, 0);
 
                // This element should be buffered since we received a 
checkpoint barrier on
                // this input
@@ -262,16 +263,16 @@ public class TwoInputStreamTaskTest {
                        expectedOutput,
                        testHarness.getOutput());
 
-               testHarness.processEvent(new CheckpointBarrier(0, 0), 0, 1);
-               testHarness.processEvent(new CheckpointBarrier(0, 0), 1, 0);
-               testHarness.processEvent(new CheckpointBarrier(0, 0), 1, 1);
+               testHarness.processEvent(new CheckpointBarrier(0, 0, 
CheckpointOptions.forFullCheckpoint()), 0, 1);
+               testHarness.processEvent(new CheckpointBarrier(0, 0, 
CheckpointOptions.forFullCheckpoint()), 1, 0);
+               testHarness.processEvent(new CheckpointBarrier(0, 0, 
CheckpointOptions.forFullCheckpoint()), 1, 1);
 
                testHarness.waitForInputProcessing();
                testHarness.endInput();
                testHarness.waitForTaskCompletion();
 
                // now we should see the barrier and after that the buffered 
elements
-               expectedOutput.add(new CheckpointBarrier(0, 0));
+               expectedOutput.add(new CheckpointBarrier(0, 0, 
CheckpointOptions.forFullCheckpoint()));
                expectedOutput.add(new StreamRecord<String>("Hello-0-0", 
initialTime));
 
                TestHarnessUtil.assertOutputEquals("Output was not correct.",
@@ -306,7 +307,7 @@ public class TwoInputStreamTaskTest {
                testHarness.invoke();
                testHarness.waitForTaskRunning();
 
-               testHarness.processEvent(new CheckpointBarrier(0, 0), 0, 0);
+               testHarness.processEvent(new CheckpointBarrier(0, 0, 
CheckpointOptions.forFullCheckpoint()), 0, 0);
 
                // These elements should be buffered until we receive barriers 
from
                // all inputs
@@ -329,15 +330,15 @@ public class TwoInputStreamTaskTest {
 
                // Now give a later barrier to all inputs, this should unblock 
the first channel,
                // thereby allowing the two blocked elements through
-               testHarness.processEvent(new CheckpointBarrier(1, 1), 0, 0);
-               testHarness.processEvent(new CheckpointBarrier(1, 1), 0, 1);
-               testHarness.processEvent(new CheckpointBarrier(1, 1), 1, 0);
-               testHarness.processEvent(new CheckpointBarrier(1, 1), 1, 1);
+               testHarness.processEvent(new CheckpointBarrier(1, 1, 
CheckpointOptions.forFullCheckpoint()), 0, 0);
+               testHarness.processEvent(new CheckpointBarrier(1, 1, 
CheckpointOptions.forFullCheckpoint()), 0, 1);
+               testHarness.processEvent(new CheckpointBarrier(1, 1, 
CheckpointOptions.forFullCheckpoint()), 1, 0);
+               testHarness.processEvent(new CheckpointBarrier(1, 1, 
CheckpointOptions.forFullCheckpoint()), 1, 1);
 
                expectedOutput.add(new CancelCheckpointMarker(0));
                expectedOutput.add(new StreamRecord<String>("Hello-0-0", 
initialTime));
                expectedOutput.add(new StreamRecord<String>("Ciao-0-0", 
initialTime));
-               expectedOutput.add(new CheckpointBarrier(1, 1));
+               expectedOutput.add(new CheckpointBarrier(1, 1, 
CheckpointOptions.forFullCheckpoint()));
 
                testHarness.waitForInputProcessing();
 
@@ -347,9 +348,9 @@ public class TwoInputStreamTaskTest {
 
 
                // Then give the earlier barrier, these should be ignored
-               testHarness.processEvent(new CheckpointBarrier(0, 0), 0, 1);
-               testHarness.processEvent(new CheckpointBarrier(0, 0), 1, 0);
-               testHarness.processEvent(new CheckpointBarrier(0, 0), 1, 1);
+               testHarness.processEvent(new CheckpointBarrier(0, 0, 
CheckpointOptions.forFullCheckpoint()), 0, 1);
+               testHarness.processEvent(new CheckpointBarrier(0, 0, 
CheckpointOptions.forFullCheckpoint()), 1, 0);
+               testHarness.processEvent(new CheckpointBarrier(0, 0, 
CheckpointOptions.forFullCheckpoint()), 1, 1);
 
                testHarness.waitForInputProcessing();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
index 01afec6..07424f7 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
@@ -29,6 +29,7 @@ import org.apache.flink.core.fs.FSDataOutputStream;
 import 
org.apache.flink.migration.runtime.checkpoint.savepoint.SavepointV0Serializer;
 import org.apache.flink.migration.streaming.runtime.tasks.StreamTaskState;
 import org.apache.flink.migration.util.MigrationInstantiationUtil;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.checkpoint.OperatorStateRepartitioner;
 import 
org.apache.flink.runtime.checkpoint.RoundRobinOperatorStateRepartitioner;
 import org.apache.flink.runtime.checkpoint.StateAssignmentOperation;
@@ -478,11 +479,16 @@ public class AbstractStreamOperatorTestHarness<OUT> {
        }
 
        /**
-        * Calls {@link StreamOperator#snapshotState(long, long)}.
+        * Calls {@link StreamOperator#snapshotState(long, long, 
CheckpointOptions)}.
         */
        public OperatorStateHandles snapshot(long checkpointId, long timestamp) 
throws Exception {
 
-               OperatorSnapshotResult operatorStateResult = 
operator.snapshotState(checkpointId, timestamp);
+               CheckpointStreamFactory streamFactory = 
stateBackend.createStreamFactory(new JobID(), "test_op");
+
+               OperatorSnapshotResult operatorStateResult = 
operator.snapshotState(
+                       checkpointId,
+                       timestamp,
+                       CheckpointOptions.forFullCheckpoint());
 
                KeyGroupsStateHandle keyedManaged = 
FutureUtil.runIfNotDoneAndGet(operatorStateResult.getKeyedStateManagedFuture());
                KeyGroupsStateHandle keyedRaw = 
FutureUtil.runIfNotDoneAndGet(operatorStateResult.getKeyedStateRawFuture());

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
index cde5780..effb44c 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.ClosureCleaner;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.checkpoint.StateAssignmentOperation;
 import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
@@ -143,9 +144,11 @@ public class KeyedOneInputStreamOperatorTestHarness<K, IN, 
OUT>
                }
 
                if (keyedStateBackend != null) {
-                       RunnableFuture<KeyGroupsStateHandle> 
keyedSnapshotRunnable = keyedStateBackend.snapshot(checkpointId,
+                       RunnableFuture<KeyGroupsStateHandle> 
keyedSnapshotRunnable = keyedStateBackend.snapshot(
+                                       checkpointId,
                                        timestamp,
-                                       streamFactory);
+                                       streamFactory,
+                                       CheckpointOptions.forFullCheckpoint());
                        if(!keyedSnapshotRunnable.isDone()) {
                                Thread runner = new 
Thread(keyedSnapshotRunnable);
                                runner.start();

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
index 128522b..ac37009 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
@@ -23,6 +23,7 @@ import akka.actor.ActorSystem;
 import akka.testkit.JavaTestKit;
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Multimap;
+import java.io.FileNotFoundException;
 import org.apache.commons.io.FileUtils;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.functions.MapFunction;
@@ -160,21 +161,21 @@ public class SavepointITCase extends TestLogger {
                        
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 
numSlotsPerTaskManager);
 
                        final File checkpointDir = new File(tmpDir, 
"checkpoints");
-                       final File savepointDir = new File(tmpDir, 
"savepoints");
+                       final File savepointRootDir = new File(tmpDir, 
"savepoints");
 
-                       if (!checkpointDir.mkdir() || !savepointDir.mkdirs()) {
+                       if (!checkpointDir.mkdir() || 
!savepointRootDir.mkdirs()) {
                                fail("Test setup failed: failed to create 
temporary directories.");
                        }
 
                        LOG.info("Created temporary checkpoint directory: " + 
checkpointDir + ".");
-                       LOG.info("Created temporary savepoint directory: " + 
savepointDir + ".");
+                       LOG.info("Created temporary savepoint directory: " + 
savepointRootDir + ".");
 
                        config.setString(CoreOptions.STATE_BACKEND, 
"filesystem");
                        
config.setString(FsStateBackendFactory.CHECKPOINT_DIRECTORY_URI_CONF_KEY,
                                checkpointDir.toURI().toString());
                        
config.setString(FsStateBackendFactory.MEMORY_THRESHOLD_CONF_KEY, "0");
                        
config.setString(ConfigConstants.SAVEPOINT_DIRECTORY_KEY,
-                               savepointDir.toURI().toString());
+                               savepointRootDir.toURI().toString());
 
                        LOG.info("Flink configuration: " + config + ".");
 
@@ -217,14 +218,6 @@ public class SavepointITCase extends TestLogger {
                                .result(savepointPathFuture, 
deadline.timeLeft())).savepointPath();
                        LOG.info("Retrieved savepoint path: " + savepointPath + 
".");
 
-                       // Only one savepoint should exist
-                       File[] files = savepointDir.listFiles();
-                       if (files != null) {
-                               assertEquals("Savepoint not created in expected 
directory", 1, files.length);
-                       } else {
-                               fail("Savepoint not created in expected 
directory");
-                       }
-
                        // Retrieve the savepoint from the testing job manager
                        LOG.info("Requesting the savepoint.");
                        Future<Object> savepointFuture = jobManager.ask(new 
RequestSavepoint(savepointPath), deadline.timeLeft());
@@ -240,15 +233,33 @@ public class SavepointITCase extends TestLogger {
 
                        // - Verification START 
-------------------------------------------
 
+                       // Only one savepoint should exist
+                       File[] files = savepointRootDir.listFiles();
+
+                       if (files != null) {
+                               assertEquals("Savepoint not created in expected 
directory", 1, files.length);
+                               assertTrue("Savepoint did not create 
self-contained directory", files[0].isDirectory());
+
+                               File savepointDir = files[0];
+                               File[] savepointFiles = 
savepointDir.listFiles();
+                               assertNotNull(savepointFiles);
+                               assertTrue("Did not write savepoint files to 
directory",savepointFiles.length > 1);
+                       } else {
+                               fail("Savepoint not created in expected 
directory");
+                       }
+
                        // Only one checkpoint of the savepoint should exist
                        // We currently have the following directory layout: 
checkpointDir/jobId/chk-ID
-                       files = checkpointDir.listFiles();
-                       assertNotNull("Checkpoint directory empty", files);
-                       assertEquals("Checkpoints directory cleaned up, but 
needed for savepoint.", 1, files.length);
-                       assertEquals("No job-specific base directory", 
jobGraph.getJobID().toString(), files[0].getName());
+                       File jobCheckpoints = new File(checkpointDir, 
jobId.toString());
+
+                       if (jobCheckpoints.exists()) {
+                               files = jobCheckpoints.listFiles();
+                               assertNotNull("Checkpoint directory empty", 
files);
+                               assertEquals("Checkpoints directory not cleaned 
up: " + Arrays.toString(files), 0, files.length);
+                       }
 
                        // Only one savepoint should exist
-                       files = savepointDir.listFiles();
+                       files = savepointRootDir.listFiles();
                        assertNotNull("Savepoint directory empty", files);
                        assertEquals("No savepoint found in savepoint 
directory", 1, files.length);
 
@@ -399,8 +410,8 @@ public class SavepointITCase extends TestLogger {
 
                        // All savepoints should have been cleaned up
                        errMsg = "Savepoints directory not cleaned up properly: 
" +
-                               Arrays.toString(savepointDir.listFiles()) + ".";
-                       assertEquals(errMsg, 0, 
savepointDir.listFiles().length);
+                               Arrays.toString(savepointRootDir.listFiles()) + 
".";
+                       assertEquals(errMsg, 0, 
savepointRootDir.listFiles().length);
 
                        // - Verification END 
---------------------------------------------
                } finally {
@@ -468,7 +479,7 @@ public class SavepointITCase extends TestLogger {
                                flink.submitJobAndWait(jobGraph, false);
                        } catch (Exception e) {
                                assertEquals(JobExecutionException.class, 
e.getClass());
-                               assertEquals(IllegalArgumentException.class, 
e.getCause().getClass());
+                               assertEquals(FileNotFoundException.class, 
e.getCause().getClass());
                        }
                } finally {
                        if (flink != null) {

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java
index ec6a8f5..79665dd 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java
@@ -86,7 +86,6 @@ public class StateBackendITCase extends 
StreamingMultipleProgramsTestBase {
                }
        }
 
-
        public static class FailingStateBackend extends AbstractStateBackend {
                private static final long serialVersionUID = 1L;
 
@@ -97,6 +96,12 @@ public class StateBackendITCase extends 
StreamingMultipleProgramsTestBase {
                }
 
                @Override
+               public CheckpointStreamFactory 
createSavepointStreamFactory(JobID jobId,
+                       String operatorIdentifier, String targetLocation) 
throws IOException {
+                       throw new UnsupportedOperationException();
+               }
+
+               @Override
                public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
                                Environment env,
                                JobID jobID,

Reply via email to