[FLINK-5928] [checkpoints] Add CheckpointCoordinatorExternalizedCheckpointsTest

Problem: the only unit tests available covered the checkpoint instances
themselves; they did not test the behaviour of the checkpoint coordinator
with respect to externalized checkpoints.

This closes #3424


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c477d87c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c477d87c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c477d87c

Branch: refs/heads/master
Commit: c477d87c68f2da4340c8d469e1b4331e6a660ef0
Parents: 3446e66
Author: Ufuk Celebi <[email protected]>
Authored: Mon Feb 27 16:12:37 2017 +0100
Committer: Stephan Ewen <[email protected]>
Committed: Tue Feb 28 19:02:13 2017 +0100

----------------------------------------------------------------------
 .../runtime/checkpoint/PendingCheckpoint.java   |  24 ++-
 .../checkpoint/savepoint/SavepointStore.java    |  47 ++++-
 ...tCoordinatorExternalizedCheckpointsTest.java | 197 +++++++++++++++++++
 .../checkpoint/CheckpointCoordinatorTest.java   |   2 +-
 .../savepoint/SavepointStoreTest.java           |  23 ++-
 5 files changed, 282 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c477d87c/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
index 2c392b8..6c9dbaf 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
@@ -210,6 +210,7 @@ public class PendingCheckpoint {
        }
 
        public CompletedCheckpoint finalizeCheckpointExternalized() throws 
