http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java index 46f228a..e407443 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java @@ -21,6 +21,7 @@ package org.apache.flink.streaming.runtime.io; import org.apache.flink.core.memory.MemorySegment; import org.apache.flink.core.memory.MemorySegmentFactory; import org.apache.flink.runtime.checkpoint.CheckpointMetrics; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; import org.apache.flink.runtime.checkpoint.decline.AlignmentLimitExceededException; import org.apache.flink.runtime.checkpoint.CheckpointMetaData; import org.apache.flink.runtime.io.disk.iomanager.IOManager; @@ -155,7 +156,9 @@ public class BarrierBufferAlignmentLimitTest { check(sequence[21], buffer.getNextNonBlocked()); // no call for a completed checkpoint must have happened - verify(toNotify, times(0)).triggerCheckpointOnBarrier(any(CheckpointMetaData.class), + verify(toNotify, times(0)).triggerCheckpointOnBarrier( + any(CheckpointMetaData.class), + any(CheckpointOptions.class), any(CheckpointMetrics.class)); assertNull(buffer.getNextNonBlocked()); @@ -242,7 +245,8 @@ public class BarrierBufferAlignmentLimitTest { // checkpoint 4 completed - check and validate buffered replay check(sequence[9], buffer.getNextNonBlocked()); validateAlignmentTime(startTs, buffer); - verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(4L)), any(CheckpointMetrics.class)); + verify(toNotify, times(1)).triggerCheckpointOnBarrier( + argThat(new CheckpointMatcher(4L)), any(CheckpointOptions.class), any(CheckpointMetrics.class)); check(sequence[10], buffer.getNextNonBlocked()); check(sequence[15], buffer.getNextNonBlocked()); @@ -254,7 +258,8 @@ public class BarrierBufferAlignmentLimitTest { check(sequence[21], buffer.getNextNonBlocked()); // only checkpoint 4 was successfully completed, not checkpoint 3 - verify(toNotify, times(0)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(3L)), any(CheckpointMetrics.class)); + verify(toNotify, times(0)).triggerCheckpointOnBarrier( + argThat(new CheckpointMatcher(3L)), any(CheckpointOptions.class), any(CheckpointMetrics.class)); assertNull(buffer.getNextNonBlocked()); assertNull(buffer.getNextNonBlocked()); @@ -284,7 +289,7 @@ public class BarrierBufferAlignmentLimitTest { } private static BufferOrEvent createBarrier(long id, int channel) { - return new BufferOrEvent(new CheckpointBarrier(id, System.currentTimeMillis()), channel); + return new BufferOrEvent(new CheckpointBarrier(id, System.currentTimeMillis(), CheckpointOptions.forFullCheckpoint()), channel); } private static void check(BufferOrEvent expected, BufferOrEvent present) {
http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java index 0cf866a..6e088f6 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java @@ -18,6 +18,7 @@ package org.apache.flink.streaming.runtime.io; import org.apache.flink.core.memory.MemoryType; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; import org.apache.flink.runtime.event.TaskEvent; import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync; @@ -151,7 +152,7 @@ public class BarrierBufferMassiveRandomTest { if (barrierGens[currentChannel].isNextBarrier()) { return new BufferOrEvent( - new CheckpointBarrier(++currentBarriers[currentChannel], System.currentTimeMillis()), + new CheckpointBarrier(++currentBarriers[currentChannel], System.currentTimeMillis(), CheckpointOptions.forFullCheckpoint()), currentChannel); } else { Buffer buffer = bufferPools[currentChannel].requestBuffer(); http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java index 869d1fe..d6056d2 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java @@ -22,6 +22,7 @@ import org.apache.flink.core.memory.MemorySegment; import org.apache.flink.core.memory.MemorySegmentFactory; import org.apache.flink.runtime.checkpoint.CheckpointMetaData; import org.apache.flink.runtime.checkpoint.CheckpointMetrics; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineOnCancellationBarrierException; import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineSubsumedException; import org.apache.flink.runtime.io.disk.iomanager.IOManager; @@ -566,7 +567,7 @@ public class BarrierBufferTest { // checkpoint done - replay buffered check(sequence[5], buffer.getNextNonBlocked()); validateAlignmentTime(startTs, buffer); - verify(toNotify).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(1L)), any(CheckpointMetrics.class)); + verify(toNotify).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(1L)), any(CheckpointOptions.class), any(CheckpointMetrics.class)); check(sequence[6], buffer.getNextNonBlocked()); check(sequence[9], buffer.getNextNonBlocked()); @@ -1008,14 +1009,14 @@ public class BarrierBufferTest { check(sequence[0], buffer.getNextNonBlocked()); check(sequence[2], buffer.getNextNonBlocked()); - verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(1L)), any(CheckpointMetrics.class)); + verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(1L)), any(CheckpointOptions.class), any(CheckpointMetrics.class)); assertEquals(0L, buffer.getAlignmentDurationNanos()); check(sequence[6], buffer.getNextNonBlocked()); assertEquals(5L, buffer.getCurrentCheckpointId()); - verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(2L)), any(CheckpointMetrics.class)); + verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(2L)), any(CheckpointOptions.class), any(CheckpointMetrics.class)); verify(toNotify, times(1)).abortCheckpointOnBarrier(eq(4L), any(CheckpointDeclineOnCancellationBarrierException.class)); - verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(5L)), any(CheckpointMetrics.class)); + verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(5L)), any(CheckpointOptions.class), any(CheckpointMetrics.class)); assertEquals(0L, buffer.getAlignmentDurationNanos()); check(sequence[8], buffer.getNextNonBlocked()); @@ -1078,7 +1079,7 @@ public class BarrierBufferTest { check(sequence[2], buffer.getNextNonBlocked()); startTs = System.nanoTime(); check(sequence[5], buffer.getNextNonBlocked()); - verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(1L)), any(CheckpointMetrics.class)); + verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(1L)), any(CheckpointOptions.class), any(CheckpointMetrics.class)); validateAlignmentTime(startTs, buffer); check(sequence[6], buffer.getNextNonBlocked()); @@ -1097,7 +1098,7 @@ public class BarrierBufferTest { check(sequence[16], buffer.getNextNonBlocked()); startTs = System.nanoTime(); check(sequence[20], buffer.getNextNonBlocked()); - verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(3L)), any(CheckpointMetrics.class)); + verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(3L)), any(CheckpointOptions.class), any(CheckpointMetrics.class)); validateAlignmentTime(startTs, buffer); check(sequence[21], buffer.getNextNonBlocked()); @@ -1114,7 +1115,7 @@ public class BarrierBufferTest { // a simple successful checkpoint startTs = System.nanoTime(); check(sequence[32], buffer.getNextNonBlocked()); - verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(5L)), any(CheckpointMetrics.class)); + verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(5L)), any(CheckpointOptions.class), any(CheckpointMetrics.class)); validateAlignmentTime(startTs, buffer); check(sequence[33], buffer.getNextNonBlocked()); @@ -1175,7 +1176,7 @@ public class BarrierBufferTest { // finished first checkpoint check(sequence[3], buffer.getNextNonBlocked()); - verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(1L)), any(CheckpointMetrics.class)); + verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(1L)), any(CheckpointOptions.class), any(CheckpointMetrics.class)); validateAlignmentTime(startTs, buffer); check(sequence[5], buffer.getNextNonBlocked()); @@ -1198,7 +1199,7 @@ public class BarrierBufferTest { assertEquals(0L, buffer.getAlignmentDurationNanos()); // no further checkpoint (abort) notifications - verify(toNotify, times(1)).triggerCheckpointOnBarrier(any(CheckpointMetaData.class), any(CheckpointMetrics.class)); + verify(toNotify, times(1)).triggerCheckpointOnBarrier(any(CheckpointMetaData.class), any(CheckpointOptions.class), any(CheckpointMetrics.class)); verify(toNotify, times(1)).abortCheckpointOnBarrier(anyLong(), any(CheckpointDeclineOnCancellationBarrierException.class)); // all done @@ -1280,7 +1281,7 @@ public class BarrierBufferTest { // checkpoint done check(sequence[7], buffer.getNextNonBlocked()); validateAlignmentTime(startTs, buffer); - verify(toNotify).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(2L)), any(CheckpointMetrics.class)); + verify(toNotify).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(2L)), any(CheckpointOptions.class), any(CheckpointMetrics.class)); // queued data check(sequence[10], buffer.getNextNonBlocked()); @@ -1299,7 +1300,7 @@ public class BarrierBufferTest { checkNoTempFilesRemain(); // check overall notifications - verify(toNotify, times(1)).triggerCheckpointOnBarrier(any(CheckpointMetaData.class), any(CheckpointMetrics.class)); + verify(toNotify, times(1)).triggerCheckpointOnBarrier(any(CheckpointMetaData.class), any(CheckpointOptions.class), any(CheckpointMetrics.class)); verify(toNotify, times(1)).abortCheckpointOnBarrier(anyLong(), any(Throwable.class)); } @@ -1364,7 +1365,7 @@ public class BarrierBufferTest { // checkpoint finished check(sequence[7], buffer.getNextNonBlocked()); validateAlignmentTime(startTs, buffer); - verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(5L)), any(CheckpointMetrics.class)); + verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(5L)), any(CheckpointOptions.class), any(CheckpointMetrics.class)); check(sequence[11], buffer.getNextNonBlocked()); // remaining data @@ -1380,7 +1381,7 @@ public class BarrierBufferTest { checkNoTempFilesRemain(); // check overall notifications - verify(toNotify, times(1)).triggerCheckpointOnBarrier(any(CheckpointMetaData.class), any(CheckpointMetrics.class)); + verify(toNotify, times(1)).triggerCheckpointOnBarrier(any(CheckpointMetaData.class), any(CheckpointOptions.class), any(CheckpointMetrics.class)); verify(toNotify, times(1)).abortCheckpointOnBarrier(anyLong(), any(Throwable.class)); } @@ -1389,7 +1390,7 @@ public class BarrierBufferTest { // ------------------------------------------------------------------------ private static BufferOrEvent createBarrier(long checkpointId, int channel) { - return new BufferOrEvent(new CheckpointBarrier(checkpointId, System.currentTimeMillis()), channel); + return new BufferOrEvent(new CheckpointBarrier(checkpointId, System.currentTimeMillis(), CheckpointOptions.forFullCheckpoint()), channel); } private static BufferOrEvent createCancellationBarrier(long checkpointId, int channel) { @@ -1487,12 +1488,12 @@ public class BarrierBufferTest { } @Override - public boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData) throws Exception { + public boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) throws Exception { throw new UnsupportedOperationException("should never be called"); } @Override - public void triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData, CheckpointMetrics checkpointMetrics) throws Exception { + public void triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions, CheckpointMetrics checkpointMetrics) throws Exception { assertTrue("wrong checkpoint id", nextExpectedCheckpointId == -1L || nextExpectedCheckpointId == checkpointMetaData.getCheckpointId()); http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java index da322f6..05f7da6 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java @@ -21,6 +21,7 @@ package org.apache.flink.streaming.runtime.io; import org.apache.flink.core.memory.MemorySegmentFactory; import org.apache.flink.runtime.checkpoint.CheckpointMetaData; import org.apache.flink.runtime.checkpoint.CheckpointMetrics; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker; import org.apache.flink.runtime.io.network.api.CheckpointBarrier; import org.apache.flink.runtime.io.network.buffer.Buffer; @@ -470,7 +471,7 @@ public class BarrierTrackerTest { // ------------------------------------------------------------------------ private static BufferOrEvent createBarrier(long id, int channel) { - return new BufferOrEvent(new CheckpointBarrier(id, System.currentTimeMillis()), channel); + return new BufferOrEvent(new CheckpointBarrier(id, System.currentTimeMillis(), CheckpointOptions.forFullCheckpoint()), channel); } private static BufferOrEvent createCancellationBarrier(long id, int channel) { @@ -502,12 +503,12 @@ public class BarrierTrackerTest { } @Override - public boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData) throws Exception { + public boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) throws Exception { throw new UnsupportedOperationException("should never be called"); } @Override - public void triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData, CheckpointMetrics checkpointMetrics) throws Exception { + public void triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions, CheckpointMetrics checkpointMetrics) throws Exception { assertTrue("More checkpoints than expected", i < checkpointIDs.length); final long expectedId = checkpointIDs[i++]; http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java index 5c0f0cf..51294ce 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java @@ -28,6 +28,7 @@ import org.apache.flink.runtime.blob.BlobKey; import org.apache.flink.runtime.broadcast.BroadcastVariableManager; import org.apache.flink.runtime.checkpoint.CheckpointMetaData; import org.apache.flink.runtime.checkpoint.CheckpointMetrics; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.concurrent.Executors; import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor; @@ -174,6 +175,11 @@ public class BlockingCheckpointsTest { } @Override + public CheckpointStreamFactory createSavepointStreamFactory(JobID jobId, String operatorIdentifier, String targetLocation) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend( Environment env, JobID jobID, String operatorIdentifier, TypeSerializer<K> keySerializer, int numberOfKeyGroups, @@ -276,7 +282,7 @@ public class BlockingCheckpointsTest { @Override protected void run() throws Exception { - triggerCheckpointOnBarrier(new CheckpointMetaData(11L, System.currentTimeMillis()), new CheckpointMetrics()); + triggerCheckpointOnBarrier(new CheckpointMetaData(11L, System.currentTimeMillis()), CheckpointOptions.forFullCheckpoint(), new CheckpointMetrics()); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java index 69c2c88..e22bf86 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java @@ -33,6 +33,7 @@ import org.apache.flink.core.fs.FSDataOutputStream; import org.apache.flink.core.testutils.OneShotLatch; import org.apache.flink.runtime.checkpoint.CheckpointMetaData; import org.apache.flink.runtime.checkpoint.CheckpointMetrics; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; import org.apache.flink.runtime.checkpoint.SubtaskState; import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker; import org.apache.flink.runtime.io.network.api.CheckpointBarrier; @@ -408,7 +409,7 @@ public class OneInputStreamTaskTest extends TestLogger { testHarness.invoke(); testHarness.waitForTaskRunning(); - testHarness.processEvent(new CheckpointBarrier(0, 0), 0, 0); + testHarness.processEvent(new CheckpointBarrier(0, 0, CheckpointOptions.forFullCheckpoint()), 0, 0); // These elements should be buffered until we receive barriers from // all inputs @@ -427,14 +428,14 @@ public class OneInputStreamTaskTest extends TestLogger { // we should not yet see the barrier, only the two elements from non-blocked input TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput()); - testHarness.processEvent(new CheckpointBarrier(0, 0), 0, 1); - testHarness.processEvent(new CheckpointBarrier(0, 0), 1, 0); - testHarness.processEvent(new CheckpointBarrier(0, 0), 1, 1); + testHarness.processEvent(new CheckpointBarrier(0, 0, CheckpointOptions.forFullCheckpoint()), 0, 1); + testHarness.processEvent(new CheckpointBarrier(0, 0, CheckpointOptions.forFullCheckpoint()), 1, 0); + testHarness.processEvent(new CheckpointBarrier(0, 0, CheckpointOptions.forFullCheckpoint()), 1, 1); testHarness.waitForInputProcessing(); // now we should see the barrier and after that the buffered elements - expectedOutput.add(new CheckpointBarrier(0, 0)); + expectedOutput.add(new CheckpointBarrier(0, 0, CheckpointOptions.forFullCheckpoint())); expectedOutput.add(new StreamRecord<String>("Hello-0-0", initialTime)); expectedOutput.add(new StreamRecord<String>("Ciao-0-0", initialTime)); @@ -467,7 +468,7 @@ public class OneInputStreamTaskTest extends TestLogger { testHarness.invoke(); testHarness.waitForTaskRunning(); - testHarness.processEvent(new CheckpointBarrier(0, 0), 0, 0); + testHarness.processEvent(new CheckpointBarrier(0, 0, CheckpointOptions.forFullCheckpoint()), 0, 0); // These elements should be buffered until we receive barriers from // all inputs @@ -488,15 +489,15 @@ public class OneInputStreamTaskTest extends TestLogger { // Now give a later barrier to all inputs, this should unblock the first channel, // thereby allowing the two blocked elements through - testHarness.processEvent(new CheckpointBarrier(1, 1), 0, 0); - testHarness.processEvent(new CheckpointBarrier(1, 1), 0, 1); - testHarness.processEvent(new CheckpointBarrier(1, 1), 1, 0); - testHarness.processEvent(new CheckpointBarrier(1, 1), 1, 1); + testHarness.processEvent(new CheckpointBarrier(1, 1, CheckpointOptions.forFullCheckpoint()), 0, 0); + testHarness.processEvent(new CheckpointBarrier(1, 1, CheckpointOptions.forFullCheckpoint()), 0, 1); + testHarness.processEvent(new CheckpointBarrier(1, 1, CheckpointOptions.forFullCheckpoint()), 1, 0); + testHarness.processEvent(new CheckpointBarrier(1, 1, CheckpointOptions.forFullCheckpoint()), 1, 1); expectedOutput.add(new CancelCheckpointMarker(0)); expectedOutput.add(new StreamRecord<String>("Hello-0-0", initialTime)); expectedOutput.add(new StreamRecord<String>("Ciao-0-0", initialTime)); - expectedOutput.add(new CheckpointBarrier(1, 1)); + expectedOutput.add(new CheckpointBarrier(1, 1, CheckpointOptions.forFullCheckpoint())); testHarness.waitForInputProcessing(); @@ -504,9 +505,9 @@ public class OneInputStreamTaskTest extends TestLogger { // Then give the earlier barrier, these should be ignored - testHarness.processEvent(new CheckpointBarrier(0, 0), 0, 1); - testHarness.processEvent(new CheckpointBarrier(0, 0), 1, 0); - testHarness.processEvent(new CheckpointBarrier(0, 0), 1, 1); + testHarness.processEvent(new CheckpointBarrier(0, 0, CheckpointOptions.forFullCheckpoint()), 0, 1); + testHarness.processEvent(new CheckpointBarrier(0, 0, CheckpointOptions.forFullCheckpoint()), 1, 0); + testHarness.processEvent(new CheckpointBarrier(0, 0, CheckpointOptions.forFullCheckpoint()), 1, 1); testHarness.waitForInputProcessing(); @@ -557,7 +558,7 @@ public class OneInputStreamTaskTest extends TestLogger { CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, checkpointTimestamp); - while(!streamTask.triggerCheckpoint(checkpointMetaData)); + while(!streamTask.triggerCheckpoint(checkpointMetaData, CheckpointOptions.forFullCheckpoint())); // since no state was set, there shouldn't be restore calls assertEquals(0, TestingStreamOperator.numberRestoreCalls); http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java index 0773699..1a6fa8f 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java @@ -23,6 +23,7 @@ import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.typeutils.TupleTypeInfo; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.checkpoint.CheckpointMetaData; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; import org.apache.flink.streaming.api.checkpoint.ListCheckpointed; import org.apache.flink.streaming.api.functions.source.RichSourceFunction; import org.apache.flink.streaming.api.functions.source.SourceFunction; @@ -231,7 +232,7 @@ public class SourceStreamTaskTest { for (int i = 0; i < numCheckpoints; i++) { long currentCheckpointId = checkpointId.getAndIncrement(); CheckpointMetaData checkpointMetaData = new CheckpointMetaData(currentCheckpointId, 0L); - sourceTask.triggerCheckpoint(checkpointMetaData); + sourceTask.triggerCheckpoint(checkpointMetaData, CheckpointOptions.forFullCheckpoint()); Thread.sleep(checkpointInterval); } return true; http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java index c2d4aaa..53f77ca 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java @@ -20,6 +20,7 @@ package org.apache.flink.streaming.runtime.tasks; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineOnCancellationBarrierException; import org.apache.flink.runtime.checkpoint.CheckpointMetaData; import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker; @@ -56,7 +57,8 @@ public class StreamTaskCancellationBarrierTest { testHarness.invoke(); // tell the task to commence a checkpoint - boolean result = task.triggerCheckpoint(new CheckpointMetaData(41L, System.currentTimeMillis())); + boolean result = task.triggerCheckpoint(new CheckpointMetaData(41L, System.currentTimeMillis()), + CheckpointOptions.forFullCheckpoint()); assertFalse("task triggered checkpoint though not ready", result); // a cancellation barrier should be downstream http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java index 1e74c3e..3d01fdd 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java @@ -33,6 +33,7 @@ import org.apache.flink.runtime.blob.BlobKey; import org.apache.flink.runtime.broadcast.BroadcastVariableManager; import org.apache.flink.runtime.checkpoint.CheckpointMetaData; import org.apache.flink.runtime.checkpoint.CheckpointMetrics; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; import org.apache.flink.runtime.checkpoint.SubtaskState; import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor; @@ -305,18 +306,18 @@ public class StreamTaskTest extends TestLogger { final Exception testException = new Exception("Test exception"); - when(streamOperator1.snapshotState(anyLong(), anyLong())).thenReturn(operatorSnapshotResult1); - when(streamOperator2.snapshotState(anyLong(), anyLong())).thenReturn(operatorSnapshotResult2); - when(streamOperator3.snapshotState(anyLong(), anyLong())).thenThrow(testException); + when(streamOperator1.snapshotState(anyLong(), anyLong(), any(CheckpointOptions.class))).thenReturn(operatorSnapshotResult1); + when(streamOperator2.snapshotState(anyLong(), anyLong(), any(CheckpointOptions.class))).thenReturn(operatorSnapshotResult2); + when(streamOperator3.snapshotState(anyLong(), anyLong(), any(CheckpointOptions.class))).thenThrow(testException); // mock the returned legacy snapshots StreamStateHandle streamStateHandle1 = mock(StreamStateHandle.class); StreamStateHandle streamStateHandle2 = mock(StreamStateHandle.class); StreamStateHandle streamStateHandle3 = mock(StreamStateHandle.class); - when(streamOperator1.snapshotLegacyOperatorState(anyLong(), anyLong())).thenReturn(streamStateHandle1); - when(streamOperator2.snapshotLegacyOperatorState(anyLong(), anyLong())).thenReturn(streamStateHandle2); - when(streamOperator3.snapshotLegacyOperatorState(anyLong(), anyLong())).thenReturn(streamStateHandle3); + when(streamOperator1.snapshotLegacyOperatorState(anyLong(), anyLong(), any(CheckpointOptions.class))).thenReturn(streamStateHandle1); + when(streamOperator2.snapshotLegacyOperatorState(anyLong(), anyLong(), any(CheckpointOptions.class))).thenReturn(streamStateHandle2); + when(streamOperator3.snapshotLegacyOperatorState(anyLong(), anyLong(), any(CheckpointOptions.class))).thenReturn(streamStateHandle3); // set up the task @@ -332,7 +333,7 @@ public class StreamTaskTest extends TestLogger { Whitebox.setInternalState(streamTask, "configuration", new StreamConfig(new Configuration())); try { - streamTask.triggerCheckpoint(checkpointMetaData); + streamTask.triggerCheckpoint(checkpointMetaData, CheckpointOptions.forFullCheckpoint()); fail("Expected test exception here."); } catch (Exception e) { assertEquals(testException, e.getCause()); @@ -380,18 +381,18 @@ public class StreamTaskTest extends TestLogger { when(operatorSnapshotResult3.getOperatorStateRawFuture()).thenReturn(failingFuture); - when(streamOperator1.snapshotState(anyLong(), anyLong())).thenReturn(operatorSnapshotResult1); - when(streamOperator2.snapshotState(anyLong(), anyLong())).thenReturn(operatorSnapshotResult2); - when(streamOperator3.snapshotState(anyLong(), anyLong())).thenReturn(operatorSnapshotResult3); + when(streamOperator1.snapshotState(anyLong(), anyLong(), any(CheckpointOptions.class))).thenReturn(operatorSnapshotResult1); + when(streamOperator2.snapshotState(anyLong(), anyLong(), any(CheckpointOptions.class))).thenReturn(operatorSnapshotResult2); + when(streamOperator3.snapshotState(anyLong(), anyLong(), any(CheckpointOptions.class))).thenReturn(operatorSnapshotResult3); // mock the legacy state snapshot StreamStateHandle streamStateHandle1 = mock(StreamStateHandle.class); StreamStateHandle streamStateHandle2 = mock(StreamStateHandle.class); StreamStateHandle streamStateHandle3 = mock(StreamStateHandle.class); - when(streamOperator1.snapshotLegacyOperatorState(anyLong(), anyLong())).thenReturn(streamStateHandle1); - when(streamOperator2.snapshotLegacyOperatorState(anyLong(), anyLong())).thenReturn(streamStateHandle2); - when(streamOperator3.snapshotLegacyOperatorState(anyLong(), anyLong())).thenReturn(streamStateHandle3); + when(streamOperator1.snapshotLegacyOperatorState(anyLong(), anyLong(), any(CheckpointOptions.class))).thenReturn(streamStateHandle1); + when(streamOperator2.snapshotLegacyOperatorState(anyLong(), anyLong(), any(CheckpointOptions.class))).thenReturn(streamStateHandle2); + when(streamOperator3.snapshotLegacyOperatorState(anyLong(), anyLong(), any(CheckpointOptions.class))).thenReturn(streamStateHandle3); StreamOperator<?>[] streamOperators = {streamOperator1, streamOperator2, streamOperator3}; @@ -405,7 +406,7 @@ public class StreamTaskTest extends TestLogger { Whitebox.setInternalState(streamTask, "asyncOperationsThreadPool", new DirectExecutorService()); Whitebox.setInternalState(streamTask, "configuration", new StreamConfig(new Configuration())); - streamTask.triggerCheckpoint(checkpointMetaData); + streamTask.triggerCheckpoint(checkpointMetaData, CheckpointOptions.forFullCheckpoint()); verify(streamTask).handleAsyncException(anyString(), any(Throwable.class)); @@ -468,7 +469,7 @@ public class StreamTaskTest extends TestLogger { new DoneFuture<>(managedOperatorStateHandle), new DoneFuture<>(rawOperatorStateHandle)); - when(streamOperator.snapshotState(anyLong(), anyLong())).thenReturn(operatorSnapshotResult); + when(streamOperator.snapshotState(anyLong(), anyLong(), any(CheckpointOptions.class))).thenReturn(operatorSnapshotResult); StreamOperator<?>[] streamOperators = {streamOperator}; @@ -495,7 +496,7 @@ public class StreamTaskTest extends TestLogger { Whitebox.setInternalState(streamTask, "configuration", new StreamConfig(new Configuration())); Whitebox.setInternalState(streamTask, "stateBackend", mockStateBackend); - streamTask.triggerCheckpoint(checkpointMetaData); + streamTask.triggerCheckpoint(checkpointMetaData, CheckpointOptions.forFullCheckpoint()); acknowledgeCheckpointLatch.await(); @@ -584,7 +585,7 @@ public class StreamTaskTest extends TestLogger { new DoneFuture<>(managedOperatorStateHandle), new DoneFuture<>(rawOperatorStateHandle)); - when(streamOperator.snapshotState(anyLong(), anyLong())).thenReturn(operatorSnapshotResult); + when(streamOperator.snapshotState(anyLong(), anyLong(), any(CheckpointOptions.class))).thenReturn(operatorSnapshotResult); StreamOperator<?>[] streamOperators = {streamOperator}; @@ -613,7 +614,7 @@ public class StreamTaskTest extends TestLogger { Whitebox.setInternalState(streamTask, "configuration", new StreamConfig(new Configuration())); Whitebox.setInternalState(streamTask, "stateBackend", mockStateBackend); - streamTask.triggerCheckpoint(checkpointMetaData); + streamTask.triggerCheckpoint(checkpointMetaData, CheckpointOptions.forFullCheckpoint()); createSubtask.await(); http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java index c0a1638..d465619 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java @@ -20,6 +20,7 @@ package org.apache.flink.streaming.runtime.tasks; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker; import org.apache.flink.runtime.io.network.api.CheckpointBarrier; import org.apache.flink.streaming.api.functions.co.CoMapFunction; @@ -225,7 +226,7 @@ public class TwoInputStreamTaskTest { testHarness.invoke(); testHarness.waitForTaskRunning(); - testHarness.processEvent(new CheckpointBarrier(0, 0), 0, 0); + testHarness.processEvent(new CheckpointBarrier(0, 0, CheckpointOptions.forFullCheckpoint()), 0, 0); // This element should be buffered since we received a checkpoint barrier on // this input @@ -262,16 +263,16 @@ public class TwoInputStreamTaskTest { expectedOutput, testHarness.getOutput()); - testHarness.processEvent(new CheckpointBarrier(0, 0), 0, 1); - testHarness.processEvent(new CheckpointBarrier(0, 0), 1, 0); - testHarness.processEvent(new CheckpointBarrier(0, 0), 1, 1); + testHarness.processEvent(new CheckpointBarrier(0, 0, CheckpointOptions.forFullCheckpoint()), 0, 1); + testHarness.processEvent(new CheckpointBarrier(0, 0, CheckpointOptions.forFullCheckpoint()), 1, 0); + testHarness.processEvent(new CheckpointBarrier(0, 0, CheckpointOptions.forFullCheckpoint()), 1, 1); testHarness.waitForInputProcessing(); testHarness.endInput(); testHarness.waitForTaskCompletion(); // now we should see the barrier and after that the buffered elements - expectedOutput.add(new CheckpointBarrier(0, 0)); + expectedOutput.add(new CheckpointBarrier(0, 0, CheckpointOptions.forFullCheckpoint())); expectedOutput.add(new StreamRecord<String>("Hello-0-0", initialTime)); TestHarnessUtil.assertOutputEquals("Output was not correct.", @@ -306,7 +307,7 @@ public class TwoInputStreamTaskTest { testHarness.invoke(); testHarness.waitForTaskRunning(); - testHarness.processEvent(new CheckpointBarrier(0, 0), 0, 0); + testHarness.processEvent(new CheckpointBarrier(0, 0, CheckpointOptions.forFullCheckpoint()), 0, 0); // These elements should be buffered until we receive barriers from // all inputs @@ -329,15 +330,15 @@ public class TwoInputStreamTaskTest { // Now give a later barrier to all inputs, this should unblock the first channel, // thereby allowing the two blocked elements through - testHarness.processEvent(new CheckpointBarrier(1, 1), 0, 0); - testHarness.processEvent(new CheckpointBarrier(1, 1), 0, 1); - testHarness.processEvent(new CheckpointBarrier(1, 1), 1, 0); - testHarness.processEvent(new CheckpointBarrier(1, 1), 1, 1); + testHarness.processEvent(new CheckpointBarrier(1, 1, CheckpointOptions.forFullCheckpoint()), 0, 0); + testHarness.processEvent(new CheckpointBarrier(1, 1, CheckpointOptions.forFullCheckpoint()), 0, 1); + testHarness.processEvent(new CheckpointBarrier(1, 1, CheckpointOptions.forFullCheckpoint()), 1, 0); + testHarness.processEvent(new CheckpointBarrier(1, 1, CheckpointOptions.forFullCheckpoint()), 1, 1); expectedOutput.add(new CancelCheckpointMarker(0)); expectedOutput.add(new StreamRecord<String>("Hello-0-0", initialTime)); expectedOutput.add(new StreamRecord<String>("Ciao-0-0", initialTime)); - expectedOutput.add(new CheckpointBarrier(1, 1)); + expectedOutput.add(new CheckpointBarrier(1, 1, CheckpointOptions.forFullCheckpoint())); testHarness.waitForInputProcessing(); @@ -347,9 +348,9 @@ public class TwoInputStreamTaskTest { // Then give the earlier barrier, these should be ignored - testHarness.processEvent(new CheckpointBarrier(0, 0), 0, 1); - testHarness.processEvent(new CheckpointBarrier(0, 0), 1, 0); - testHarness.processEvent(new CheckpointBarrier(0, 0), 1, 1); + testHarness.processEvent(new CheckpointBarrier(0, 0, CheckpointOptions.forFullCheckpoint()), 0, 1); + testHarness.processEvent(new CheckpointBarrier(0, 0, CheckpointOptions.forFullCheckpoint()), 1, 0); + testHarness.processEvent(new CheckpointBarrier(0, 0, CheckpointOptions.forFullCheckpoint()), 1, 1); testHarness.waitForInputProcessing(); http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java index 01afec6..07424f7 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java @@ -29,6 +29,7 @@ import org.apache.flink.core.fs.FSDataOutputStream; import org.apache.flink.migration.runtime.checkpoint.savepoint.SavepointV0Serializer; import org.apache.flink.migration.streaming.runtime.tasks.StreamTaskState; import org.apache.flink.migration.util.MigrationInstantiationUtil; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; import org.apache.flink.runtime.checkpoint.OperatorStateRepartitioner; import org.apache.flink.runtime.checkpoint.RoundRobinOperatorStateRepartitioner; import org.apache.flink.runtime.checkpoint.StateAssignmentOperation; @@ -478,11 +479,16 @@ public class AbstractStreamOperatorTestHarness<OUT> { } /** - * Calls {@link StreamOperator#snapshotState(long, long)}. + * Calls {@link StreamOperator#snapshotState(long, long, CheckpointOptions)}. */ public OperatorStateHandles snapshot(long checkpointId, long timestamp) throws Exception { - OperatorSnapshotResult operatorStateResult = operator.snapshotState(checkpointId, timestamp); + CheckpointStreamFactory streamFactory = stateBackend.createStreamFactory(new JobID(), "test_op"); + + OperatorSnapshotResult operatorStateResult = operator.snapshotState( + checkpointId, + timestamp, + CheckpointOptions.forFullCheckpoint()); KeyGroupsStateHandle keyedManaged = FutureUtil.runIfNotDoneAndGet(operatorStateResult.getKeyedStateManagedFuture()); KeyGroupsStateHandle keyedRaw = FutureUtil.runIfNotDoneAndGet(operatorStateResult.getKeyedStateRawFuture()); http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java index cde5780..effb44c 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java @@ -23,6 +23,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.ClosureCleaner; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.core.fs.FSDataInputStream; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; import org.apache.flink.runtime.checkpoint.StateAssignmentOperation; import org.apache.flink.runtime.state.AbstractKeyedStateBackend; import org.apache.flink.runtime.state.CheckpointStreamFactory; @@ -143,9 +144,11 @@ public class KeyedOneInputStreamOperatorTestHarness<K, IN, OUT> } if (keyedStateBackend != null) { - RunnableFuture<KeyGroupsStateHandle> keyedSnapshotRunnable = keyedStateBackend.snapshot(checkpointId, + RunnableFuture<KeyGroupsStateHandle> keyedSnapshotRunnable = keyedStateBackend.snapshot( + checkpointId, timestamp, - streamFactory); + streamFactory, + CheckpointOptions.forFullCheckpoint()); if(!keyedSnapshotRunnable.isDone()) { Thread runner = new Thread(keyedSnapshotRunnable); runner.start(); http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java index 128522b..ac37009 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java @@ -23,6 +23,7 @@ import akka.actor.ActorSystem; import akka.testkit.JavaTestKit; import com.google.common.collect.HashMultimap; import com.google.common.collect.Multimap; +import java.io.FileNotFoundException; import org.apache.commons.io.FileUtils; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.functions.MapFunction; @@ -160,21 +161,21 @@ public class SavepointITCase extends TestLogger { config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlotsPerTaskManager); final File checkpointDir = new File(tmpDir, "checkpoints"); - final File savepointDir = new File(tmpDir, "savepoints"); + final File savepointRootDir = new File(tmpDir, "savepoints"); - if (!checkpointDir.mkdir() || !savepointDir.mkdirs()) { + if (!checkpointDir.mkdir() || !savepointRootDir.mkdirs()) { fail("Test setup failed: failed to create temporary directories."); } LOG.info("Created temporary checkpoint directory: " + checkpointDir + "."); - LOG.info("Created temporary savepoint directory: " + savepointDir + "."); + LOG.info("Created temporary savepoint directory: " + savepointRootDir + "."); config.setString(CoreOptions.STATE_BACKEND, "filesystem"); config.setString(FsStateBackendFactory.CHECKPOINT_DIRECTORY_URI_CONF_KEY, checkpointDir.toURI().toString()); config.setString(FsStateBackendFactory.MEMORY_THRESHOLD_CONF_KEY, "0"); config.setString(ConfigConstants.SAVEPOINT_DIRECTORY_KEY, - savepointDir.toURI().toString()); + savepointRootDir.toURI().toString()); LOG.info("Flink configuration: " + config + "."); @@ -217,14 +218,6 @@ public class SavepointITCase extends TestLogger { .result(savepointPathFuture, deadline.timeLeft())).savepointPath(); LOG.info("Retrieved savepoint path: " + savepointPath + "."); - // Only one savepoint should exist - File[] files = savepointDir.listFiles(); - if (files != null) { - assertEquals("Savepoint not created in expected directory", 1, files.length); - } else { - fail("Savepoint not created in expected directory"); - } - // Retrieve the savepoint from the testing job manager LOG.info("Requesting the savepoint."); Future<Object> savepointFuture = jobManager.ask(new RequestSavepoint(savepointPath), deadline.timeLeft()); @@ -240,15 +233,33 @@ public class SavepointITCase extends TestLogger { // - Verification START ------------------------------------------- + // Only one savepoint should exist + File[] files = savepointRootDir.listFiles(); + + if (files != null) { + assertEquals("Savepoint not created in expected directory", 1, files.length); + assertTrue("Savepoint did not create self-contained directory", files[0].isDirectory()); + + File savepointDir = files[0]; + File[] savepointFiles = savepointDir.listFiles(); + assertNotNull(savepointFiles); + assertTrue("Did not write savepoint files to directory",savepointFiles.length > 1); + } else { + fail("Savepoint not created in expected directory"); + } + // Only one checkpoint of the savepoint should exist // We currently have the following directory layout: checkpointDir/jobId/chk-ID - files = checkpointDir.listFiles(); - assertNotNull("Checkpoint directory empty", files); - assertEquals("Checkpoints directory cleaned up, but needed for savepoint.", 1, files.length); - assertEquals("No job-specific base directory", jobGraph.getJobID().toString(), files[0].getName()); + File jobCheckpoints = new File(checkpointDir, jobId.toString()); + + if (jobCheckpoints.exists()) { + files = jobCheckpoints.listFiles(); + assertNotNull("Checkpoint directory empty", files); + assertEquals("Checkpoints directory not cleaned up: " + Arrays.toString(files), 0, files.length); + } // Only one savepoint should exist - files = savepointDir.listFiles(); + files = savepointRootDir.listFiles(); assertNotNull("Savepoint directory empty", files); assertEquals("No savepoint found in savepoint directory", 1, files.length); @@ -399,8 +410,8 @@ public class SavepointITCase extends TestLogger { // All savepoints should have been cleaned up errMsg = "Savepoints directory not cleaned up properly: " + - Arrays.toString(savepointDir.listFiles()) + "."; - assertEquals(errMsg, 0, savepointDir.listFiles().length); + Arrays.toString(savepointRootDir.listFiles()) + "."; + assertEquals(errMsg, 0, savepointRootDir.listFiles().length); // - Verification END --------------------------------------------- } finally { @@ -468,7 +479,7 @@ public class SavepointITCase extends TestLogger { flink.submitJobAndWait(jobGraph, false); } catch (Exception e) { assertEquals(JobExecutionException.class, e.getClass()); - assertEquals(IllegalArgumentException.class, e.getCause().getClass()); + assertEquals(FileNotFoundException.class, e.getCause().getClass()); } } finally { if (flink != null) { http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java index ec6a8f5..79665dd 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java @@ -86,7 +86,6 @@ public class StateBackendITCase extends StreamingMultipleProgramsTestBase { } } - public static class FailingStateBackend extends AbstractStateBackend { private static final long serialVersionUID = 1L; @@ -97,6 +96,12 @@ public class StateBackendITCase extends StreamingMultipleProgramsTestBase { } @Override + public CheckpointStreamFactory createSavepointStreamFactory(JobID jobId, + String operatorIdentifier, String targetLocation) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend( Environment env, JobID jobID,
