http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java index c2ada3b..d8e46fa 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java @@ -313,8 +313,8 @@ public class CheckpointCoordinatorTest { assertFalse(checkpoint.isFullyAcknowledged()); // check that the vertices received the trigger checkpoint message - verify(vertex1.getCurrentExecutionAttempt()).triggerCheckpoint(checkpointId, timestamp); - verify(vertex2.getCurrentExecutionAttempt()).triggerCheckpoint(checkpointId, timestamp); + verify(vertex1.getCurrentExecutionAttempt()).triggerCheckpoint(checkpointId, timestamp, CheckpointOptions.forFullCheckpoint()); + verify(vertex2.getCurrentExecutionAttempt()).triggerCheckpoint(checkpointId, timestamp, CheckpointOptions.forFullCheckpoint()); CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, 0L); @@ -428,14 +428,14 @@ public class CheckpointCoordinatorTest { // check that the vertices received the trigger checkpoint message { - verify(vertex1.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpoint1Id), eq(timestamp)); - verify(vertex2.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpoint1Id), eq(timestamp)); + verify(vertex1.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpoint1Id), eq(timestamp), any(CheckpointOptions.class)); + verify(vertex2.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpoint1Id), eq(timestamp), any(CheckpointOptions.class)); } // check that the vertices received the trigger checkpoint message for the second checkpoint { - verify(vertex1.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpoint2Id), eq(timestamp + 2)); - verify(vertex2.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpoint2Id), eq(timestamp + 2)); + verify(vertex1.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpoint2Id), eq(timestamp + 2), any(CheckpointOptions.class)); + verify(vertex2.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpoint2Id), eq(timestamp + 2), any(CheckpointOptions.class)); } // decline checkpoint from one of the tasks, this should cancel the checkpoint @@ -529,8 +529,8 @@ public class CheckpointCoordinatorTest { // check that the vertices received the trigger checkpoint message { - verify(vertex1.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointId), eq(timestamp)); - verify(vertex2.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointId), eq(timestamp)); + verify(vertex1.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointId), eq(timestamp), any(CheckpointOptions.class)); + verify(vertex2.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointId), eq(timestamp), any(CheckpointOptions.class)); } // acknowledge from one of the tasks @@ -558,8 +558,8 @@ public class CheckpointCoordinatorTest { // validate that the relevant tasks got a confirmation message { - verify(vertex1.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointId), eq(timestamp)); - verify(vertex2.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointId), eq(timestamp)); + verify(vertex1.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointId), eq(timestamp), any(CheckpointOptions.class)); + verify(vertex2.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointId), eq(timestamp), any(CheckpointOptions.class)); } CompletedCheckpoint success = coord.getSuccessfulCheckpoints().get(0); @@ -589,8 +589,8 @@ public class CheckpointCoordinatorTest { // validate that the relevant tasks got a confirmation message { - verify(vertex1.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointIdNew), eq(timestampNew)); - verify(vertex2.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointIdNew), eq(timestampNew)); + verify(vertex1.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointIdNew), eq(timestampNew), any(CheckpointOptions.class)); + verify(vertex2.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointIdNew), eq(timestampNew), any(CheckpointOptions.class)); verify(vertex1.getCurrentExecutionAttempt(), times(1)).notifyCheckpointComplete(eq(checkpointIdNew), eq(timestampNew)); verify(vertex2.getCurrentExecutionAttempt(), times(1)).notifyCheckpointComplete(eq(checkpointIdNew), eq(timestampNew)); @@ -660,8 +660,8 @@ public class CheckpointCoordinatorTest { long checkpointId1 = pending1.getCheckpointId(); // trigger messages should have been sent - verify(triggerVertex1.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointId1), eq(timestamp1)); - verify(triggerVertex2.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointId1), eq(timestamp1)); + verify(triggerVertex1.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointId1), eq(timestamp1), any(CheckpointOptions.class)); + verify(triggerVertex2.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointId1), eq(timestamp1), any(CheckpointOptions.class)); CheckpointMetaData checkpointMetaData1 = new CheckpointMetaData(checkpointId1, 0L); @@ -687,8 +687,8 @@ public class CheckpointCoordinatorTest { CheckpointMetaData checkpointMetaData2 = new CheckpointMetaData(checkpointId2, 0L); // trigger messages should have been sent - verify(triggerVertex1.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointId2), eq(timestamp2)); - verify(triggerVertex2.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointId2), eq(timestamp2)); + verify(triggerVertex1.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointId2), eq(timestamp2), any(CheckpointOptions.class)); + verify(triggerVertex2.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointId2), eq(timestamp2), any(CheckpointOptions.class)); // we acknowledge the remaining two tasks from the first // checkpoint and two tasks from the second checkpoint @@ -794,8 +794,8 @@ public class CheckpointCoordinatorTest { long checkpointId1 = pending1.getCheckpointId(); // trigger messages should have been sent - verify(triggerVertex1.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointId1), eq(timestamp1)); - verify(triggerVertex2.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointId1), eq(timestamp1)); + verify(triggerVertex1.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointId1), eq(timestamp1), any(CheckpointOptions.class)); + verify(triggerVertex2.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointId1), eq(timestamp1), any(CheckpointOptions.class)); CheckpointMetaData checkpointMetaData1 = new CheckpointMetaData(checkpointId1, 0L); @@ -819,8 +819,8 @@ public class CheckpointCoordinatorTest { long checkpointId2 = pending2.getCheckpointId(); // trigger messages should have been sent - verify(triggerVertex1.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointId2), eq(timestamp2)); - verify(triggerVertex2.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointId2), eq(timestamp2)); + verify(triggerVertex1.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointId2), eq(timestamp2), any(CheckpointOptions.class)); + verify(triggerVertex2.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointId2), eq(timestamp2), any(CheckpointOptions.class)); // we acknowledge one more task from the first checkpoint and the second // checkpoint completely. The second checkpoint should then subsume the first checkpoint @@ -1142,7 +1142,7 @@ public class CheckpointCoordinatorTest { numCalls.incrementAndGet(); return null; } - }).when(execution).triggerCheckpoint(anyLong(), anyLong()); + }).when(execution).triggerCheckpoint(anyLong(), anyLong(), any(CheckpointOptions.class)); CheckpointCoordinator coord = new CheckpointCoordinator( jid, @@ -1232,7 +1232,7 @@ public class CheckpointCoordinatorTest { triggerCalls.add((Long) invocation.getArguments()[0]); return null; } - }).when(executionAttempt).triggerCheckpoint(anyLong(), anyLong()); + }).when(executionAttempt).triggerCheckpoint(anyLong(), anyLong(), any(CheckpointOptions.class)); final long delay = 50; @@ -1398,7 +1398,6 @@ public class CheckpointCoordinatorTest { assertFalse(savepointFuture.isDone()); long checkpointIdNew = coord.getPendingCheckpoints().entrySet().iterator().next().getKey(); - CheckpointMetaData checkpointMetaDataNew = new CheckpointMetaData(checkpointIdNew, 0L); coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, checkpointIdNew)); coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointIdNew)); @@ -1414,8 +1413,8 @@ public class CheckpointCoordinatorTest { // validate that the relevant tasks got a confirmation message { - verify(vertex1.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointIdNew), eq(timestampNew)); - verify(vertex2.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointIdNew), eq(timestampNew)); + verify(vertex1.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointIdNew), eq(timestampNew), any(CheckpointOptions.class)); + verify(vertex2.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointIdNew), eq(timestampNew), any(CheckpointOptions.class)); verify(vertex1.getCurrentExecutionAttempt(), times(1)).notifyCheckpointComplete(eq(checkpointIdNew), eq(timestampNew)); verify(vertex2.getCurrentExecutionAttempt(), times(1)).notifyCheckpointComplete(eq(checkpointIdNew), eq(timestampNew)); @@ -1537,7 +1536,7 @@ public class CheckpointCoordinatorTest { numCalls.incrementAndGet(); return null; } - }).when(execution).triggerCheckpoint(anyLong(), anyLong()); + }).when(execution).triggerCheckpoint(anyLong(), anyLong(), any(CheckpointOptions.class)); doAnswer(new Answer<Void>() { @Override @@ -1578,7 +1577,7 @@ public class CheckpointCoordinatorTest { assertEquals(maxConcurrentAttempts, numCalls.get()); verify(triggerVertex.getCurrentExecutionAttempt(), times(maxConcurrentAttempts)) - .triggerCheckpoint(anyLong(), anyLong()); + .triggerCheckpoint(anyLong(), anyLong(), any(CheckpointOptions.class)); // now, once we acknowledge one checkpoint, it should trigger the next one coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID, 1L));
http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointOptionsTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointOptionsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointOptionsTest.java new file mode 100644 index 0000000..6788338 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointOptionsTest.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.checkpoint; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; + +import org.apache.flink.runtime.checkpoint.CheckpointOptions.CheckpointType; +import org.junit.Test; + +public class CheckpointOptionsTest { + + @Test + public void testFullCheckpoint() throws Exception { + CheckpointOptions options = CheckpointOptions.forFullCheckpoint(); + assertEquals(CheckpointType.FULL_CHECKPOINT, options.getCheckpointType()); + assertNull(options.getTargetLocation()); + } + + @Test + public void testSavepoint() throws Exception { + String location = "asdasdadasdasdja7931481398123123123kjhasdkajsd"; + CheckpointOptions options = CheckpointOptions.forSavepoint(location); + assertEquals(CheckpointType.SAVEPOINT, options.getCheckpointType()); + assertEquals(location, options.getTargetLocation()); + } + + @Test(expected = NullPointerException.class) + public void testSavepointNullCheck() throws Exception { + CheckpointOptions.forSavepoint(null); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsHistoryTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsHistoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsHistoryTest.java index 3c373f1..95a31d4 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsHistoryTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsHistoryTest.java @@ -184,6 +184,7 @@ public class CheckpointStatsHistoryTest { when(completed.getStatus()).thenReturn(CheckpointStatsStatus.COMPLETED); when(completed.getProperties()).thenReturn(CheckpointProperties.forStandardCheckpoint()); when(completed.getCheckpointId()).thenReturn(checkpointId); + when(completed.getProperties()).thenReturn(CheckpointProperties.forStandardCheckpoint()); return completed; } http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/MigrationV0ToV1Test.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/MigrationV0ToV1Test.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/MigrationV0ToV1Test.java index 512768d..6ab8620 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/MigrationV0ToV1Test.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/MigrationV0ToV1Test.java @@ -183,7 +183,7 @@ public class MigrationV0ToV1Test { } finally { // Dispose - SavepointStore.removeSavepoint(path.toString()); + SavepointStore.removeSavepointFile(path.toString()); } } http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoaderTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoaderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoaderTest.java index 6471d6f..c66b29d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoaderTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoaderTest.java @@ -64,12 +64,12 @@ public class SavepointLoaderTest { Map<JobVertexID, TaskState> taskStates = new HashMap<>(); taskStates.put(vertexId, state); + JobID jobId = new JobID(); + // Store savepoint SavepointV1 savepoint = new SavepointV1(checkpointId, taskStates.values()); String path = SavepointStore.storeSavepoint(tmp.getAbsolutePath(), savepoint); - JobID jobId = new JobID(); - ExecutionJobVertex vertex = mock(ExecutionJobVertex.class); when(vertex.getParallelism()).thenReturn(parallelism); when(vertex.getMaxParallelism()).thenReturn(parallelism); http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStoreTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStoreTest.java index 3398341..dc19e47 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStoreTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStoreTest.java @@ -18,6 +18,9 @@ package org.apache.flink.runtime.checkpoint.savepoint; +import java.io.File; +import java.util.Arrays; +import org.apache.flink.api.common.JobID; import org.apache.flink.core.fs.FSDataOutputStream; import org.apache.flink.core.fs.FileSystem; import org.apache.flink.core.fs.Path; @@ -38,6 +41,7 @@ import java.util.UUID; import java.util.concurrent.ThreadLocalRandom; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Matchers.any; @@ -54,14 +58,22 @@ public class SavepointStoreTest { */ @Test public void testStoreLoadDispose() throws Exception { - String target = tmp.getRoot().getAbsolutePath(); + String root = tmp.getRoot().getAbsolutePath(); + File rootFile = new File(root); - assertEquals(0, tmp.getRoot().listFiles().length); + File[] list = rootFile.listFiles(); + + assertNotNull(list); + assertEquals(0, list.length); // Store + String savepointDirectory = SavepointStore.createSavepointDirectory(root, new JobID()); SavepointV1 stored = new SavepointV1(1929292, SavepointV1Test.createTaskStates(4, 24)); - String path = SavepointStore.storeSavepoint(target, stored); - assertEquals(1, tmp.getRoot().listFiles().length); + String path = SavepointStore.storeSavepoint(savepointDirectory, stored); + + list = rootFile.listFiles(); + assertNotNull(list); + assertEquals(1, list.length); // Load Savepoint loaded = SavepointStore.loadSavepoint(path, Thread.currentThread().getContextClassLoader()); @@ -70,9 +82,11 @@ public class SavepointStoreTest { loaded.dispose(); // Dispose - SavepointStore.removeSavepoint(path); + SavepointStore.deleteSavepointDirectory(path); - assertEquals(0, tmp.getRoot().listFiles().length); + list = rootFile.listFiles(); + assertNotNull(list); + assertEquals(0, list.length); } /** @@ -108,8 +122,8 @@ public class SavepointStoreTest { assertTrue(serializers.size() >= 1); - String target = tmp.getRoot().getAbsolutePath(); - assertEquals(0, tmp.getRoot().listFiles().length); + String root = tmp.getRoot().getAbsolutePath(); + File rootFile = new File(root); // New savepoint type for test int version = ThreadLocalRandom.current().nextInt(); @@ -118,14 +132,24 @@ public class SavepointStoreTest { // Add serializer serializers.put(version, NewSavepointSerializer.INSTANCE); + String savepointDirectory1 = SavepointStore.createSavepointDirectory(root, new JobID()); TestSavepoint newSavepoint = new TestSavepoint(version, checkpointId); - String pathNewSavepoint = SavepointStore.storeSavepoint(target, newSavepoint); - assertEquals(1, tmp.getRoot().listFiles().length); + String pathNewSavepoint = SavepointStore.storeSavepoint(savepointDirectory1, newSavepoint); + + File[] list = rootFile.listFiles(); + + assertNotNull(list); + assertEquals(1, list.length); // Savepoint v0 + String savepointDirectory2 = SavepointStore.createSavepointDirectory(root, new JobID()); Savepoint savepoint = new SavepointV1(checkpointId, SavepointV1Test.createTaskStates(4, 32)); - String pathSavepoint = SavepointStore.storeSavepoint(target, savepoint); - assertEquals(2, tmp.getRoot().listFiles().length); + String pathSavepoint = SavepointStore.storeSavepoint(savepointDirectory2, savepoint); + + list = rootFile.listFiles(); + + assertNotNull(list); + assertEquals(2, list.length); // Load Savepoint loaded = SavepointStore.loadSavepoint(pathNewSavepoint, Thread.currentThread().getContextClassLoader()); http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/CheckpointBarrierTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/CheckpointBarrierTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/CheckpointBarrierTest.java new file mode 100644 index 0000000..dd5b0b6 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/CheckpointBarrierTest.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.api; + +import static org.junit.Assert.assertEquals; + +import java.io.IOException; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; +import org.apache.flink.runtime.util.DataInputDeserializer; +import org.apache.flink.runtime.util.DataOutputSerializer; +import org.junit.Test; + +public class CheckpointBarrierTest { + + /** + * Test serialization of the checkpoint barrier. + */ + @Test + public void testSerialization() throws Exception { + long id = Integer.MAX_VALUE + 123123L; + long timestamp = Integer.MAX_VALUE + 1228L; + + CheckpointOptions checkpoint = CheckpointOptions.forFullCheckpoint(); + testSerialization(id, timestamp, checkpoint); + + CheckpointOptions savepoint = CheckpointOptions.forSavepoint("1289031838919123"); + testSerialization(id, timestamp, savepoint); + } + + private void testSerialization(long id, long timestamp, CheckpointOptions options) throws IOException { + CheckpointBarrier barrier = new CheckpointBarrier(id, timestamp, options); + + DataOutputSerializer out = new DataOutputSerializer(1024); + barrier.write(out); + + DataInputDeserializer in = new DataInputDeserializer(out.wrapAsByteBuffer()); + CheckpointBarrier deserialized = new CheckpointBarrier(); + deserialized.read(in); + + assertEquals(id, deserialized.getId()); + assertEquals(timestamp, deserialized.getTimestamp()); + assertEquals(options.getCheckpointType(), deserialized.getCheckpointOptions().getCheckpointType()); + assertEquals(options.getTargetLocation(), deserialized.getCheckpointOptions().getTargetLocation()); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java index 271d0d2..e674eb7 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java @@ -18,6 +18,14 @@ package org.apache.flink.runtime.io.network.api.serialization; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.nio.ByteBuffer; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; import org.apache.flink.runtime.event.AbstractEvent; import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker; import org.apache.flink.runtime.io.network.api.CheckpointBarrier; @@ -25,25 +33,42 @@ import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent; import org.apache.flink.runtime.io.network.api.EndOfSuperstepEvent; import org.apache.flink.runtime.io.network.buffer.Buffer; import org.apache.flink.runtime.io.network.util.TestTaskEvent; - import org.junit.Test; -import java.io.IOException; -import java.nio.ByteBuffer; +public class EventSerializerTest { -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; + @Test + public void testCheckpointBarrierSerialization() throws Exception { + long id = Integer.MAX_VALUE + 123123L; + long timestamp = Integer.MAX_VALUE + 1228L; -public class EventSerializerTest { + CheckpointOptions checkpoint = CheckpointOptions.forFullCheckpoint(); + testCheckpointBarrierSerialization(id, timestamp, checkpoint); + + CheckpointOptions savepoint = CheckpointOptions.forSavepoint("1289031838919123"); + testCheckpointBarrierSerialization(id, timestamp, savepoint); + } + + private void testCheckpointBarrierSerialization(long id, long timestamp, CheckpointOptions options) throws IOException { + ClassLoader cl = Thread.currentThread().getContextClassLoader(); + + CheckpointBarrier barrier = new CheckpointBarrier(id, timestamp, options); + ByteBuffer serialized = EventSerializer.toSerializedEvent(barrier); + CheckpointBarrier deserialized = (CheckpointBarrier) EventSerializer.fromSerializedEvent(serialized, cl); + assertFalse(serialized.hasRemaining()); + + assertEquals(id, deserialized.getId()); + assertEquals(timestamp, deserialized.getTimestamp()); + assertEquals(options.getCheckpointType(), deserialized.getCheckpointOptions().getCheckpointType()); + assertEquals(options.getTargetLocation(), deserialized.getCheckpointOptions().getTargetLocation()); + } @Test public void testSerializeDeserializeEvent() throws Exception { AbstractEvent[] events = { EndOfPartitionEvent.INSTANCE, EndOfSuperstepEvent.INSTANCE, - new CheckpointBarrier(1678L, 4623784L), + new CheckpointBarrier(1678L, 4623784L, CheckpointOptions.forFullCheckpoint()), new TestTaskEvent(Math.random(), 12361231273L), new CancelCheckpointMarker(287087987329842L) }; @@ -94,7 +119,7 @@ public class EventSerializerTest { AbstractEvent[] events = { EndOfPartitionEvent.INSTANCE, EndOfSuperstepEvent.INSTANCE, - new CheckpointBarrier(1678L, 4623784L), + new CheckpointBarrier(1678L, 4623784L, CheckpointOptions.forFullCheckpoint()), new TestTaskEvent(Math.random(), 12361231273L), new CancelCheckpointMarker(287087987329842L) }; http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java index 63175ed..900b5c3 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java @@ -24,6 +24,7 @@ import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.core.memory.MemorySegment; import org.apache.flink.core.memory.MemorySegmentFactory; import org.apache.flink.core.memory.MemoryType; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; import org.apache.flink.runtime.event.AbstractEvent; import org.apache.flink.runtime.io.network.api.CheckpointBarrier; import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent; @@ -327,7 +328,7 @@ public class RecordWriterTest { ResultPartitionWriter partitionWriter = createCollectingPartitionWriter(queues, bufferProvider); RecordWriter<ByteArrayIO> writer = new RecordWriter<>(partitionWriter, new RoundRobin<ByteArrayIO>()); - CheckpointBarrier barrier = new CheckpointBarrier(Integer.MAX_VALUE + 919192L, Integer.MAX_VALUE + 18828228L); + CheckpointBarrier barrier = new CheckpointBarrier(Integer.MAX_VALUE + 919192L, Integer.MAX_VALUE + 18828228L, CheckpointOptions.forFullCheckpoint()); // No records emitted yet, broadcast should not request a buffer writer.broadcastEvent(barrier); @@ -363,7 +364,7 @@ public class RecordWriterTest { ResultPartitionWriter partitionWriter = createCollectingPartitionWriter(queues, bufferProvider); RecordWriter<ByteArrayIO> writer = new RecordWriter<>(partitionWriter, new RoundRobin<ByteArrayIO>()); - CheckpointBarrier barrier = new CheckpointBarrier(Integer.MAX_VALUE + 1292L, Integer.MAX_VALUE + 199L); + CheckpointBarrier barrier = new CheckpointBarrier(Integer.MAX_VALUE + 1292L, Integer.MAX_VALUE + 199L, CheckpointOptions.forFullCheckpoint()); // Emit records on some channels first (requesting buffers), then // broadcast the event. The record buffers should be emitted first, then http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java index de54d1f..5a38be2 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java @@ -40,6 +40,7 @@ import org.apache.flink.runtime.blob.BlobService; import org.apache.flink.runtime.checkpoint.CheckpointIDCounter; import org.apache.flink.runtime.checkpoint.CheckpointMetaData; import org.apache.flink.runtime.checkpoint.CheckpointMetrics; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory; import org.apache.flink.runtime.checkpoint.CompletedCheckpoint; import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore; @@ -601,7 +602,7 @@ public class JobManagerHARecoveryTest { } @Override - public boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData) throws Exception { + public boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) throws Exception { ByteStreamStateHandle byteStreamStateHandle = new TestByteStreamStateHandleDeepCompare( String.valueOf(UUID.randomUUID()), InstantiationUtil.serializeObject(checkpointMetaData.getCheckpointId())); @@ -619,7 +620,7 @@ public class JobManagerHARecoveryTest { } @Override - public void triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData, CheckpointMetrics checkpointMetrics) throws Exception { + public void triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions, CheckpointMetrics checkpointMetrics) throws Exception { throw new UnsupportedOperationException("should not be called!"); } http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java index db45231..bc420cc 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java @@ -23,6 +23,7 @@ import org.apache.flink.core.fs.FSDataInputStream; import org.apache.flink.core.testutils.CommonTestUtils; import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTest; import org.apache.flink.runtime.checkpoint.CheckpointMetrics; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; import org.apache.flink.runtime.checkpoint.SubtaskState; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.jobgraph.JobVertexID; @@ -49,7 +50,7 @@ public class CheckpointMessagesTest { NotifyCheckpointComplete cc = new NotifyCheckpointComplete(new JobID(), new ExecutionAttemptID(), 45287698767345L, 467L); testSerializabilityEqualsHashCode(cc); - TriggerCheckpoint tc = new TriggerCheckpoint(new JobID(), new ExecutionAttemptID(), 347652734L, 7576752L); + TriggerCheckpoint tc = new TriggerCheckpoint(new JobID(), new ExecutionAttemptID(), 347652734L, 7576752L, CheckpointOptions.forFullCheckpoint()); testSerializabilityEqualsHashCode(tc); } http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java index 5bd085f..94df524 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java @@ -20,6 +20,7 @@ package org.apache.flink.runtime.state; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.state.memory.MemoryStateBackend; import org.junit.Test; @@ -165,7 +166,7 @@ public class OperatorStateBackendTest { listState3.add(20); CheckpointStreamFactory streamFactory = abstractStateBackend.createStreamFactory(new JobID(), "testOperator"); - OperatorStateHandle stateHandle = operatorStateBackend.snapshot(1, 1, streamFactory).get(); + OperatorStateHandle stateHandle = operatorStateBackend.snapshot(1, 1, streamFactory, CheckpointOptions.forFullCheckpoint()).get(); try { http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java index 3b0350d..f2416b9 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java @@ -41,6 +41,7 @@ import org.apache.flink.api.common.typeutils.base.LongSerializer; import org.apache.flink.api.common.typeutils.base.StringSerializer; import org.apache.flink.core.memory.DataOutputViewStreamWrapper; import org.apache.flink.core.testutils.CheckedThread; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; import org.apache.flink.runtime.checkpoint.StateAssignmentOperation; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.operators.testutils.DummyEnvironment; @@ -191,7 +192,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten assertEquals("1", getSerializedValue(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer)); // draw a snapshot - KeyGroupsStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory)); + KeyGroupsStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint())); // make some more modifications backend.setCurrentKey(1); @@ -202,7 +203,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten state.update("u3"); // draw another snapshot - KeyGroupsStateHandle snapshot2 = runSnapshot(backend.snapshot(682375462379L, 4, streamFactory)); + KeyGroupsStateHandle snapshot2 = runSnapshot(backend.snapshot(682375462379L, 4, streamFactory, CheckpointOptions.forFullCheckpoint())); // validate the original state backend.setCurrentKey(1); @@ -403,7 +404,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten assertEquals(13, (int) state2.value()); // draw a snapshot - KeyGroupsStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory)); + KeyGroupsStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint())); backend.dispose(); backend = restoreKeyedBackend( @@ -476,7 +477,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten assertEquals(42L, (long) state.value()); // draw a snapshot - KeyGroupsStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory)); + KeyGroupsStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint())); backend.dispose(); backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot1); @@ -521,7 +522,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten assertEquals("1", joiner.join(getSerializedList(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer))); // draw a snapshot - KeyGroupsStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory)); + KeyGroupsStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint())); // make some more modifications backend.setCurrentKey(1); @@ -532,7 +533,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten state.add("u3"); // draw another snapshot - KeyGroupsStateHandle snapshot2 = runSnapshot(backend.snapshot(682375462379L, 4, streamFactory)); + KeyGroupsStateHandle snapshot2 = runSnapshot(backend.snapshot(682375462379L, 4, streamFactory, CheckpointOptions.forFullCheckpoint())); // validate the original state backend.setCurrentKey(1); @@ -620,7 +621,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten assertEquals("1", getSerializedValue(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer)); // draw a snapshot - KeyGroupsStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory)); + KeyGroupsStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint())); // make some more modifications backend.setCurrentKey(1); @@ -631,7 +632,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten state.add("u3"); // draw another snapshot - KeyGroupsStateHandle snapshot2 = runSnapshot(backend.snapshot(682375462379L, 4, streamFactory)); + KeyGroupsStateHandle snapshot2 = runSnapshot(backend.snapshot(682375462379L, 4, streamFactory, CheckpointOptions.forFullCheckpoint())); // validate the original state backend.setCurrentKey(1); @@ -722,7 +723,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten assertEquals("Fold-Initial:,1", getSerializedValue(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer)); // draw a snapshot - KeyGroupsStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory)); + KeyGroupsStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint())); // make some more modifications backend.setCurrentKey(1); @@ -734,7 +735,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten state.add(103); // draw another snapshot - KeyGroupsStateHandle snapshot2 = runSnapshot(backend.snapshot(682375462379L, 4, streamFactory)); + KeyGroupsStateHandle snapshot2 = runSnapshot(backend.snapshot(682375462379L, 4, streamFactory, CheckpointOptions.forFullCheckpoint())); // validate the original state backend.setCurrentKey(1); @@ -829,7 +830,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten getSerializedMap(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer)); // draw a snapshot - KeyGroupsStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory)); + KeyGroupsStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint())); // make some more modifications backend.setCurrentKey(1); @@ -841,7 +842,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten state.putAll(new HashMap<Integer, String>() {{ put(1031, "1031"); put(1032, "1032"); }}); // draw another snapshot - KeyGroupsStateHandle snapshot2 = runSnapshot(backend.snapshot(682375462379L, 4, streamFactory)); + KeyGroupsStateHandle snapshot2 = runSnapshot(backend.snapshot(682375462379L, 4, streamFactory, CheckpointOptions.forFullCheckpoint())); // validate the original state backend.setCurrentKey(1); @@ -1163,7 +1164,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten state.update("ShouldBeInSecondHalf"); - KeyGroupsStateHandle snapshot = runSnapshot(backend.snapshot(0, 0, streamFactory)); + KeyGroupsStateHandle snapshot = runSnapshot(backend.snapshot(0, 0, streamFactory, CheckpointOptions.forFullCheckpoint())); List<KeyGroupsStateHandle> firstHalfKeyGroupStates = StateAssignmentOperation.getKeyGroupsStateHandles( Collections.singletonList(snapshot), @@ -1230,7 +1231,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten state.update("2"); // draw a snapshot - KeyGroupsStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory)); + KeyGroupsStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint())); backend.dispose(); // restore the first snapshot and validate it @@ -1281,7 +1282,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten state.add("2"); // draw a snapshot - KeyGroupsStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory)); + KeyGroupsStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint())); backend.dispose(); // restore the first snapshot and validate it @@ -1334,7 +1335,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten state.add("2"); // draw a snapshot - KeyGroupsStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory)); + KeyGroupsStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint())); backend.dispose(); // restore the first snapshot and validate it @@ -1385,7 +1386,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten state.put("2", "Second"); // draw a snapshot - KeyGroupsStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory)); + KeyGroupsStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint())); backend.dispose(); // restore the first snapshot and validate it @@ -1661,7 +1662,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten eq(env.getJobID()), eq(env.getJobVertexId()), eq(expectedKeyGroupRange), eq("banana"), any(KvStateID.class)); - KeyGroupsStateHandle snapshot = runSnapshot(backend.snapshot(682375462379L, 4, streamFactory)); + KeyGroupsStateHandle snapshot = runSnapshot(backend.snapshot(682375462379L, 4, streamFactory, CheckpointOptions.forFullCheckpoint())); backend.dispose(); @@ -1692,7 +1693,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten ListStateDescriptor<String> kvId = new ListStateDescriptor<>("id", String.class); // draw a snapshot - KeyGroupsStateHandle snapshot = runSnapshot(backend.snapshot(682375462379L, 1, streamFactory)); + KeyGroupsStateHandle snapshot = runSnapshot(backend.snapshot(682375462379L, 1, streamFactory, CheckpointOptions.forFullCheckpoint())); assertNull(snapshot); backend.dispose(); http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsSavepointStreamFactoryTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsSavepointStreamFactoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsSavepointStreamFactoryTest.java new file mode 100644 index 0000000..a29d29c --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsSavepointStreamFactoryTest.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.filesystem; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +import java.io.File; +import org.apache.flink.api.common.JobID; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.FsCheckpointStateOutputStream; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +public class FsSavepointStreamFactoryTest { + + @Rule + public TemporaryFolder folder = new TemporaryFolder(); + + /** + * Tests that the factory creates all files in the given directory without + * creating any sub directories. + */ + @Test + public void testSavepointStreamDirectoryLayout() throws Exception { + File testRoot = folder.newFolder(); + JobID jobId = new JobID(); + + FsSavepointStreamFactory savepointStreamFactory = new FsSavepointStreamFactory( + new Path(testRoot.getAbsolutePath()), + jobId, + 0); + + File[] listed = testRoot.listFiles(); + assertNotNull(listed); + assertEquals(0, listed.length); + + FsCheckpointStateOutputStream stream = savepointStreamFactory + .createCheckpointStateOutputStream(1273, 19231); + + stream.write(1); + + FileStateHandle handle = (FileStateHandle) stream.closeAndGetHandle(); + + listed = testRoot.listFiles(); + assertNotNull(listed); + assertEquals(1, listed.length); + assertEquals(handle.getFilePath().getPath(), listed[0].getPath()); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java index 187163d..89ae5da 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java @@ -26,6 +26,7 @@ import org.apache.flink.runtime.blob.BlobKey; import org.apache.flink.runtime.broadcast.BroadcastVariableManager; import org.apache.flink.runtime.checkpoint.CheckpointMetaData; import org.apache.flink.runtime.checkpoint.CheckpointMetrics; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor; import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor; @@ -91,7 +92,7 @@ public class TaskAsyncCallTest { awaitLatch.await(); for (int i = 1; i <= NUM_CALLS; i++) { - task.triggerCheckpointBarrier(i, 156865867234L); + task.triggerCheckpointBarrier(i, 156865867234L, CheckpointOptions.forFullCheckpoint()); } triggerLatch.await(); @@ -121,7 +122,7 @@ public class TaskAsyncCallTest { awaitLatch.await(); for (int i = 1; i <= NUM_CALLS; i++) { - task.triggerCheckpointBarrier(i, 156865867234L); + task.triggerCheckpointBarrier(i, 156865867234L, CheckpointOptions.forFullCheckpoint()); task.notifyCheckpointComplete(i); } @@ -226,7 +227,7 @@ public class TaskAsyncCallTest { public void setInitialState(TaskStateHandles taskStateHandles) throws Exception {} @Override - public boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData) { + public boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) { lastCheckpointId++; if (checkpointMetaData.getCheckpointId() == lastCheckpointId) { if (lastCheckpointId == NUM_CALLS) { @@ -243,7 +244,7 @@ public class TaskAsyncCallTest { } @Override - public void triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData, CheckpointMetrics checkpointMetrics) throws Exception { + public void triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions, CheckpointMetrics checkpointMetrics) throws Exception { throw new UnsupportedOperationException("Should not be called"); } http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java index 144247f..05fda28 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java @@ -18,6 +18,7 @@ package org.apache.flink.streaming.api.operators; +import java.io.IOException; import org.apache.commons.io.IOUtils; import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics; import org.apache.flink.annotation.PublicEvolving; @@ -36,6 +37,8 @@ import org.apache.flink.core.memory.DataOutputViewStreamWrapper; import org.apache.flink.metrics.Counter; import org.apache.flink.metrics.Gauge; import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; +import org.apache.flink.runtime.checkpoint.CheckpointOptions.CheckpointType; import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup; import org.apache.flink.runtime.state.AbstractKeyedStateBackend; import org.apache.flink.runtime.state.CheckpointStreamFactory; @@ -340,17 +343,19 @@ public abstract class AbstractStreamOperator<OUT> } @Override - public final OperatorSnapshotResult snapshotState(long checkpointId, long timestamp) throws Exception { + public final OperatorSnapshotResult snapshotState(long checkpointId, long timestamp, CheckpointOptions checkpointOptions) throws Exception { KeyGroupRange keyGroupRange = null != keyedStateBackend ? keyedStateBackend.getKeyGroupRange() : KeyGroupRange.EMPTY_KEY_GROUP_RANGE; OperatorSnapshotResult snapshotInProgress = new OperatorSnapshotResult(); + CheckpointStreamFactory factory = getCheckpointStreamFactory(checkpointOptions); + try (StateSnapshotContextSynchronousImpl snapshotContext = new StateSnapshotContextSynchronousImpl( checkpointId, timestamp, - checkpointStreamFactory, + factory, keyGroupRange, getContainingTask().getCancelables())) { @@ -361,12 +366,12 @@ public abstract class AbstractStreamOperator<OUT> if (null != operatorStateBackend) { snapshotInProgress.setOperatorStateManagedFuture( - operatorStateBackend.snapshot(checkpointId, timestamp, checkpointStreamFactory)); + operatorStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions)); } if (null != keyedStateBackend) { snapshotInProgress.setKeyedStateManagedFuture( - keyedStateBackend.snapshot(checkpointId, timestamp, checkpointStreamFactory)); + keyedStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions)); } } catch (Exception snapshotException) { try { @@ -431,11 +436,12 @@ public abstract class AbstractStreamOperator<OUT> @SuppressWarnings("deprecation") @Deprecated @Override - public StreamStateHandle snapshotLegacyOperatorState(long checkpointId, long timestamp) throws Exception { + public StreamStateHandle snapshotLegacyOperatorState(long checkpointId, long timestamp, CheckpointOptions checkpointOptions) throws Exception { if (this instanceof StreamCheckpointedOperator) { + CheckpointStreamFactory factory = getCheckpointStreamFactory(checkpointOptions); final CheckpointStreamFactory.CheckpointStateOutputStream outStream = - checkpointStreamFactory.createCheckpointStateOutputStream(checkpointId, timestamp); + factory.createCheckpointStateOutputStream(checkpointId, timestamp); getContainingTask().getCancelables().registerClosable(outStream); @@ -495,6 +501,31 @@ public abstract class AbstractStreamOperator<OUT> @Override public void notifyOfCompletedCheckpoint(long checkpointId) throws Exception {} + /** + * Returns a checkpoint stream factory for the provided options. + * + * <p>For {@link CheckpointType#FULL_CHECKPOINT} this returns the shared + * factory of this operator. + * + * <p>For {@link CheckpointType#SAVEPOINT} it creates a custom factory per + * savepoint. + * + * @param checkpointOptions Options for the checkpoint + * @return Checkpoint stream factory for the checkpoints + * @throws IOException Failures while creating a new stream factory are forwarded + */ + @VisibleForTesting + CheckpointStreamFactory getCheckpointStreamFactory(CheckpointOptions checkpointOptions) throws IOException { + CheckpointType checkpointType = checkpointOptions.getCheckpointType(); + if (checkpointType == CheckpointType.FULL_CHECKPOINT) { + return checkpointStreamFactory; + } else if (checkpointType == CheckpointType.SAVEPOINT) { + return container.createSavepointStreamFactory(this, checkpointOptions.getTargetLocation()); + } else { + throw new IllegalStateException("Unknown checkpoint type " + checkpointType); + } + } + // ------------------------------------------------------------------------ // Properties and Services // ------------------------------------------------------------------------ http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResult.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResult.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResult.java index 5a6c37b..83697ae 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResult.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResult.java @@ -26,7 +26,7 @@ import org.apache.flink.util.ExceptionUtils; import java.util.concurrent.RunnableFuture; /** - * Result of {@link AbstractStreamOperator#snapshotState}. + * Result of {@link StreamOperator#snapshotState}. */ public class OperatorSnapshotResult { http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java index d8e4d08..006e910 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java @@ -19,6 +19,7 @@ package org.apache.flink.streaming.api.operators; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; @@ -98,7 +99,10 @@ public interface StreamOperator<OUT> extends Serializable { * * @throws Exception exception that happened during snapshotting. */ - OperatorSnapshotResult snapshotState(long checkpointId, long timestamp) throws Exception; + OperatorSnapshotResult snapshotState( + long checkpointId, + long timestamp, + CheckpointOptions checkpointOptions) throws Exception; /** * Takes a snapshot of the legacy operator state defined via {@link StreamCheckpointedOperator}. @@ -110,7 +114,10 @@ public interface StreamOperator<OUT> extends Serializable { */ @SuppressWarnings("deprecation") @Deprecated - StreamStateHandle snapshotLegacyOperatorState(long checkpointId, long timestamp) throws Exception; + StreamStateHandle snapshotLegacyOperatorState( + long checkpointId, + long timestamp, + CheckpointOptions checkpointOptions) throws Exception; /** * Provides state handles to restore the operator state. @@ -142,4 +149,5 @@ public interface StreamOperator<OUT> extends Serializable { void setChainingStrategy(ChainingStrategy strategy); MetricGroup getMetricGroup(); + } http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java index 611bd44..2da8389 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java @@ -368,7 +368,10 @@ public class BarrierBuffer implements CheckpointBarrierHandler { .setBytesBufferedInAlignment(bytesBuffered) .setAlignmentDurationNanos(latestAlignmentDurationNanos); - toNotifyOnCheckpoint.triggerCheckpointOnBarrier(checkpointMetaData, checkpointMetrics); + toNotifyOnCheckpoint.triggerCheckpointOnBarrier( + checkpointMetaData, + checkpointBarrier.getCheckpointOptions(), + checkpointMetrics); } } http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java index 77608c6..8b1b65b 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java @@ -20,6 +20,7 @@ package org.apache.flink.streaming.runtime.io; import org.apache.flink.annotation.Internal; import org.apache.flink.runtime.checkpoint.CheckpointMetrics; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineOnCancellationBarrierException; import org.apache.flink.runtime.checkpoint.CheckpointMetaData; import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker; @@ -132,7 +133,7 @@ public class BarrierTracker implements CheckpointBarrierHandler { // fast path for single channel trackers if (totalNumberOfInputChannels == 1) { - notifyCheckpoint(barrierId, receivedBarrier.getTimestamp()); + notifyCheckpoint(barrierId, receivedBarrier.getTimestamp(), receivedBarrier.getCheckpointOptions()); return; } @@ -170,7 +171,7 @@ public class BarrierTracker implements CheckpointBarrierHandler { LOG.debug("Received all barriers for checkpoint {}", barrierId); } - notifyCheckpoint(receivedBarrier.getId(), receivedBarrier.getTimestamp()); + notifyCheckpoint(receivedBarrier.getId(), receivedBarrier.getTimestamp(), receivedBarrier.getCheckpointOptions()); } } } @@ -248,14 +249,14 @@ public class BarrierTracker implements CheckpointBarrierHandler { } } - private void notifyCheckpoint(long checkpointId, long timestamp) throws Exception { + private void notifyCheckpoint(long checkpointId, long timestamp, CheckpointOptions checkpointOptions) throws Exception { if (toNotifyOnCheckpoint != null) { CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, timestamp); CheckpointMetrics checkpointMetrics = new CheckpointMetrics() .setBytesBufferedInAlignment(0L) .setAlignmentDurationNanos(0L); - toNotifyOnCheckpoint.triggerCheckpointOnBarrier(checkpointMetaData, checkpointMetrics); + toNotifyOnCheckpoint.triggerCheckpointOnBarrier(checkpointMetaData, checkpointOptions, checkpointMetrics); } } http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java index 4f07182..dd93592 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java @@ -21,6 +21,7 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.metrics.Counter; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker; import org.apache.flink.runtime.io.network.api.CheckpointBarrier; @@ -164,9 +165,9 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea } } - public void broadcastCheckpointBarrier(long id, long timestamp) throws IOException { + public void broadcastCheckpointBarrier(long id, long timestamp, CheckpointOptions checkpointOptions) throws IOException { try { - CheckpointBarrier barrier = new CheckpointBarrier(id, timestamp); + CheckpointBarrier barrier = new CheckpointBarrier(id, timestamp, checkpointOptions); for (RecordWriterOutput<?> streamOutput : streamOutputs) { streamOutput.broadcastEvent(barrier); } http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java index 62cfb8f..938ffd2 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java @@ -26,6 +26,7 @@ import org.apache.flink.configuration.IllegalConfigurationException; import org.apache.flink.core.fs.CloseableRegistry; import org.apache.flink.runtime.checkpoint.CheckpointMetaData; import org.apache.flink.runtime.checkpoint.CheckpointMetrics; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; import org.apache.flink.runtime.checkpoint.SubtaskState; import org.apache.flink.runtime.execution.CancelTaskException; import org.apache.flink.runtime.execution.Environment; @@ -163,7 +164,6 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> private TaskStateHandles restoreStateHandles; - /** The currently active background materialization threads */ private final CloseableRegistry cancelables = new CloseableRegistry(); @@ -520,14 +520,14 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> } @Override - public boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData) throws Exception { + public boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) throws Exception { try { // No alignment if we inject a checkpoint CheckpointMetrics checkpointMetrics = new CheckpointMetrics() .setBytesBufferedInAlignment(0L) .setAlignmentDurationNanos(0L); - return performCheckpoint(checkpointMetaData, checkpointMetrics); + return performCheckpoint(checkpointMetaData, checkpointOptions, checkpointMetrics); } catch (Exception e) { // propagate exceptions only if the task is still in "running" state @@ -543,9 +543,13 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> } @Override - public void triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData, CheckpointMetrics checkpointMetrics) throws Exception { + public void triggerCheckpointOnBarrier( + CheckpointMetaData checkpointMetaData, + CheckpointOptions checkpointOptions, + CheckpointMetrics checkpointMetrics) throws Exception { + try { - performCheckpoint(checkpointMetaData, checkpointMetrics); + performCheckpoint(checkpointMetaData, checkpointOptions, checkpointMetrics); } catch (CancelTaskException e) { throw new Exception("Operator " + getName() + " was cancelled while performing checkpoint " + @@ -570,8 +574,13 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> } } - private boolean performCheckpoint(CheckpointMetaData checkpointMetaData, CheckpointMetrics checkpointMetrics) throws Exception { - LOG.debug("Starting checkpoint {} on task {}", checkpointMetaData.getCheckpointId(), getName()); + private boolean performCheckpoint( + CheckpointMetaData checkpointMetaData, + CheckpointOptions checkpointOptions, + CheckpointMetrics checkpointMetrics) throws Exception { + + LOG.debug("Starting checkpoint ({}) {} on task {}", + checkpointMetaData.getCheckpointId(), checkpointOptions.getCheckpointType(), getName()); synchronized (lock) { if (isRunning) { @@ -582,9 +591,11 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> // Given this, we immediately emit the checkpoint barriers, so the downstream operators // can start their checkpoint work as soon as possible operatorChain.broadcastCheckpointBarrier( - checkpointMetaData.getCheckpointId(), checkpointMetaData.getTimestamp()); + checkpointMetaData.getCheckpointId(), + checkpointMetaData.getTimestamp(), + checkpointOptions); - checkpointState(checkpointMetaData, checkpointMetrics); + checkpointState(checkpointMetaData, checkpointOptions, checkpointMetrics); return true; } else { @@ -637,8 +648,17 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> } } - private void checkpointState(CheckpointMetaData checkpointMetaData, CheckpointMetrics checkpointMetrics) throws Exception { - CheckpointingOperation checkpointingOperation = new CheckpointingOperation(this, checkpointMetaData, checkpointMetrics); + private void checkpointState( + CheckpointMetaData checkpointMetaData, + CheckpointOptions checkpointOptions, + CheckpointMetrics checkpointMetrics) throws Exception { + + CheckpointingOperation checkpointingOperation = new CheckpointingOperation( + this, + checkpointMetaData, + checkpointOptions, + checkpointMetrics); + checkpointingOperation.executeCheckpointing(); } @@ -814,7 +834,13 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> return stateBackend.createStreamFactory( getEnvironment().getJobID(), createOperatorIdentifier(operator, configuration.getVertexID())); + } + public CheckpointStreamFactory createSavepointStreamFactory(StreamOperator<?> operator, String targetLocation) throws IOException { + return stateBackend.createSavepointStreamFactory( + getEnvironment().getJobID(), + createOperatorIdentifier(operator, configuration.getVertexID()), + targetLocation); } private String createOperatorIdentifier(StreamOperator<?> operator, int vertexId) { @@ -1048,6 +1074,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> private final StreamTask<?, ?> owner; private final CheckpointMetaData checkpointMetaData; + private final CheckpointOptions checkpointOptions; private final CheckpointMetrics checkpointMetrics; private final StreamOperator<?>[] allOperators; @@ -1060,9 +1087,15 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> private final List<StreamStateHandle> nonPartitionedStates; private final List<OperatorSnapshotResult> snapshotInProgressList; - public CheckpointingOperation(StreamTask<?, ?> owner, CheckpointMetaData checkpointMetaData, CheckpointMetrics checkpointMetrics) { + public CheckpointingOperation( + StreamTask<?, ?> owner, + CheckpointMetaData checkpointMetaData, + CheckpointOptions checkpointOptions, + CheckpointMetrics checkpointMetrics) { + this.owner = Preconditions.checkNotNull(owner); this.checkpointMetaData = Preconditions.checkNotNull(checkpointMetaData); + this.checkpointOptions = Preconditions.checkNotNull(checkpointOptions); this.checkpointMetrics = Preconditions.checkNotNull(checkpointMetrics); this.allOperators = owner.operatorChain.getAllOperators(); this.nonPartitionedStates = new ArrayList<>(allOperators.length); @@ -1137,14 +1170,16 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> @SuppressWarnings("deprecation") private void checkpointStreamOperator(StreamOperator<?> op) throws Exception { if (null != op) { - // first call the legacy checkpoint code paths + // first call the legacy checkpoint code paths nonPartitionedStates.add(op.snapshotLegacyOperatorState( checkpointMetaData.getCheckpointId(), - checkpointMetaData.getTimestamp())); + checkpointMetaData.getTimestamp(), + checkpointOptions)); OperatorSnapshotResult snapshotInProgress = op.snapshotState( checkpointMetaData.getCheckpointId(), - checkpointMetaData.getTimestamp()); + checkpointMetaData.getTimestamp(), + checkpointOptions); snapshotInProgressList.add(snapshotInProgress); } else { http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/checkpoint/ListCheckpointedTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/checkpoint/ListCheckpointedTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/checkpoint/ListCheckpointedTest.java index 6751617..51b9d9a 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/checkpoint/ListCheckpointedTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/checkpoint/ListCheckpointedTest.java @@ -100,4 +100,4 @@ public class ListCheckpointedTest { return restored; } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java index 274611a..8507200 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java @@ -17,12 +17,34 @@ */ package org.apache.flink.streaming.api.operators; +import static junit.framework.TestCase.assertTrue; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.contains; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyBoolean; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.verify; +import static org.powermock.api.mockito.PowerMockito.doReturn; +import static org.powermock.api.mockito.PowerMockito.mock; +import static org.powermock.api.mockito.PowerMockito.spy; +import static org.powermock.api.mockito.PowerMockito.when; +import static org.powermock.api.mockito.PowerMockito.whenNew; + +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import java.util.concurrent.RunnableFuture; import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeutils.base.StringSerializer; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.core.fs.CloseableRegistry; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; import org.apache.flink.runtime.state.AbstractKeyedStateBackend; import org.apache.flink.runtime.state.CheckpointStreamFactory; import org.apache.flink.runtime.state.KeyGroupRange; @@ -45,27 +67,6 @@ import org.mockito.internal.util.reflection.Whitebox; import org.powermock.core.classloader.annotations.PrepareForTest; import org.powermock.modules.junit4.PowerMockRunner; -import java.util.ArrayList; -import java.util.List; -import java.util.Random; -import java.util.concurrent.RunnableFuture; - -import static junit.framework.TestCase.assertTrue; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.contains; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.fail; -import static org.mockito.Matchers.anyBoolean; -import static org.mockito.Matchers.anyLong; -import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.doThrow; -import static org.mockito.Mockito.verify; -import static org.powermock.api.mockito.PowerMockito.doReturn; -import static org.powermock.api.mockito.PowerMockito.mock; -import static org.powermock.api.mockito.PowerMockito.spy; -import static org.powermock.api.mockito.PowerMockito.when; -import static org.powermock.api.mockito.PowerMockito.whenNew; - /** * Tests for the facilities provided by {@link AbstractStreamOperator}. This mostly * tests timers and state and whether they are correctly checkpointed/restored @@ -495,10 +496,10 @@ public class AbstractStreamOperatorTest { when(containingTask.getCancelables()).thenReturn(closeableRegistry); AbstractStreamOperator<Void> operator = mock(AbstractStreamOperator.class); - when(operator.snapshotState(anyLong(), anyLong())).thenCallRealMethod(); + when(operator.snapshotState(anyLong(), anyLong(), any(CheckpointOptions.class))).thenCallRealMethod(); doReturn(containingTask).when(operator).getContainingTask(); - operator.snapshotState(checkpointId, timestamp); + operator.snapshotState(checkpointId, timestamp, CheckpointOptions.forFullCheckpoint()); verify(context).close(); } @@ -524,14 +525,14 @@ public class AbstractStreamOperatorTest { when(containingTask.getCancelables()).thenReturn(closeableRegistry); AbstractStreamOperator<Void> operator = mock(AbstractStreamOperator.class); - when(operator.snapshotState(anyLong(), anyLong())).thenCallRealMethod(); + when(operator.snapshotState(anyLong(), anyLong(), any(CheckpointOptions.class))).thenCallRealMethod(); doReturn(containingTask).when(operator).getContainingTask(); // lets fail when calling the actual snapshotState method doThrow(failingException).when(operator).snapshotState(eq(context)); try { - operator.snapshotState(checkpointId, timestamp); + operator.snapshotState(checkpointId, timestamp, CheckpointOptions.forFullCheckpoint()); fail("Exception expected."); } catch (Exception e) { assertEquals(failingException, e.getCause()); @@ -571,23 +572,29 @@ public class AbstractStreamOperatorTest { when(containingTask.getCancelables()).thenReturn(closeableRegistry); AbstractStreamOperator<Void> operator = mock(AbstractStreamOperator.class); - when(operator.snapshotState(anyLong(), anyLong())).thenCallRealMethod(); + when(operator.snapshotState(anyLong(), anyLong(), any(CheckpointOptions.class))).thenCallRealMethod(); + + // The amount of mocking in this test makes it necessary to make the + // getCheckpointStreamFactory method visible for the test and to + // overwrite its behaviour. + when(operator.getCheckpointStreamFactory(any(CheckpointOptions.class))).thenReturn(streamFactory); + doReturn(containingTask).when(operator).getContainingTask(); RunnableFuture<OperatorStateHandle> futureManagedOperatorStateHandle = mock(RunnableFuture.class); OperatorStateBackend operatorStateBackend = mock(OperatorStateBackend.class); - when(operatorStateBackend.snapshot(eq(checkpointId), eq(timestamp), eq(streamFactory))).thenReturn(futureManagedOperatorStateHandle); + when(operatorStateBackend.snapshot(eq(checkpointId), eq(timestamp), eq(streamFactory), any(CheckpointOptions.class))).thenReturn(futureManagedOperatorStateHandle); AbstractKeyedStateBackend<?> keyedStateBackend = mock(AbstractKeyedStateBackend.class); - when(keyedStateBackend.snapshot(eq(checkpointId), eq(timestamp), eq(streamFactory))).thenThrow(failingException); + when(keyedStateBackend.snapshot(eq(checkpointId), eq(timestamp), eq(streamFactory), eq(CheckpointOptions.forFullCheckpoint()))).thenThrow(failingException); Whitebox.setInternalState(operator, "operatorStateBackend", operatorStateBackend); Whitebox.setInternalState(operator, "keyedStateBackend", keyedStateBackend); Whitebox.setInternalState(operator, "checkpointStreamFactory", streamFactory); try { - operator.snapshotState(checkpointId, timestamp); + operator.snapshotState(checkpointId, timestamp, CheckpointOptions.forFullCheckpoint()); fail("Exception expected."); } catch (Exception e) { assertEquals(failingException, e.getCause()); http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java index c4ddea8..d331171 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java @@ -23,6 +23,7 @@ import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.testutils.OneShotLatch; import org.apache.flink.runtime.checkpoint.CheckpointMetaData; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.state.StateInitializationContext; import org.apache.flink.runtime.state.StateSnapshotContext; @@ -90,8 +91,8 @@ public class AbstractUdfStreamOperatorLifecycleTest { "setup[class org.apache.flink.streaming.runtime.tasks.StreamTask, class " + "org.apache.flink.streaming.api.graph.StreamConfig, interface " + "org.apache.flink.streaming.api.operators.Output], " + - "snapshotLegacyOperatorState[long, long], " + - "snapshotState[long, long]]"; + "snapshotLegacyOperatorState[long, long, class org.apache.flink.runtime.checkpoint.CheckpointOptions], " + + "snapshotState[long, long, class org.apache.flink.runtime.checkpoint.CheckpointOptions]]"; private static final String ALL_METHODS_RICH_FUNCTION = "[close[], getIterationRuntimeContext[], getRuntimeContext[]" + ", open[class org.apache.flink.configuration.Configuration], setRuntimeContext[interface " + @@ -240,7 +241,8 @@ public class AbstractUdfStreamOperatorLifecycleTest { try { runStarted.await(); if (getContainingTask().isCanceled() || getContainingTask().triggerCheckpoint( - new CheckpointMetaData(0, System.currentTimeMillis()))) { + new CheckpointMetaData(0, System.currentTimeMillis()), + CheckpointOptions.forFullCheckpoint())) { LifecycleTrackingStreamSource.runFinish.trigger(); } } catch (Exception e) { @@ -260,9 +262,9 @@ public class AbstractUdfStreamOperatorLifecycleTest { } @Override - public StreamStateHandle snapshotLegacyOperatorState(long checkpointId, long timestamp) throws Exception { + public StreamStateHandle snapshotLegacyOperatorState(long checkpointId, long timestamp, CheckpointOptions checkpointOptions) throws Exception { ACTUAL_ORDER_TRACKING.add("OPERATOR::snapshotLegacyOperatorState"); - return super.snapshotLegacyOperatorState(checkpointId, timestamp); + return super.snapshotLegacyOperatorState(checkpointId, timestamp, checkpointOptions); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/WrappingFunctionSnapshotRestoreTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/WrappingFunctionSnapshotRestoreTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/WrappingFunctionSnapshotRestoreTest.java index b1689f9..ab4258f 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/WrappingFunctionSnapshotRestoreTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/WrappingFunctionSnapshotRestoreTest.java @@ -184,4 +184,4 @@ public class WrappingFunctionSnapshotRestoreTest { return value; } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java index 907f8f1..c4867ff 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java @@ -28,6 +28,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.core.testutils.OneShotLatch; import org.apache.flink.runtime.checkpoint.CheckpointMetaData; import org.apache.flink.runtime.checkpoint.CheckpointMetrics; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; import org.apache.flink.runtime.checkpoint.SubtaskState; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.io.network.api.CheckpointBarrier; @@ -522,7 +523,7 @@ public class AsyncWaitOperatorTest extends TestLogger { final CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, checkpointTimestamp); - task.triggerCheckpoint(checkpointMetaData); + task.triggerCheckpoint(checkpointMetaData, CheckpointOptions.forFullCheckpoint()); env.getCheckpointLatch().await(); @@ -557,7 +558,7 @@ public class AsyncWaitOperatorTest extends TestLogger { restoredTaskHarness.processElement(new StreamRecord<>(7, initialTime + 7)); // trigger the checkpoint while processing stream elements - restoredTask.triggerCheckpoint(new CheckpointMetaData(checkpointId, checkpointTimestamp)); + restoredTask.triggerCheckpoint(new CheckpointMetaData(checkpointId, checkpointTimestamp), CheckpointOptions.forFullCheckpoint()); restoredTaskHarness.processElement(new StreamRecord<>(8, initialTime + 8));