IOException {
+
                synchronized (lock) {
                        checkState(isFullyAcknowledged(), "Pending checkpoint 
has not been fully acknowledged yet.");
 
@@ -222,10 +223,27 @@ public class PendingCheckpoint {
                                //            but the checkpoints think more 
generic. we need to work with file handles
                                //            here until the savepoint 
serializer accepts a generic stream factory
 
-                               final FileStateHandle metadataHandle = 
SavepointStore.storeSavepointToHandle(targetDirectory, savepoint);
-                               final String externalPointer = 
metadataHandle.getFilePath().getParent().toString();
+                               // We have this branch here, because savepoints 
and externalized checkpoints
+                               // currently behave differently.
+                               // Savepoints:
+                               //   - Metadata file in unique directory
+                               //   - External pointer can be the directory
+                               // Externalized checkpoints:
+                               //   - Multiple metadata files per directory 
possible (need to be unique)
+                               //   - External pointer needs to be the file 
itself
+                               //
+                               // This should be unified as part of the 
JobManager metadata stream factories.
+                               if (props.isSavepoint()) {
+                                       final FileStateHandle metadataHandle = 
SavepointStore.storeSavepointToHandle(targetDirectory, savepoint);
+                                       final String externalPointer = 
metadataHandle.getFilePath().getParent().toString();
+       
+                                       return finalizeInternal(metadataHandle, 
externalPointer);
+                               } else {
+                                       final FileStateHandle metadataHandle = 
SavepointStore.storeExternalizedCheckpointToHandle(targetDirectory, savepoint);
+                                       final String externalPointer = 
metadataHandle.getFilePath().toString();
 
-                               return finalizeInternal(metadataHandle, 
externalPointer);
+                                       return finalizeInternal(metadataHandle, 
externalPointer);
+                               }
                        }
                        catch (Throwable t) {
                                onCompletionPromise.completeExceptionally(t);

http://git-wip-us.apache.org/repos/asf/flink/blob/c477d87c/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java
index 5c8ac6b..7beb1b8 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java
@@ -60,7 +60,13 @@ public class SavepointStore {
        /** Magic number for sanity checks against stored savepoints. */
        public static final int MAGIC_NUMBER = 0x4960672d;
 
-       private static final String META_DATA_FILE = "_metadata ";
+       private static final String SAVEPOINT_METADATA_FILE = "_metadata";
+
+       /**
+        * Metadata file name prefix for an externalized checkpoint. A random
+        * suffix is added during store, because the parent directory is not unique.
+        */
+       static final String EXTERNALIZED_CHECKPOINT_METADATA_FILE = 
"checkpoint_metadata-";
 
        /**
         * Creates a savepoint directory.
@@ -122,7 +128,8 @@ public class SavepointStore {
         */
        public static <T extends Savepoint> String storeSavepoint(String 
directory, T savepoint) throws IOException {
                // write and create the file handle
-               FileStateHandle metadataFileHandle = 
storeSavepointToHandle(directory, savepoint);
+               FileStateHandle metadataFileHandle = 
storeSavepointToHandle(directory,
+                       SAVEPOINT_METADATA_FILE, savepoint);
 
                // we return the savepoint directory path here!
                // The directory path also works to resume from and is more 
elegant than the direct
@@ -135,19 +142,47 @@ public class SavepointStore {
         *
         * @param directory Target directory to store savepoint in
         * @param savepoint Savepoint to be stored
-        *                     
+        *
+        * @return State handle to the checkpoint metadata
+        * @throws IOException Failures during store are forwarded
+        */
+       public static <T extends Savepoint> FileStateHandle 
storeSavepointToHandle(String directory, T savepoint) throws IOException {
+               return storeSavepointToHandle(directory, 
SAVEPOINT_METADATA_FILE, savepoint);
+       }
+
+       /**
+        * Stores the externalized checkpoint metadata file to a state handle.
+        *
+        * @param directory Target directory to store savepoint in
+        * @param savepoint Savepoint to be stored
+        *
+        * @return State handle to the checkpoint metadata
+        * @throws IOException Failures during store are forwarded
+        */
+       public static <T extends Savepoint> FileStateHandle 
storeExternalizedCheckpointToHandle(String directory, T savepoint) throws 
IOException {
+               String fileName = 
FileUtils.getRandomFilename(EXTERNALIZED_CHECKPOINT_METADATA_FILE);
+               return storeSavepointToHandle(directory, fileName, savepoint);
+       }
+
+       /**
+        * Stores the savepoint metadata file to a state handle.
+        *
+        * @param directory Target directory to store savepoint in
+        * @param savepoint Savepoint to be stored
+        *
         * @return State handle to the checkpoint metadata
         * @throws IOException Failures during store are forwarded
         */
-       public static <T extends Savepoint> FileStateHandle 
storeSavepointToHandle(
+       static <T extends Savepoint> FileStateHandle storeSavepointToHandle(
                        String directory,
+                       String filename,
                        T savepoint) throws IOException {
 
                checkNotNull(directory, "Target directory");
                checkNotNull(savepoint, "Savepoint");
 
                final Path basePath = new Path(directory);
-               final Path metadataFilePath = new Path(basePath, 
META_DATA_FILE);
+               final Path metadataFilePath = new Path(basePath, filename);
 
                final FileSystem fs = FileSystem.get(basePath.toUri());
 
@@ -219,7 +254,7 @@ public class SavepointStore {
 
                // If this is a directory, we need to find the meta data file
                if (status.isDir()) {
-                       Path candidatePath = new Path(path, META_DATA_FILE);
+                       Path candidatePath = new Path(path, 
SAVEPOINT_METADATA_FILE);
                        if (fs.exists(candidatePath)) {
                                path = candidatePath;
                                LOG.info("Using savepoint file in {}", path);

http://git-wip-us.apache.org/repos/asf/flink/blob/c477d87c/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorExternalizedCheckpointsTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorExternalizedCheckpointsTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorExternalizedCheckpointsTest.java
new file mode 100644
index 0000000..9f94f2f
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorExternalizedCheckpointsTest.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.checkpoint.savepoint.SavepointLoader;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
+import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
+import org.apache.flink.runtime.state.filesystem.FileStateHandle;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+/**
+ * CheckpointCoordinator tests for externalized checkpoints.
+ *
+ * <p>This is separate from {@link CheckpointCoordinatorTest}, because that
+ * test is already huge and covers many different configurations.
+ */
+public class CheckpointCoordinatorExternalizedCheckpointsTest {
+
+       @Rule
+       public TemporaryFolder tmp = new TemporaryFolder();
+
+       /**
+        * Triggers multiple externalized checkpoints and verifies that the 
metadata
+        * files have been created.
+        */
+       @Test
+       public void testTriggerAndConfirmSimpleExternalizedCheckpoint()
+               throws Exception {
+               final JobID jid = new JobID();
+
+               final ExternalizedCheckpointSettings 
externalizedCheckpointSettings =
+                       
ExternalizedCheckpointSettings.externalizeCheckpoints(false);
+
+               final File checkpointDir = tmp.newFolder();
+
+               // create some mock Execution vertices that receive the 
checkpoint trigger messages
+               final ExecutionAttemptID attemptID1 = new ExecutionAttemptID();
+               final ExecutionAttemptID attemptID2 = new ExecutionAttemptID();
+               ExecutionVertex vertex1 = 
CheckpointCoordinatorTest.mockExecutionVertex(attemptID1);
+               ExecutionVertex vertex2 = 
CheckpointCoordinatorTest.mockExecutionVertex(attemptID2);
+
+               Map<JobVertexID, ExecutionJobVertex> jobVertices = new 
HashMap<>();
+               jobVertices.put(vertex1.getJobvertexId(), 
vertex1.getJobVertex());
+               jobVertices.put(vertex2.getJobvertexId(), 
vertex2.getJobVertex());
+
+               // set up the coordinator and validate the initial state
+               CheckpointCoordinator coord = new CheckpointCoordinator(
+                       jid,
+                       600000,
+                       600000,
+                       0,
+                       Integer.MAX_VALUE,
+                       externalizedCheckpointSettings,
+                       new ExecutionVertex[] { vertex1, vertex2 },
+                       new ExecutionVertex[] { vertex1, vertex2 },
+                       new ExecutionVertex[] { vertex1, vertex2 },
+                       new StandaloneCheckpointIDCounter(),
+                       new StandaloneCompletedCheckpointStore(1),
+                       checkpointDir.getAbsolutePath(),
+                       Executors.directExecutor());
+
+               assertEquals(0, coord.getNumberOfPendingCheckpoints());
+               assertEquals(0, 
coord.getNumberOfRetainedSuccessfulCheckpoints());
+
+               // ---------------
+               // trigger checkpoint 1
+               // ---------------
+
+               {
+                       final long timestamp1 = System.currentTimeMillis();
+
+                       coord.triggerCheckpoint(timestamp1, false);
+
+                       long checkpointId1 = 
coord.getPendingCheckpoints().entrySet().iterator().next()
+                               .getKey();
+
+                       coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jid, attemptID1, checkpointId1));
+                       coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jid, attemptID2, checkpointId1));
+
+                       CompletedCheckpoint latest = 
coord.getCheckpointStore().getLatestCheckpoint();
+
+                       verifyExternalizedCheckpoint(latest, jid, 
checkpointId1, timestamp1);
+                       verifyExternalizedCheckpointRestore(latest, 
jobVertices, vertex1, vertex2);
+               }
+
+               // ---------------
+               // trigger checkpoint 2
+               // ---------------
+
+               {
+                       final long timestamp2 = System.currentTimeMillis() + 7;
+                       coord.triggerCheckpoint(timestamp2, false);
+
+                       long checkpointId2 = 
coord.getPendingCheckpoints().entrySet().iterator().next().getKey();
+
+                       coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jid, attemptID1, checkpointId2));
+                       coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jid, attemptID2, checkpointId2));
+
+                       CompletedCheckpoint latest = 
coord.getCheckpointStore().getLatestCheckpoint();
+                       verifyExternalizedCheckpoint(latest, jid, 
checkpointId2, timestamp2);
+                       verifyExternalizedCheckpointRestore(latest, 
jobVertices, vertex1, vertex2);
+               }
+
+               // ---------------
+               // trigger checkpoint 3
+               // ---------------
+
+               {
+                       final long timestamp3 = System.currentTimeMillis() + 
146;
+                       coord.triggerCheckpoint(timestamp3, false);
+
+                       long checkpointId3 = 
coord.getPendingCheckpoints().entrySet().iterator().next().getKey();
+
+                       coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jid, attemptID1, checkpointId3));
+                       coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jid, attemptID2, checkpointId3));
+
+                       CompletedCheckpoint latest = 
coord.getCheckpointStore().getLatestCheckpoint();
+                       verifyExternalizedCheckpoint(latest, jid, 
checkpointId3, timestamp3);
+                       verifyExternalizedCheckpointRestore(latest, 
jobVertices, vertex1, vertex2);
+               }
+
+               coord.shutdown(JobStatus.FINISHED);
+       }
+
+       /**
+        * Verifies an externalized completed checkpoint instance.
+        *
+        * <p>The provided JobID, checkpoint ID, and timestamp need to match. 
Also, the
+        * external pointer and external metadata need to be non-null and exist 
(currently
+        * assuming that they are file system based).
+        *
+        * @param checkpoint Completed checkpoint to check.
+        * @param jid JobID of the job the checkpoint belongs to.
+        * @param checkpointId Checkpoint ID of the checkpoint to check.
+        * @param timestamp Timestamp of the checkpoint to check.
+        */
+       private static void verifyExternalizedCheckpoint(CompletedCheckpoint 
checkpoint, JobID jid, long checkpointId, long timestamp) {
+               assertEquals(jid, checkpoint.getJobId());
+               assertEquals(checkpointId, checkpoint.getCheckpointID());
+               assertEquals(timestamp, checkpoint.getTimestamp());
+               assertNotNull(checkpoint.getExternalPointer());
+               assertNotNull(checkpoint.getExternalizedMetadata());
+               FileStateHandle fsHandle = (FileStateHandle) 
checkpoint.getExternalizedMetadata();
+               assertTrue(new File(fsHandle.getFilePath().getPath()).exists());
+       }
+
+       private static void verifyExternalizedCheckpointRestore(
+                       CompletedCheckpoint checkpoint,
+                       Map<JobVertexID, ExecutionJobVertex> jobVertices,
+                       ExecutionVertex... vertices) throws IOException {
+
+               CompletedCheckpoint loaded = 
SavepointLoader.loadAndValidateSavepoint(
+                               checkpoint.getJobId(),
+                               jobVertices,
+                               checkpoint.getExternalPointer(),
+                               Thread.currentThread().getContextClassLoader(),
+                               false);
+
+               for (ExecutionVertex vertex : vertices) {
+                       
assertEquals(checkpoint.getTaskState(vertex.getJobvertexId()), 
loaded.getTaskState(vertex.getJobvertexId()));
+               }
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c477d87c/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index d8e46fa..1691370 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -2586,7 +2586,7 @@ public class CheckpointCoordinatorTest {
                return ChainedStateHandle.wrapSingleHandle(operatorStateHandle);
        }
 
-       private static ExecutionJobVertex mockExecutionJobVertex(
+       static ExecutionJobVertex mockExecutionJobVertex(
                JobVertexID jobVertexID,
                int parallelism,
                int maxParallelism) {

http://git-wip-us.apache.org/repos/asf/flink/blob/c477d87c/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStoreTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStoreTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStoreTest.java
index dc19e47..1eb8055 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStoreTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStoreTest.java
@@ -19,12 +19,12 @@
 package org.apache.flink.runtime.checkpoint.savepoint;
 
 import java.io.File;
-import java.util.Arrays;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.checkpoint.TaskState;
+import org.apache.flink.runtime.state.filesystem.FileStateHandle;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
@@ -189,6 +189,27 @@ public class SavepointStoreTest {
                assertEquals("Savepoint file not cleaned up on failure", 0, 
tmp.getRoot().listFiles().length);
        }
 
+       /**
+        * Tests that multiple externalized checkpoints can be stored to the 
same
+        * directory and that each metadata file actually exists afterwards.
+        */
+       @Test
+       public void testStoreExternalizedCheckpointsToSameDirectory() throws 
Exception {
+               String root = tmp.newFolder().getAbsolutePath();
+               FileSystem fs = FileSystem.get(new Path(root).toUri());
+
+               // Store twice into the same directory; both files must exist
+               SavepointV1 savepoint = new SavepointV1(1929292, 
SavepointV1Test.createTaskStates(4, 24));
+
+               FileStateHandle store1 = 
SavepointStore.storeExternalizedCheckpointToHandle(root, savepoint);
+               assertTrue(fs.exists(store1.getFilePath()));
+               
assertTrue(store1.getFilePath().getPath().contains(SavepointStore.EXTERNALIZED_CHECKPOINT_METADATA_FILE));
+
+               FileStateHandle store2 = 
SavepointStore.storeExternalizedCheckpointToHandle(root, savepoint);
+               assertTrue(fs.exists(store2.getFilePath()));
+               
assertTrue(store2.getFilePath().getPath().contains(SavepointStore.EXTERNALIZED_CHECKPOINT_METADATA_FILE));
+       }
+
        private static class NewSavepointSerializer implements 
SavepointSerializer<TestSavepoint> {
 
                private static final NewSavepointSerializer INSTANCE = new 
NewSavepointSerializer();

Reply via email to