This is an automated email from the ASF dual-hosted git repository.

gary pushed a commit to branch release-1.7 in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.7 by this push: new 4597daf [FLINK-10764][tests] Add ITCase for checkpoint path entropy injection. (#7075) 4597daf is described below commit 4597daf881f24547f3866db7a9a20a606065115b Author: Gary Yao <g...@data-artisans.com> AuthorDate: Thu Nov 15 11:59:24 2018 +0100 [FLINK-10764][tests] Add ITCase for checkpoint path entropy injection. (#7075) Add a test that verifies that checkpoint data on the file system has additional entropy added to its path. Remove code duplication in SavepointITCase. --- .../testutils/EntropyInjectingTestFileSystem.java | 65 +++++++ .../org.apache.flink.core.fs.FileSystemFactory | 3 +- .../flink/test/checkpointing/SavepointITCase.java | 200 ++++++++++++++------- 3 files changed, 205 insertions(+), 63 deletions(-) diff --git a/flink-core/src/test/java/org/apache/flink/testutils/EntropyInjectingTestFileSystem.java b/flink-core/src/test/java/org/apache/flink/testutils/EntropyInjectingTestFileSystem.java new file mode 100644 index 0000000..ef1575b --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/testutils/EntropyInjectingTestFileSystem.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.flink.testutils; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.EntropyInjectingFileSystem; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.FileSystemFactory; +import org.apache.flink.core.fs.local.LocalFileSystem; + +import java.net.URI; + +/** + * A test file system that implements {@link EntropyInjectingFileSystem}. + */ +public class EntropyInjectingTestFileSystem extends LocalFileSystem implements EntropyInjectingFileSystem { + + public static final String ENTROPY_INJECTION_KEY = "_entropy_"; + + public static final String ENTROPY = "_resolved_"; + + @Override + public String getEntropyInjectionKey() { + return ENTROPY_INJECTION_KEY; + } + + @Override + public String generateEntropy() { + return ENTROPY; + } + + public static class EntropyInjectingTestFileSystemFactory implements FileSystemFactory { + + @Override + public String getScheme() { + return "test-entropy"; + } + + @Override + public void configure(final Configuration config) { + } + + @Override + public FileSystem create(final URI fsUri) { + return new EntropyInjectingTestFileSystem(); + } + } +} diff --git a/flink-core/src/test/resources/META-INF/services/org.apache.flink.core.fs.FileSystemFactory b/flink-core/src/test/resources/META-INF/services/org.apache.flink.core.fs.FileSystemFactory index 5a3a31d..76965cb 100644 --- a/flink-core/src/test/resources/META-INF/services/org.apache.flink.core.fs.FileSystemFactory +++ b/flink-core/src/test/resources/META-INF/services/org.apache.flink.core.fs.FileSystemFactory @@ -13,4 +13,5 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-org.apache.flink.testutils.TestFileSystem$TestFileSystemFactory \ No newline at end of file +org.apache.flink.testutils.TestFileSystem$TestFileSystemFactory +org.apache.flink.testutils.EntropyInjectingTestFileSystem$EntropyInjectingTestFileSystemFactory diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java index 4f60769..9140570 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java @@ -53,11 +53,16 @@ import org.apache.flink.streaming.api.functions.source.RichSourceFunction; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.graph.StreamGraph; import org.apache.flink.test.util.MiniClusterWithClientResource; +import org.apache.flink.testutils.EntropyInjectingTestFileSystem; import org.apache.flink.util.Collector; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.TestLogger; +import org.hamcrest.Description; +import org.hamcrest.Matcher; +import org.hamcrest.TypeSafeDiagnosingMatcher; import org.junit.Assert; +import org.junit.Before; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; @@ -66,7 +71,12 @@ import org.slf4j.LoggerFactory; import java.io.File; import java.io.FileNotFoundException; +import java.io.IOException; import java.net.URI; +import java.net.URISyntaxException; +import java.nio.file.FileVisitOption; +import java.nio.file.Files; +import java.nio.file.Path; import java.time.Duration; import java.util.Arrays; import java.util.Collections; @@ -76,9 +86,12 @@ import java.util.Random; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.Stream; import 
static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -93,6 +106,22 @@ public class SavepointITCase extends TestLogger { @Rule public final TemporaryFolder folder = new TemporaryFolder(); + private File checkpointDir; + + private File savepointDir; + + @Before + public void setUp() throws Exception { + final File testRoot = folder.newFolder(); + + checkpointDir = new File(testRoot, "checkpoints"); + savepointDir = new File(testRoot, "savepoints"); + + if (!checkpointDir.mkdir() || !savepointDir.mkdirs()) { + fail("Test setup failed: failed to create temporary directories."); + } + } + /** * Triggers a savepoint for a job that uses the FsStateBackend. We expect * that all checkpoint files are written to a new savepoint directory. @@ -109,35 +138,46 @@ public class SavepointITCase extends TestLogger { */ @Test public void testTriggerSavepointAndResumeWithFileBasedCheckpoints() throws Exception { - // Config final int numTaskManagers = 2; final int numSlotsPerTaskManager = 2; final int parallelism = numTaskManagers * numSlotsPerTaskManager; - final File testRoot = folder.newFolder(); - Configuration config = new Configuration(); + final MiniClusterResourceFactory clusterFactory = new MiniClusterResourceFactory( + numTaskManagers, + numSlotsPerTaskManager, + getFileBasedCheckpointsConfig()); - final File checkpointDir = new File(testRoot, "checkpoints"); - final File savepointRootDir = new File(testRoot, "savepoints"); + final String savepointPath = submitJobAndTakeSavepoint(clusterFactory, parallelism); + verifySavepoint(parallelism, savepointPath); - if (!checkpointDir.mkdir() || !savepointRootDir.mkdirs()) { - fail("Test setup failed: failed to create temporary directories."); - } + restoreJobAndVerifyState(savepointPath, clusterFactory, parallelism); + } - // Use file based checkpoints - 
config.setString(CheckpointingOptions.STATE_BACKEND, "filesystem"); - config.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir.toURI().toString()); - config.setInteger(CheckpointingOptions.FS_SMALL_FILE_THRESHOLD, 0); - config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointRootDir.toURI().toString()); + @Test + public void testShouldAddEntropyToSavepointPath() throws Exception { + final int numTaskManagers = 2; + final int numSlotsPerTaskManager = 2; + final int parallelism = numTaskManagers * numSlotsPerTaskManager; - MiniClusterResourceFactory clusterFactory = new MiniClusterResourceFactory(numTaskManagers, numSlotsPerTaskManager, config); + final MiniClusterResourceFactory clusterFactory = new MiniClusterResourceFactory( + numTaskManagers, + numSlotsPerTaskManager, + getCheckpointingWithEntropyConfig()); - String savepointPath = submitJobAndGetVerifiedSavepoint(clusterFactory, parallelism); + final String savepointPath = submitJobAndTakeSavepoint(clusterFactory, parallelism); + assertThat(savepointDir, hasEntropyInFileStateHandlePaths()); restoreJobAndVerifyState(savepointPath, clusterFactory, parallelism); } - private String submitJobAndGetVerifiedSavepoint(MiniClusterResourceFactory clusterFactory, int parallelism) throws Exception { + private Configuration getCheckpointingWithEntropyConfig() { + final String savepointPathWithEntropyPlaceholder = new File(savepointDir, EntropyInjectingTestFileSystem.ENTROPY_INJECTION_KEY).getPath(); + final Configuration config = getFileBasedCheckpointsConfig("test-entropy://" + savepointPathWithEntropyPlaceholder); + config.setString("s3.entropy.key", EntropyInjectingTestFileSystem.ENTROPY_INJECTION_KEY); + return config; + } + + private String submitJobAndTakeSavepoint(MiniClusterResourceFactory clusterFactory, int parallelism) throws Exception { final JobGraph jobGraph = createJobGraph(parallelism, 0, 1000); final JobID jobId = jobGraph.getJobID(); StatefulCounter.resetForTest(parallelism); 
@@ -152,32 +192,32 @@ public class SavepointITCase extends TestLogger { StatefulCounter.getProgressLatch().await(); - String savepointPath = client.triggerSavepoint(jobId, null).get(); - - // Only one savepoint should exist - File savepointDir = new File(new URI(savepointPath)); - assertTrue("Savepoint directory does not exist.", savepointDir.exists()); - assertTrue("Savepoint did not create self-contained directory.", savepointDir.isDirectory()); - - File[] savepointFiles = savepointDir.listFiles(); - - if (savepointFiles != null) { - // Expect one metadata file and one checkpoint file per stateful - // parallel subtask - String errMsg = "Did not write expected number of savepoint/checkpoint files to directory: " - + Arrays.toString(savepointFiles); - assertEquals(errMsg, 1 + parallelism, savepointFiles.length); - } else { - fail(String.format("Returned savepoint path (%s) is not valid.", savepointPath)); - } - - return savepointPath; + return client.triggerSavepoint(jobId, null).get(); } finally { cluster.after(); StatefulCounter.resetForTest(parallelism); } } + private void verifySavepoint(final int parallelism, final String savepointPath) throws URISyntaxException { + // Only one savepoint should exist + File savepointDir = new File(new URI(savepointPath)); + assertTrue("Savepoint directory does not exist.", savepointDir.exists()); + assertTrue("Savepoint did not create self-contained directory.", savepointDir.isDirectory()); + + File[] savepointFiles = savepointDir.listFiles(); + + if (savepointFiles != null) { + // Expect one metadata file and one checkpoint file per stateful + // parallel subtask + String errMsg = "Did not write expected number of savepoint/checkpoint files to directory: " + + Arrays.toString(savepointFiles); + assertEquals(errMsg, 1 + parallelism, savepointFiles.length); + } else { + fail(String.format("Returned savepoint path (%s) is not valid.", savepointPath)); + } + } + private void restoreJobAndVerifyState(String savepointPath, 
MiniClusterResourceFactory clusterFactory, int parallelism) throws Exception { final JobGraph jobGraph = createJobGraph(parallelism, 0, 1000); jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath)); @@ -224,9 +264,6 @@ public class SavepointITCase extends TestLogger { final int numTaskManagers = 1; final int numSlotsPerTaskManager = 1; - final File tmpDir = folder.newFolder(); - final File savepointDir = new File(tmpDir, "savepoints"); - final Configuration config = new Configuration(); config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir.toURI().toString()); @@ -299,9 +336,6 @@ public class SavepointITCase extends TestLogger { int numSlotsPerTaskManager = 1; int parallelism = numTaskManagers * numSlotsPerTaskManager; - final File tmpDir = folder.newFolder(); - final File savepointDir = new File(tmpDir, "savepoints"); - final Configuration config = new Configuration(); config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir.toURI().toString()); @@ -361,9 +395,6 @@ public class SavepointITCase extends TestLogger { // Test deadline final Deadline deadline = Deadline.now().plus(Duration.ofMinutes(5)); - final File tmpDir = folder.newFolder(); - final File savepointDir = new File(tmpDir, "savepoints"); - // Flink configuration final Configuration config = new Configuration(); config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir.toURI().toString()); @@ -590,11 +621,6 @@ public class SavepointITCase extends TestLogger { iterTestCheckpointVerify[i] = 0; } - TemporaryFolder folder = new TemporaryFolder(); - folder.create(); - // Temporary directory for file state backend - final File tmpDir = folder.newFolder(); - final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); final IntegerStreamSource source = new IntegerStreamSource(); IterativeStream<Integer> iteration = env.addSource(source) @@ -638,20 +664,9 @@ public class SavepointITCase extends 
TestLogger { JobGraph jobGraph = streamGraph.getJobGraph(); - Configuration config = new Configuration(); + Configuration config = getFileBasedCheckpointsConfig(); config.addAll(jobGraph.getJobConfiguration()); config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "0"); - final File checkpointDir = new File(tmpDir, "checkpoints"); - final File savepointDir = new File(tmpDir, "savepoints"); - - if (!checkpointDir.mkdir() || !savepointDir.mkdirs()) { - fail("Test setup failed: failed to create temporary directories."); - } - - config.setString(CheckpointingOptions.STATE_BACKEND, "filesystem"); - config.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir.toURI().toString()); - config.setInteger(CheckpointingOptions.FS_SMALL_FILE_THRESHOLD, 0); - config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir.toURI().toString()); MiniClusterWithClientResource cluster = new MiniClusterWithClientResource( new MiniClusterResourceConfiguration.Builder() @@ -798,4 +813,65 @@ public class SavepointITCase extends TestLogger { .build()); } } + + private Configuration getFileBasedCheckpointsConfig(final String savepointDir) { + final Configuration config = new Configuration(); + config.setString(CheckpointingOptions.STATE_BACKEND, "filesystem"); + config.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir.toURI().toString()); + config.setInteger(CheckpointingOptions.FS_SMALL_FILE_THRESHOLD, 0); + config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir); + return config; + } + + private Configuration getFileBasedCheckpointsConfig() { + return getFileBasedCheckpointsConfig(savepointDir.toURI().toString()); + } + + private static Matcher<File> hasEntropyInFileStateHandlePaths() { + return new TypeSafeDiagnosingMatcher<File>() { + + @Override + protected boolean matchesSafely(final File savepointDir, final Description mismatchDescription) { + if (savepointDir == null) { + mismatchDescription.appendText("savepoint dir must 
not be null"); + return false; + } + + final List<Path> filesWithoutEntropy = listRecursively(savepointDir.toPath().resolve(EntropyInjectingTestFileSystem.ENTROPY_INJECTION_KEY)); + final Path savepointDirWithEntropy = savepointDir.toPath().resolve(EntropyInjectingTestFileSystem.ENTROPY); + final List<Path> filesWithEntropy = listRecursively(savepointDirWithEntropy); + + if (!filesWithoutEntropy.isEmpty()) { + mismatchDescription.appendText("there are savepoint files with unresolved entropy placeholders"); + return false; + } + + if (!Files.exists(savepointDirWithEntropy) || filesWithEntropy.isEmpty()) { + mismatchDescription.appendText("there are no savepoint files with added entropy"); + return false; + } + + return true; + } + + @Override + public void describeTo(final Description description) { + description.appendText("all savepoint files should have added entropy"); + } + }; + } + + private static List<Path> listRecursively(final Path dir) { + try { + if (!Files.exists(dir)) { + return Collections.emptyList(); + } else { + try (Stream<Path> files = Files.walk(dir, FileVisitOption.FOLLOW_LINKS)) { + return files.filter(Files::isRegularFile).collect(Collectors.toList()); + } + } + } catch (IOException e) { + throw new RuntimeException(e); + } + } }