This is an automated email from the ASF dual-hosted git repository.

gary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new ddb02dd  [FLINK-10764][tests] Add ITCase for checkpoint path entropy injection. (#7075)
ddb02dd is described below

commit ddb02ddd44a6ded643b31df2711e9f615d0e8087
Author: Gary Yao <g...@data-artisans.com>
AuthorDate: Thu Nov 15 11:59:24 2018 +0100

    [FLINK-10764][tests] Add ITCase for checkpoint path entropy injection. (#7075)
    
    Add a test that verifies that checkpoint data on the file system has additional
    entropy added to its path.
    
    Remove code duplication in SavepointITCase.
---
 .../testutils/EntropyInjectingTestFileSystem.java  |  65 +++++++
 .../org.apache.flink.core.fs.FileSystemFactory     |   3 +-
 .../flink/test/checkpointing/SavepointITCase.java  | 200 ++++++++++++++-------
 3 files changed, 205 insertions(+), 63 deletions(-)

diff --git a/flink-core/src/test/java/org/apache/flink/testutils/EntropyInjectingTestFileSystem.java b/flink-core/src/test/java/org/apache/flink/testutils/EntropyInjectingTestFileSystem.java
new file mode 100644
index 0000000..ef1575b
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/testutils/EntropyInjectingTestFileSystem.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.testutils;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.EntropyInjectingFileSystem;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.FileSystemFactory;
+import org.apache.flink.core.fs.local.LocalFileSystem;
+
+import java.net.URI;
+
+/**
+ * A {@link LocalFileSystem} that implements {@link EntropyInjectingFileSystem} with a fixed entropy
+ * injection key and a fixed (non-random) entropy value, so that tests can predict resolved paths.
+ *
+ * <p>It is provided under the {@link #SCHEME} scheme by {@link EntropyInjectingTestFileSystemFactory}.
+ */
+public class EntropyInjectingTestFileSystem extends LocalFileSystem implements EntropyInjectingFileSystem {
+
+       /** URI scheme under which {@link EntropyInjectingTestFileSystemFactory} provides this file system. */
+       public static final String SCHEME = "test-entropy";
+
+       /** Path segment that marks where entropy is injected; returned by {@link #getEntropyInjectionKey()}. */
+       public static final String ENTROPY_INJECTION_KEY = "_entropy_";
+
+       /**
+        * Entropy value returned by {@link #generateEntropy()}. It is a constant rather than a random
+        * string so that tests can assert on the resulting paths.
+        */
+       public static final String ENTROPY = "_resolved_";
+
+       @Override
+       public String getEntropyInjectionKey() {
+               return ENTROPY_INJECTION_KEY;
+       }
+
+       @Override
+       public String generateEntropy() {
+               return ENTROPY;
+       }
+
+       /**
+        * Factory that creates {@link EntropyInjectingTestFileSystem} instances for the
+        * {@link EntropyInjectingTestFileSystem#SCHEME} scheme.
+        */
+       public static class EntropyInjectingTestFileSystemFactory implements FileSystemFactory {
+
+               @Override
+               public String getScheme() {
+                       return SCHEME;
+               }
+
+               @Override
+               public void configure(final Configuration config) {
+                       // Nothing to configure: the entropy key and value are fixed constants.
+               }
+
+               @Override
+               public FileSystem create(final URI fsUri) {
+                       return new EntropyInjectingTestFileSystem();
+               }
+       }
+}
diff --git a/flink-core/src/test/resources/META-INF/services/org.apache.flink.core.fs.FileSystemFactory b/flink-core/src/test/resources/META-INF/services/org.apache.flink.core.fs.FileSystemFactory
index 5a3a31d..76965cb 100644
--- a/flink-core/src/test/resources/META-INF/services/org.apache.flink.core.fs.FileSystemFactory
+++ b/flink-core/src/test/resources/META-INF/services/org.apache.flink.core.fs.FileSystemFactory
@@ -13,4 +13,5 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-org.apache.flink.testutils.TestFileSystem$TestFileSystemFactory
\ No newline at end of file
+org.apache.flink.testutils.TestFileSystem$TestFileSystemFactory
+org.apache.flink.testutils.EntropyInjectingTestFileSystem$EntropyInjectingTestFileSystemFactory
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
index 2cd2bbb..e706708 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
@@ -53,11 +53,16 @@ import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.graph.StreamGraph;
 import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.testutils.EntropyInjectingTestFileSystem;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.TestLogger;
 
+import org.hamcrest.Description;
+import org.hamcrest.Matcher;
+import org.hamcrest.TypeSafeDiagnosingMatcher;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
@@ -66,7 +71,12 @@ import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.FileNotFoundException;
+import java.io.IOException;
 import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.file.FileVisitOption;
+import java.nio.file.Files;
+import java.nio.file.Path;
 import java.time.Duration;
 import java.util.Arrays;
 import java.util.Collections;
@@ -76,9 +86,12 @@ import java.util.Random;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -93,6 +106,22 @@ public class SavepointITCase extends TestLogger {
        @Rule
        public final TemporaryFolder folder = new TemporaryFolder();
 
+       /** Checkpoint root directory of the current test; created in {@link #setUp()}. */
+       private File checkpointDir;
+
+       /** Savepoint root directory of the current test; created in {@link #setUp()}. */
+       private File savepointDir;
+
+       /**
+        * Creates empty {@code checkpoints} and {@code savepoints} directories below a new folder of the
+        * {@link TemporaryFolder} rule before each test.
+        */
+       @Before
+       public void setUp() throws Exception {
+               final File testRoot = folder.newFolder();
+
+               checkpointDir = new File(testRoot, "checkpoints");
+               savepointDir = new File(testRoot, "savepoints");
+
+               // Both directories are direct children of the just-created test root, so a plain
+               // mkdir() is sufficient for each of them.
+               if (!checkpointDir.mkdir() || !savepointDir.mkdir()) {
+                       fail("Test setup failed: failed to create temporary directories.");
+               }
+       }
+
        /**
         * Triggers a savepoint for a job that uses the FsStateBackend. We 
expect
         * that all checkpoint files are written to a new savepoint directory.
@@ -109,35 +138,46 @@ public class SavepointITCase extends TestLogger {
         */
        @Test
        public void testTriggerSavepointAndResumeWithFileBasedCheckpoints() 
throws Exception {
-               // Config
                final int numTaskManagers = 2;
                final int numSlotsPerTaskManager = 2;
                final int parallelism = numTaskManagers * 
numSlotsPerTaskManager;
-               final File testRoot = folder.newFolder();

-               Configuration config = new Configuration();
+               // Checkpoints and savepoints go to the per-test directories initialized in setUp().
+               final MiniClusterResourceFactory clusterFactory = new 
MiniClusterResourceFactory(
+                       numTaskManagers,
+                       numSlotsPerTaskManager,
+                       getFileBasedCheckpointsConfig());

-               final File checkpointDir = new File(testRoot, "checkpoints");
-               final File savepointRootDir = new File(testRoot, "savepoints");
+               final String savepointPath = 
submitJobAndTakeSavepoint(clusterFactory, parallelism);
+               // Expects one metadata file plus one file per parallel subtask in the savepoint directory.
+               verifySavepoint(parallelism, savepointPath);

-               if (!checkpointDir.mkdir() || !savepointRootDir.mkdirs()) {
-                       fail("Test setup failed: failed to create temporary 
directories.");
-               }
+               restoreJobAndVerifyState(savepointPath, clusterFactory, 
parallelism);
+       }
 
-               // Use file based checkpoints
-               config.setString(CheckpointingOptions.STATE_BACKEND, 
"filesystem");
-               config.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, 
checkpointDir.toURI().toString());
-               config.setInteger(CheckpointingOptions.FS_SMALL_FILE_THRESHOLD, 
0);
-               config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, 
savepointRootDir.toURI().toString());
+       /**
+        * Triggers a savepoint for a job whose savepoint directory contains the
+        * {@link EntropyInjectingTestFileSystem#ENTROPY_INJECTION_KEY} placeholder and checks that the
+        * written files ended up below the resolved entropy path rather than below the placeholder.
+        * Afterwards, the job is restored from the savepoint.
+        */
+       @Test
+       public void testShouldAddEntropyToSavepointPath() throws Exception {
+               final int numTaskManagers = 2;
+               final int numSlotsPerTaskManager = 2;
+               final int parallelism = numTaskManagers * 
numSlotsPerTaskManager;

-               MiniClusterResourceFactory clusterFactory = new 
MiniClusterResourceFactory(numTaskManagers, numSlotsPerTaskManager, config);
+               final MiniClusterResourceFactory clusterFactory = new 
MiniClusterResourceFactory(
+                       numTaskManagers,
+                       numSlotsPerTaskManager,
+                       getCheckpointingWithEntropyConfig());

-               String savepointPath = 
submitJobAndGetVerifiedSavepoint(clusterFactory, parallelism);
+               final String savepointPath = 
submitJobAndTakeSavepoint(clusterFactory, parallelism);
+               // verifySavepoint() is not used here; presumably because entropy injection places the
+               // savepoint's files in more than one directory - TODO confirm.
+               assertThat(savepointDir, hasEntropyInFileStateHandlePaths());

                restoreJobAndVerifyState(savepointPath, clusterFactory, 
parallelism);
        }
 
-       private String 
submitJobAndGetVerifiedSavepoint(MiniClusterResourceFactory clusterFactory, int 
parallelism) throws Exception {
+       /**
+        * Returns a file-based checkpointing configuration whose savepoint directory lives on the
+        * {@code test-entropy} file system and ends with the
+        * {@link EntropyInjectingTestFileSystem#ENTROPY_INJECTION_KEY} placeholder segment.
+        */
+       private Configuration getCheckpointingWithEntropyConfig() {
+               // Use the URI path of the file instead of File#getPath(): on Windows getPath() yields
+               // "C:\...", which does not form a valid "test-entropy://" URI, while the URI path
+               // ("/C:/...") does. On POSIX systems both forms are identical.
+               final String savepointPathWithEntropyPlaceholder =
+                       new File(savepointDir, EntropyInjectingTestFileSystem.ENTROPY_INJECTION_KEY).toURI().getPath();
+               // No file-system-specific entropy option (such as "s3.entropy.key") is needed:
+               // EntropyInjectingTestFileSystem returns its injection key from getEntropyInjectionKey(),
+               // and its factory ignores the configuration.
+               return getFileBasedCheckpointsConfig("test-entropy://" + savepointPathWithEntropyPlaceholder);
+       }
+
+       private String submitJobAndTakeSavepoint(MiniClusterResourceFactory 
clusterFactory, int parallelism) throws Exception {
                final JobGraph jobGraph = createJobGraph(parallelism, 0, 1000);
                final JobID jobId = jobGraph.getJobID();
                StatefulCounter.resetForTest(parallelism);
@@ -152,32 +192,32 @@ public class SavepointITCase extends TestLogger {
 
                        StatefulCounter.getProgressLatch().await();
 
-                       String savepointPath = client.triggerSavepoint(jobId, 
null).get();
-
-                       // Only one savepoint should exist
-                       File savepointDir = new File(new URI(savepointPath));
-                       assertTrue("Savepoint directory does not exist.", 
savepointDir.exists());
-                       assertTrue("Savepoint did not create self-contained 
directory.", savepointDir.isDirectory());
-
-                       File[] savepointFiles = savepointDir.listFiles();
-
-                       if (savepointFiles != null) {
-                               // Expect one metadata file and one checkpoint 
file per stateful
-                               // parallel subtask
-                               String errMsg = "Did not write expected number 
of savepoint/checkpoint files to directory: "
-                                       + Arrays.toString(savepointFiles);
-                               assertEquals(errMsg, 1 + parallelism, 
savepointFiles.length);
-                       } else {
-                               fail(String.format("Returned savepoint path 
(%s) is not valid.", savepointPath));
-                       }
-
-                       return savepointPath;
+                       return client.triggerSavepoint(jobId, null).get();
                } finally {
                        cluster.after();
                        StatefulCounter.resetForTest(parallelism);
                }
        }
 
+       private void verifySavepoint(final int parallelism, final String 
savepointPath) throws URISyntaxException {
+               // Only one savepoint should exist
+               File savepointDir = new File(new URI(savepointPath));
+               assertTrue("Savepoint directory does not exist.", 
savepointDir.exists());
+               assertTrue("Savepoint did not create self-contained 
directory.", savepointDir.isDirectory());
+
+               File[] savepointFiles = savepointDir.listFiles();
+
+               if (savepointFiles != null) {
+                       // Expect one metadata file and one checkpoint file per 
stateful
+                       // parallel subtask
+                       String errMsg = "Did not write expected number of 
savepoint/checkpoint files to directory: "
+                               + Arrays.toString(savepointFiles);
+                       assertEquals(errMsg, 1 + parallelism, 
savepointFiles.length);
+               } else {
+                       fail(String.format("Returned savepoint path (%s) is not 
valid.", savepointPath));
+               }
+       }
+
        private void restoreJobAndVerifyState(String savepointPath, 
MiniClusterResourceFactory clusterFactory, int parallelism) throws Exception {
                final JobGraph jobGraph = createJobGraph(parallelism, 0, 1000);
                
jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath));
@@ -224,9 +264,6 @@ public class SavepointITCase extends TestLogger {
                final int numTaskManagers = 1;
                final int numSlotsPerTaskManager = 1;
 
-               final File tmpDir = folder.newFolder();
-               final File savepointDir = new File(tmpDir, "savepoints");
-
                final Configuration config = new Configuration();
                config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, 
savepointDir.toURI().toString());
 
@@ -299,9 +336,6 @@ public class SavepointITCase extends TestLogger {
                int numSlotsPerTaskManager = 1;
                int parallelism = numTaskManagers * numSlotsPerTaskManager;
 
-               final File tmpDir = folder.newFolder();
-               final File savepointDir = new File(tmpDir, "savepoints");
-
                final Configuration config = new Configuration();
                config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, 
savepointDir.toURI().toString());
 
@@ -361,9 +395,6 @@ public class SavepointITCase extends TestLogger {
                // Test deadline
                final Deadline deadline = 
Deadline.now().plus(Duration.ofMinutes(5));
 
-               final File tmpDir = folder.newFolder();
-               final File savepointDir = new File(tmpDir, "savepoints");
-
                // Flink configuration
                final Configuration config = new Configuration();
                config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, 
savepointDir.toURI().toString());
@@ -590,11 +621,6 @@ public class SavepointITCase extends TestLogger {
                        iterTestCheckpointVerify[i] = 0;
                }
 
-               TemporaryFolder folder = new TemporaryFolder();
-               folder.create();
-               // Temporary directory for file state backend
-               final File tmpDir = folder.newFolder();
-
                final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
                final IntegerStreamSource source = new IntegerStreamSource();
                IterativeStream<Integer> iteration = env.addSource(source)
@@ -638,20 +664,9 @@ public class SavepointITCase extends TestLogger {
 
                JobGraph jobGraph = streamGraph.getJobGraph();
 
-               Configuration config = new Configuration();
+               Configuration config = getFileBasedCheckpointsConfig();
                config.addAll(jobGraph.getJobConfiguration());
                config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "0");
-               final File checkpointDir = new File(tmpDir, "checkpoints");
-               final File savepointDir = new File(tmpDir, "savepoints");
-
-               if (!checkpointDir.mkdir() || !savepointDir.mkdirs()) {
-                       fail("Test setup failed: failed to create temporary 
directories.");
-               }
-
-               config.setString(CheckpointingOptions.STATE_BACKEND, 
"filesystem");
-               config.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, 
checkpointDir.toURI().toString());
-               config.setInteger(CheckpointingOptions.FS_SMALL_FILE_THRESHOLD, 
0);
-               config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, 
savepointDir.toURI().toString());
 
                MiniClusterWithClientResource cluster = new 
MiniClusterWithClientResource(
                        new MiniClusterResourceConfiguration.Builder()
@@ -798,4 +813,65 @@ public class SavepointITCase extends TestLogger {
                                        .build());
                }
        }
+
+       /**
+        * Returns a configuration that uses the file system state backend, writes checkpoints to
+        * {@link #checkpointDir}, and writes savepoints to the given location.
+        *
+        * @param savepointDirUri URI string of the savepoint root directory; any registered scheme may be used
+        * @return a new configuration instance
+        */
+       private Configuration getFileBasedCheckpointsConfig(final String savepointDirUri) {
+               final Configuration config = new Configuration();
+               config.setString(CheckpointingOptions.STATE_BACKEND, "filesystem");
+               config.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir.toURI().toString());
+               // Presumably a threshold of 0 keeps state out of the metadata file so that every subtask
+               // writes its own file (verifySavepoint() counts them) - TODO confirm.
+               config.setInteger(CheckpointingOptions.FS_SMALL_FILE_THRESHOLD, 0);
+               config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDirUri);
+               return config;
+       }
+
+       /**
+        * Returns {@link #getFileBasedCheckpointsConfig(String)} with savepoints written to the local
+        * {@link #savepointDir}.
+        */
+       private Configuration getFileBasedCheckpointsConfig() {
+               return getFileBasedCheckpointsConfig(savepointDir.toURI().toString());
+       }
+
+       private static Matcher<File> hasEntropyInFileStateHandlePaths() {
+               return new TypeSafeDiagnosingMatcher<File>() {
+
+                       @Override
+                       protected boolean matchesSafely(final File 
savepointDir, final Description mismatchDescription) {
+                               if (savepointDir == null) {
+                                       
mismatchDescription.appendText("savepoint dir must not be null");
+                                       return false;
+                               }
+
+                               final List<Path> filesWithoutEntropy = 
listRecursively(savepointDir.toPath().resolve(EntropyInjectingTestFileSystem.ENTROPY_INJECTION_KEY));
+                               final Path savepointDirWithEntropy = 
savepointDir.toPath().resolve(EntropyInjectingTestFileSystem.ENTROPY);
+                               final List<Path> filesWithEntropy = 
listRecursively(savepointDirWithEntropy);
+
+                               if (!filesWithoutEntropy.isEmpty()) {
+                                       mismatchDescription.appendText("there 
are savepoint files with unresolved entropy placeholders");
+                                       return false;
+                               }
+
+                               if (!Files.exists(savepointDirWithEntropy) || 
filesWithEntropy.isEmpty()) {
+                                       mismatchDescription.appendText("there 
are no savepoint files with added entropy");
+                                       return false;
+                               }
+
+                               return true;
+                       }
+
+                       @Override
+                       public void describeTo(final Description description) {
+                               description.appendText("all savepoint files 
should have added entropy");
+                       }
+               };
+       }
+
+       /**
+        * Lists all regular files below {@code dir}, following symbolic links.
+        *
+        * @param dir the directory to walk
+        * @return the regular files found, or an empty list if {@code dir} does not exist
+        * @throws java.io.UncheckedIOException if walking the directory fails
+        */
+       private static List<Path> listRecursively(final Path dir) {
+               if (!Files.exists(dir)) {
+                       return Collections.emptyList();
+               }
+
+               // The stream returned by Files.walk holds open directory handles and must be closed.
+               try (Stream<Path> files = Files.walk(dir, FileVisitOption.FOLLOW_LINKS)) {
+                       return files.filter(Files::isRegularFile).collect(Collectors.toList());
+               } catch (IOException e) {
+                       throw new java.io.UncheckedIOException("Failed to list files below " + dir, e);
+               }
+       }
 }

Reply via email to