[FLINK-5928] [checkpoints] Add CheckpointCoordinatorExternalizedCheckpointsTest
Problem: there were only unit tests for the checkpoint instances available that don't test the behaviour of the checkpoint coordinator with respect to externalized checkpoints. This closes #3424 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c477d87c Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c477d87c Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c477d87c Branch: refs/heads/master Commit: c477d87c68f2da4340c8d469e1b4331e6a660ef0 Parents: 3446e66 Author: Ufuk Celebi <[email protected]> Authored: Mon Feb 27 16:12:37 2017 +0100 Committer: Stephan Ewen <[email protected]> Committed: Tue Feb 28 19:02:13 2017 +0100 ---------------------------------------------------------------------- .../runtime/checkpoint/PendingCheckpoint.java | 24 ++- .../checkpoint/savepoint/SavepointStore.java | 47 ++++- ...tCoordinatorExternalizedCheckpointsTest.java | 197 +++++++++++++++++++ .../checkpoint/CheckpointCoordinatorTest.java | 2 +- .../savepoint/SavepointStoreTest.java | 23 ++- 5 files changed, 282 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/c477d87c/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java index 2c392b8..6c9dbaf 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java @@ -210,6 +210,7 @@ public class PendingCheckpoint { } public CompletedCheckpoint finalizeCheckpointExternalized() throws IOException { + synchronized (lock) { 
checkState(isFullyAcknowledged(), "Pending checkpoint has not been fully acknowledged yet."); @@ -222,10 +223,27 @@ public class PendingCheckpoint { // but the checkpoints think more generic. we need to work with file handles // here until the savepoint serializer accepts a generic stream factory - final FileStateHandle metadataHandle = SavepointStore.storeSavepointToHandle(targetDirectory, savepoint); - final String externalPointer = metadataHandle.getFilePath().getParent().toString(); + // We have this branch here, because savepoints and externalized checkpoints + // currently behave differently. + // Savepoints: + // - Metadata file in unique directory + // - External pointer can be the directory + // Externalized checkpoints: + // - Multiple metadata files per directory possible (need to be unique) + // - External pointer needs to be the file itself + // + // This should be unified as part of the JobManager metadata stream factories. + if (props.isSavepoint()) { + final FileStateHandle metadataHandle = SavepointStore.storeSavepointToHandle(targetDirectory, savepoint); + final String externalPointer = metadataHandle.getFilePath().getParent().toString(); + + return finalizeInternal(metadataHandle, externalPointer); + } else { + final FileStateHandle metadataHandle = SavepointStore.storeExternalizedCheckpointToHandle(targetDirectory, savepoint); + final String externalPointer = metadataHandle.getFilePath().toString(); - return finalizeInternal(metadataHandle, externalPointer); + return finalizeInternal(metadataHandle, externalPointer); + } } catch (Throwable t) { onCompletionPromise.completeExceptionally(t); http://git-wip-us.apache.org/repos/asf/flink/blob/c477d87c/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java index 5c8ac6b..7beb1b8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java @@ -60,7 +60,13 @@ public class SavepointStore { /** Magic number for sanity checks against stored savepoints. */ public static final int MAGIC_NUMBER = 0x4960672d; - private static final String META_DATA_FILE = "_metadata "; + private static final String SAVEPOINT_METADATA_FILE = "_metadata"; + + /** + * Metadata file for an externalized checkpoint, random suffix added + * during store, because the parent directory is not unique. + */ + static final String EXTERNALIZED_CHECKPOINT_METADATA_FILE = "checkpoint_metadata-"; /** * Creates a savepoint directory. @@ -122,7 +128,8 @@ public class SavepointStore { */ public static <T extends Savepoint> String storeSavepoint(String directory, T savepoint) throws IOException { // write and create the file handle - FileStateHandle metadataFileHandle = storeSavepointToHandle(directory, savepoint); + FileStateHandle metadataFileHandle = storeSavepointToHandle(directory, + SAVEPOINT_METADATA_FILE, savepoint); // we return the savepoint directory path here! // The directory path also works to resume from and is more elegant than the direct @@ -135,19 +142,47 @@ public class SavepointStore { * * @param directory Target directory to store savepoint in * @param savepoint Savepoint to be stored - * + * + * @return State handle to the checkpoint metadata + * @throws IOException Failures during store are forwarded + */ + public static <T extends Savepoint> FileStateHandle storeSavepointToHandle(String directory, T savepoint) throws IOException { + return storeSavepointToHandle(directory, SAVEPOINT_METADATA_FILE, savepoint); + } + + /** + * Stores the externalized checkpoint metadata file to a state handle. 
+ * + * @param directory Target directory to store the checkpoint metadata file in + * @param savepoint Checkpoint metadata (in savepoint format) to be stored + * + * @return State handle to the checkpoint metadata + * @throws IOException Failures during store are forwarded + */ + public static <T extends Savepoint> FileStateHandle storeExternalizedCheckpointToHandle(String directory, T savepoint) throws IOException { + String fileName = FileUtils.getRandomFilename(EXTERNALIZED_CHECKPOINT_METADATA_FILE); + return storeSavepointToHandle(directory, fileName, savepoint); + } + + /** + * Stores the savepoint metadata under the given file name to a state handle. + * + * @param directory Target directory to store the metadata file in + * @param filename Name of the metadata file to create inside the directory + * @param savepoint Savepoint to be stored * @return State handle to the checkpoint metadata * @throws IOException Failures during store are forwarded */ - public static <T extends Savepoint> FileStateHandle storeSavepointToHandle( + static <T extends Savepoint> FileStateHandle storeSavepointToHandle( String directory, + String filename, T savepoint) throws IOException { checkNotNull(directory, "Target directory"); checkNotNull(savepoint, "Savepoint"); final Path basePath = new Path(directory); - final Path metadataFilePath = new Path(basePath, META_DATA_FILE); + final Path metadataFilePath = new Path(basePath, filename); final FileSystem fs = FileSystem.get(basePath.toUri()); @@ -219,7 +254,7 @@ public class SavepointStore { // If this is a directory, we need to find the meta data file if (status.isDir()) { - Path candidatePath = new Path(path, META_DATA_FILE); + Path candidatePath = new Path(path, SAVEPOINT_METADATA_FILE); if (fs.exists(candidatePath)) { path = candidatePath; LOG.info("Using savepoint file in {}", path); http://git-wip-us.apache.org/repos/asf/flink/blob/c477d87c/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorExternalizedCheckpointsTest.java ---------------------------------------------------------------------- diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorExternalizedCheckpointsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorExternalizedCheckpointsTest.java new file mode 100644 index 0000000..9f94f2f --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorExternalizedCheckpointsTest.java @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.checkpoint; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import java.io.File; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.checkpoint.savepoint.SavepointLoader; +import org.apache.flink.runtime.concurrent.Executors; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; +import org.apache.flink.runtime.executiongraph.ExecutionVertex; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings; +import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint; +import org.apache.flink.runtime.state.filesystem.FileStateHandle; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +/** + * CheckpointCoordinator tests for externalized checkpoints. + * + * <p>This is separate from {@link CheckpointCoordinatorTest}, because that + * test is already huge and covers many different configurations. + */ +public class CheckpointCoordinatorExternalizedCheckpointsTest { + + @Rule + public TemporaryFolder tmp = new TemporaryFolder(); + + /** + * Triggers multiple externalized checkpoints and verifies that the metadata + * files have been created. 
+ */ + @Test + public void testTriggerAndConfirmSimpleExternalizedCheckpoint() + throws Exception { + final JobID jid = new JobID(); + + final ExternalizedCheckpointSettings externalizedCheckpointSettings = + ExternalizedCheckpointSettings.externalizeCheckpoints(false); + + final File checkpointDir = tmp.newFolder(); + + // create some mock Execution vertices that receive the checkpoint trigger messages + final ExecutionAttemptID attemptID1 = new ExecutionAttemptID(); + final ExecutionAttemptID attemptID2 = new ExecutionAttemptID(); + ExecutionVertex vertex1 = CheckpointCoordinatorTest.mockExecutionVertex(attemptID1); + ExecutionVertex vertex2 = CheckpointCoordinatorTest.mockExecutionVertex(attemptID2); + + Map<JobVertexID, ExecutionJobVertex> jobVertices = new HashMap<>(); + jobVertices.put(vertex1.getJobvertexId(), vertex1.getJobVertex()); + jobVertices.put(vertex2.getJobvertexId(), vertex2.getJobVertex()); + + // set up the coordinator and validate the initial state + CheckpointCoordinator coord = new CheckpointCoordinator( + jid, + 600000, + 600000, + 0, + Integer.MAX_VALUE, + externalizedCheckpointSettings, + new ExecutionVertex[] { vertex1, vertex2 }, + new ExecutionVertex[] { vertex1, vertex2 }, + new ExecutionVertex[] { vertex1, vertex2 }, + new StandaloneCheckpointIDCounter(), + new StandaloneCompletedCheckpointStore(1), + checkpointDir.getAbsolutePath(), + Executors.directExecutor()); + + assertEquals(0, coord.getNumberOfPendingCheckpoints()); + assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints()); + + // --------------- + // trigger checkpoint 1 + // --------------- + + { + final long timestamp1 = System.currentTimeMillis(); + + coord.triggerCheckpoint(timestamp1, false); + + long checkpointId1 = coord.getPendingCheckpoints().entrySet().iterator().next() + .getKey(); + + coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, checkpointId1)); + coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, 
checkpointId1)); + + CompletedCheckpoint latest = coord.getCheckpointStore().getLatestCheckpoint(); + + verifyExternalizedCheckpoint(latest, jid, checkpointId1, timestamp1); + verifyExternalizedCheckpointRestore(latest, jobVertices, vertex1, vertex2); + } + + // --------------- + // trigger checkpoint 2 + // --------------- + + { + final long timestamp2 = System.currentTimeMillis() + 7; + coord.triggerCheckpoint(timestamp2, false); + + long checkpointId2 = coord.getPendingCheckpoints().entrySet().iterator().next().getKey(); + + coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, checkpointId2)); + coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointId2)); + + CompletedCheckpoint latest = coord.getCheckpointStore().getLatestCheckpoint(); + verifyExternalizedCheckpoint(latest, jid, checkpointId2, timestamp2); + verifyExternalizedCheckpointRestore(latest, jobVertices, vertex1, vertex2); + } + + // --------------- + // trigger checkpoint 3 + // --------------- + + { + final long timestamp3 = System.currentTimeMillis() + 146; + coord.triggerCheckpoint(timestamp3, false); + + long checkpointId3 = coord.getPendingCheckpoints().entrySet().iterator().next().getKey(); + + coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, checkpointId3)); + coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointId3)); + + CompletedCheckpoint latest = coord.getCheckpointStore().getLatestCheckpoint(); + verifyExternalizedCheckpoint(latest, jid, checkpointId3, timestamp3); + verifyExternalizedCheckpointRestore(latest, jobVertices, vertex1, vertex2); + } + + coord.shutdown(JobStatus.FINISHED); + } + + /** + * Verifies an externalized completed checkpoint instance. + * + * <p>The provided JobID, checkpoint ID, timestamp need to match. Also, the + * external pointer and external metadata need to be notNull and exist (currently + * assuming that they are file system based). 
+ * + * @param checkpoint Completed checkpoint to check. + * @param jid JobID of the job the checkpoint belongs to. + * @param checkpointId Checkpoint ID of the checkpoint to check. + * @param timestamp Timestamp of the checkpoint to check. + */ + private static void verifyExternalizedCheckpoint(CompletedCheckpoint checkpoint, JobID jid, long checkpointId, long timestamp) { + assertEquals(jid, checkpoint.getJobId()); + assertEquals(checkpointId, checkpoint.getCheckpointID()); + assertEquals(timestamp, checkpoint.getTimestamp()); + assertNotNull(checkpoint.getExternalPointer()); + assertNotNull(checkpoint.getExternalizedMetadata()); + FileStateHandle fsHandle = (FileStateHandle) checkpoint.getExternalizedMetadata(); + assertTrue(new File(fsHandle.getFilePath().getPath()).exists()); + } + + private static void verifyExternalizedCheckpointRestore( + CompletedCheckpoint checkpoint, + Map<JobVertexID, ExecutionJobVertex> jobVertices, + ExecutionVertex... vertices) throws IOException { + + CompletedCheckpoint loaded = SavepointLoader.loadAndValidateSavepoint( + checkpoint.getJobId(), + jobVertices, + checkpoint.getExternalPointer(), + Thread.currentThread().getContextClassLoader(), + false); + + for (ExecutionVertex vertex : vertices) { + assertEquals(checkpoint.getTaskState(vertex.getJobvertexId()), loaded.getTaskState(vertex.getJobvertexId())); + } + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/c477d87c/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java index d8e46fa..1691370 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java +++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java @@ -2586,7 +2586,7 @@ public class CheckpointCoordinatorTest { return ChainedStateHandle.wrapSingleHandle(operatorStateHandle); } - private static ExecutionJobVertex mockExecutionJobVertex( + static ExecutionJobVertex mockExecutionJobVertex( JobVertexID jobVertexID, int parallelism, int maxParallelism) { http://git-wip-us.apache.org/repos/asf/flink/blob/c477d87c/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStoreTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStoreTest.java index dc19e47..1eb8055 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStoreTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStoreTest.java @@ -19,12 +19,12 @@ package org.apache.flink.runtime.checkpoint.savepoint; import java.io.File; -import java.util.Arrays; import org.apache.flink.api.common.JobID; import org.apache.flink.core.fs.FSDataOutputStream; import org.apache.flink.core.fs.FileSystem; import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.checkpoint.TaskState; +import org.apache.flink.runtime.state.filesystem.FileStateHandle; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; @@ -189,6 +189,27 @@ public class SavepointStoreTest { assertEquals("Savepoint file not cleaned up on failure", 0, tmp.getRoot().listFiles().length); } + /** + * Tests that multiple externalized checkpoints can be stored to the same + * directory. 
+ */ + @Test + public void testStoreExternalizedCheckpointsToSameDirectory() throws Exception { + String root = tmp.newFolder().getAbsolutePath(); + FileSystem fs = FileSystem.get(new Path(root).toUri()); + + // Store + SavepointV1 savepoint = new SavepointV1(1929292, SavepointV1Test.createTaskStates(4, 24)); + + FileStateHandle store1 = SavepointStore.storeExternalizedCheckpointToHandle(root, savepoint); + assertTrue(fs.exists(store1.getFilePath())); + assertTrue(store1.getFilePath().getPath().contains(SavepointStore.EXTERNALIZED_CHECKPOINT_METADATA_FILE)); + + FileStateHandle store2 = SavepointStore.storeExternalizedCheckpointToHandle(root, savepoint); + assertTrue(fs.exists(store2.getFilePath()) && !store2.getFilePath().equals(store1.getFilePath())); + assertTrue(store2.getFilePath().getPath().contains(SavepointStore.EXTERNALIZED_CHECKPOINT_METADATA_FILE)); + } + private static class NewSavepointSerializer implements SavepointSerializer<TestSavepoint> { private static final NewSavepointSerializer INSTANCE = new NewSavepointSerializer();
