[ https://issues.apache.org/jira/browse/FLINK-10764?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16687835#comment-16687835 ]
ASF GitHub Bot commented on FLINK-10764:
----------------------------------------
GJL closed pull request #7075: [FLINK-10764][tests] Add ITCase for checkpoint path entropy injection.
URL: https://github.com/apache/flink/pull/7075
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:
diff --git a/flink-core/src/test/java/org/apache/flink/testutils/EntropyInjectingTestFileSystem.java b/flink-core/src/test/java/org/apache/flink/testutils/EntropyInjectingTestFileSystem.java
new file mode 100644
index 00000000000..ef1575bea74
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/testutils/EntropyInjectingTestFileSystem.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.testutils;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.EntropyInjectingFileSystem;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.FileSystemFactory;
+import org.apache.flink.core.fs.local.LocalFileSystem;
+
+import java.net.URI;
+
+/**
+ * A test file system that implements {@link EntropyInjectingFileSystem}.
+ */
+public class EntropyInjectingTestFileSystem extends LocalFileSystem implements EntropyInjectingFileSystem {
+
+ public static final String ENTROPY_INJECTION_KEY = "_entropy_";
+
+ public static final String ENTROPY = "_resolved_";
+
+ @Override
+ public String getEntropyInjectionKey() {
+ return ENTROPY_INJECTION_KEY;
+ }
+
+ @Override
+ public String generateEntropy() {
+ return ENTROPY;
+ }
+
+ public static class EntropyInjectingTestFileSystemFactory implements FileSystemFactory {
+
+ @Override
+ public String getScheme() {
+ return "test-entropy";
+ }
+
+ @Override
+ public void configure(final Configuration config) {
+ }
+
+ @Override
+ public FileSystem create(final URI fsUri) {
+ return new EntropyInjectingTestFileSystem();
+ }
+ }
+}
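For context (not part of this diff): the two overridden methods above are all that entropy resolution needs. A minimal standalone sketch of the placeholder substitution, using a hypothetical resolveEntropy helper that mirrors what Flink's entropy injection does for data file paths:

    import org.apache.flink.core.fs.EntropyInjectingFileSystem;
    import org.apache.flink.testutils.EntropyInjectingTestFileSystem;

    public final class EntropyResolutionSketch {

        // Hypothetical helper: replace the first occurrence of the entropy
        // key with freshly generated entropy.
        static String resolveEntropy(EntropyInjectingFileSystem fs, String path) {
            return path.replaceFirst(fs.getEntropyInjectionKey(), fs.generateEntropy());
        }

        public static void main(String[] args) {
            // Prints: test-entropy:///savepoints/_resolved_/savepoint-0
            System.out.println(resolveEntropy(
                new EntropyInjectingTestFileSystem(),
                "test-entropy:///savepoints/_entropy_/savepoint-0"));
        }
    }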
diff --git a/flink-core/src/test/resources/META-INF/services/org.apache.flink.core.fs.FileSystemFactory b/flink-core/src/test/resources/META-INF/services/org.apache.flink.core.fs.FileSystemFactory
index 5a3a31de235..76965cb2145 100644
--- a/flink-core/src/test/resources/META-INF/services/org.apache.flink.core.fs.FileSystemFactory
+++ b/flink-core/src/test/resources/META-INF/services/org.apache.flink.core.fs.FileSystemFactory
@@ -13,4 +13,5 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-org.apache.flink.testutils.TestFileSystem$TestFileSystemFactory
\ No newline at end of file
+org.apache.flink.testutils.TestFileSystem$TestFileSystemFactory
+org.apache.flink.testutils.EntropyInjectingTestFileSystem$EntropyInjectingTestFileSystemFactory
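For context (not part of this diff): Flink discovers FileSystemFactory implementations through the standard JDK ServiceLoader mechanism, which reads registration files like the one above. A minimal standalone sketch of such a lookup:

    import java.util.ServiceLoader;

    import org.apache.flink.core.fs.FileSystemFactory;

    public final class FactoryLookupSketch {
        public static void main(String[] args) {
            // Iterates all factories registered under
            // META-INF/services/org.apache.flink.core.fs.FileSystemFactory.
            for (FileSystemFactory factory : ServiceLoader.load(FileSystemFactory.class)) {
                if ("test-entropy".equals(factory.getScheme())) {
                    System.out.println("found: " + factory.getClass().getName());
                }
            }
        }
    }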
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
index 2cd2bbb60e9..e7067086ce1 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
@@ -53,11 +53,16 @@
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.testutils.EntropyInjectingTestFileSystem;
import org.apache.flink.util.Collector;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.TestLogger;
+import org.hamcrest.Description;
+import org.hamcrest.Matcher;
+import org.hamcrest.TypeSafeDiagnosingMatcher;
import org.junit.Assert;
+import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
@@ -66,7 +71,12 @@
import java.io.File;
import java.io.FileNotFoundException;
+import java.io.IOException;
import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.file.FileVisitOption;
+import java.nio.file.Files;
+import java.nio.file.Path;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
@@ -76,9 +86,12 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -93,6 +106,22 @@
@Rule
public final TemporaryFolder folder = new TemporaryFolder();
+ private File checkpointDir;
+
+ private File savepointDir;
+
+ @Before
+ public void setUp() throws Exception {
+ final File testRoot = folder.newFolder();
+
+ checkpointDir = new File(testRoot, "checkpoints");
+ savepointDir = new File(testRoot, "savepoints");
+
+ if (!checkpointDir.mkdir() || !savepointDir.mkdirs()) {
+ fail("Test setup failed: failed to create temporary
directories.");
+ }
+ }
+
/**
* Triggers a savepoint for a job that uses the FsStateBackend. We expect
* that all checkpoint files are written to a new savepoint directory.
@@ -109,35 +138,46 @@
*/
@Test
public void testTriggerSavepointAndResumeWithFileBasedCheckpoints() throws Exception {
- // Config
final int numTaskManagers = 2;
final int numSlotsPerTaskManager = 2;
final int parallelism = numTaskManagers * numSlotsPerTaskManager;
- final File testRoot = folder.newFolder();
- Configuration config = new Configuration();
+ final MiniClusterResourceFactory clusterFactory = new MiniClusterResourceFactory(
+ numTaskManagers,
+ numSlotsPerTaskManager,
+ getFileBasedCheckpointsConfig());
- final File checkpointDir = new File(testRoot, "checkpoints");
- final File savepointRootDir = new File(testRoot, "savepoints");
+ final String savepointPath = submitJobAndTakeSavepoint(clusterFactory, parallelism);
+ verifySavepoint(parallelism, savepointPath);
- if (!checkpointDir.mkdir() || !savepointRootDir.mkdirs()) {
- fail("Test setup failed: failed to create temporary
directories.");
- }
+ restoreJobAndVerifyState(savepointPath, clusterFactory, parallelism);
+ }
- // Use file based checkpoints
- config.setString(CheckpointingOptions.STATE_BACKEND, "filesystem");
- config.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir.toURI().toString());
- config.setInteger(CheckpointingOptions.FS_SMALL_FILE_THRESHOLD, 0);
- config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointRootDir.toURI().toString());
+ @Test
+ public void testShouldAddEntropyToSavepointPath() throws Exception {
+ final int numTaskManagers = 2;
+ final int numSlotsPerTaskManager = 2;
+ final int parallelism = numTaskManagers * numSlotsPerTaskManager;
- MiniClusterResourceFactory clusterFactory = new MiniClusterResourceFactory(numTaskManagers, numSlotsPerTaskManager, config);
+ final MiniClusterResourceFactory clusterFactory = new MiniClusterResourceFactory(
+ numTaskManagers,
+ numSlotsPerTaskManager,
+ getCheckpointingWithEntropyConfig());
- String savepointPath = submitJobAndGetVerifiedSavepoint(clusterFactory, parallelism);
+ final String savepointPath = submitJobAndTakeSavepoint(clusterFactory, parallelism);
+ assertThat(savepointDir, hasEntropyInFileStateHandlePaths());
restoreJobAndVerifyState(savepointPath, clusterFactory, parallelism);
}
- private String submitJobAndGetVerifiedSavepoint(MiniClusterResourceFactory clusterFactory, int parallelism) throws Exception {
+ private Configuration getCheckpointingWithEntropyConfig() {
+ final String savepointPathWithEntropyPlaceholder = new File(savepointDir, EntropyInjectingTestFileSystem.ENTROPY_INJECTION_KEY).getPath();
+ final Configuration config = getFileBasedCheckpointsConfig("test-entropy://" + savepointPathWithEntropyPlaceholder);
+ config.setString("s3.entropy.key", EntropyInjectingTestFileSystem.ENTROPY_INJECTION_KEY);
+ return config;
+ }
+
+ private String submitJobAndTakeSavepoint(MiniClusterResourceFactory clusterFactory, int parallelism) throws Exception {
final JobGraph jobGraph = createJobGraph(parallelism, 0, 1000);
final JobID jobId = jobGraph.getJobID();
StatefulCounter.resetForTest(parallelism);
@@ -152,32 +192,32 @@ private String submitJobAndGetVerifiedSavepoint(MiniClusterResourceFactory clust
StatefulCounter.getProgressLatch().await();
- String savepointPath = client.triggerSavepoint(jobId, null).get();
-
- // Only one savepoint should exist
- File savepointDir = new File(new URI(savepointPath));
- assertTrue("Savepoint directory does not exist.",
savepointDir.exists());
- assertTrue("Savepoint did not create self-contained
directory.", savepointDir.isDirectory());
-
- File[] savepointFiles = savepointDir.listFiles();
-
- if (savepointFiles != null) {
- // Expect one metadata file and one checkpoint file per stateful
- // parallel subtask
- String errMsg = "Did not write expected number of savepoint/checkpoint files to directory: "
- + Arrays.toString(savepointFiles);
- assertEquals(errMsg, 1 + parallelism, savepointFiles.length);
- } else {
- fail(String.format("Returned savepoint path (%s) is not valid.", savepointPath));
- }
-
- return savepointPath;
+ return client.triggerSavepoint(jobId, null).get();
} finally {
cluster.after();
StatefulCounter.resetForTest(parallelism);
}
}
+ private void verifySavepoint(final int parallelism, final String savepointPath) throws URISyntaxException {
+ // Only one savepoint should exist
+ File savepointDir = new File(new URI(savepointPath));
+ assertTrue("Savepoint directory does not exist.",
savepointDir.exists());
+ assertTrue("Savepoint did not create self-contained
directory.", savepointDir.isDirectory());
+
+ File[] savepointFiles = savepointDir.listFiles();
+
+ if (savepointFiles != null) {
+ // Expect one metadata file and one checkpoint file per stateful
+ // parallel subtask
+ String errMsg = "Did not write expected number of savepoint/checkpoint files to directory: "
+ + Arrays.toString(savepointFiles);
+ assertEquals(errMsg, 1 + parallelism, savepointFiles.length);
+ } else {
+ fail(String.format("Returned savepoint path (%s) is not valid.", savepointPath));
+ }
+ }
+
private void restoreJobAndVerifyState(String savepointPath, MiniClusterResourceFactory clusterFactory, int parallelism) throws Exception {
final JobGraph jobGraph = createJobGraph(parallelism, 0, 1000);
jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath));
@@ -224,9 +264,6 @@ public void testTriggerSavepointForNonExistingJob() throws Exception {
final int numTaskManagers = 1;
final int numSlotsPerTaskManager = 1;
- final File tmpDir = folder.newFolder();
- final File savepointDir = new File(tmpDir, "savepoints");
-
final Configuration config = new Configuration();
config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir.toURI().toString());
@@ -299,9 +336,6 @@ public void testSubmitWithUnknownSavepointPath() throws Exception {
int numSlotsPerTaskManager = 1;
int parallelism = numTaskManagers * numSlotsPerTaskManager;
- final File tmpDir = folder.newFolder();
- final File savepointDir = new File(tmpDir, "savepoints");
-
final Configuration config = new Configuration();
config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir.toURI().toString());
@@ -361,9 +395,6 @@ public void testCanRestoreWithModifiedStatelessOperators() throws Exception {
// Test deadline
final Deadline deadline = Deadline.now().plus(Duration.ofMinutes(5));
- final File tmpDir = folder.newFolder();
- final File savepointDir = new File(tmpDir, "savepoints");
-
// Flink configuration
final Configuration config = new Configuration();
config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir.toURI().toString());
@@ -590,11 +621,6 @@ public void testSavepointForJobWithIteration() throws Exception {
iterTestCheckpointVerify[i] = 0;
}
- TemporaryFolder folder = new TemporaryFolder();
- folder.create();
- // Temporary directory for file state backend
- final File tmpDir = folder.newFolder();
-
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
final IntegerStreamSource source = new IntegerStreamSource();
IterativeStream<Integer> iteration = env.addSource(source)
@@ -638,20 +664,9 @@ public Integer map(Integer value) throws Exception {
JobGraph jobGraph = streamGraph.getJobGraph();
- Configuration config = new Configuration();
+ Configuration config = getFileBasedCheckpointsConfig();
config.addAll(jobGraph.getJobConfiguration());
config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "0");
- final File checkpointDir = new File(tmpDir, "checkpoints");
- final File savepointDir = new File(tmpDir, "savepoints");
-
- if (!checkpointDir.mkdir() || !savepointDir.mkdirs()) {
- fail("Test setup failed: failed to create temporary
directories.");
- }
-
- config.setString(CheckpointingOptions.STATE_BACKEND, "filesystem");
- config.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir.toURI().toString());
- config.setInteger(CheckpointingOptions.FS_SMALL_FILE_THRESHOLD, 0);
- config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir.toURI().toString());
MiniClusterWithClientResource cluster = new MiniClusterWithClientResource(
new MiniClusterResourceConfiguration.Builder()
@@ -798,4 +813,65 @@ MiniClusterWithClientResource get() {
.build());
}
}
+
+ private Configuration getFileBasedCheckpointsConfig(final String savepointDir) {
+ final Configuration config = new Configuration();
+ config.setString(CheckpointingOptions.STATE_BACKEND, "filesystem");
+ config.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir.toURI().toString());
+ config.setInteger(CheckpointingOptions.FS_SMALL_FILE_THRESHOLD, 0);
+ config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir);
+ return config;
+ }
+
+ private Configuration getFileBasedCheckpointsConfig() {
+ return getFileBasedCheckpointsConfig(savepointDir.toURI().toString());
+ }
+
+ private static Matcher<File> hasEntropyInFileStateHandlePaths() {
+ return new TypeSafeDiagnosingMatcher<File>() {
+
+ @Override
+ protected boolean matchesSafely(final File savepointDir, final Description mismatchDescription) {
+ if (savepointDir == null) {
+ mismatchDescription.appendText("savepoint dir must not be null");
+ return false;
+ }
+
+ final List<Path> filesWithoutEntropy =
listRecursively(savepointDir.toPath().resolve(EntropyInjectingTestFileSystem.ENTROPY_INJECTION_KEY));
+ final Path savepointDirWithEntropy =
savepointDir.toPath().resolve(EntropyInjectingTestFileSystem.ENTROPY);
+ final List<Path> filesWithEntropy =
listRecursively(savepointDirWithEntropy);
+
+ if (!filesWithoutEntropy.isEmpty()) {
+ mismatchDescription.appendText("there
are savepoint files with unresolved entropy placeholders");
+ return false;
+ }
+
+ if (!Files.exists(savepointDirWithEntropy) || filesWithEntropy.isEmpty()) {
+ mismatchDescription.appendText("there are no savepoint files with added entropy");
+ return false;
+ }
+
+ return true;
+ }
+
+ @Override
+ public void describeTo(final Description description) {
+ description.appendText("all savepoint files
should have added entropy");
+ }
+ };
+ }
+
+ private static List<Path> listRecursively(final Path dir) {
+ try {
+ if (!Files.exists(dir)) {
+ return Collections.emptyList();
+ } else {
+ try (Stream<Path> files = Files.walk(dir,
FileVisitOption.FOLLOW_LINKS)) {
+ return
files.filter(Files::isRegularFile).collect(Collectors.toList());
+ }
+ }
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
}
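For context (not part of this diff): TypeSafeDiagnosingMatcher pairs the expectation written in describeTo with the mismatch text collected in matchesSafely, so a failing assertThat prints both. A minimal standalone sketch of that reporting, assuming only Hamcrest on the classpath:

    import org.hamcrest.Description;
    import org.hamcrest.StringDescription;
    import org.hamcrest.TypeSafeDiagnosingMatcher;

    public final class MatcherReportingSketch {
        public static void main(String[] args) {
            TypeSafeDiagnosingMatcher<Integer> even = new TypeSafeDiagnosingMatcher<Integer>() {
                @Override
                protected boolean matchesSafely(Integer i, Description mismatch) {
                    if (i % 2 != 0) {
                        mismatch.appendText("was odd: ").appendValue(i);
                        return false;
                    }
                    return true;
                }

                @Override
                public void describeTo(Description description) {
                    description.appendText("an even number");
                }
            };

            Description mismatch = new StringDescription();
            even.describeMismatch(3, mismatch);
            // Prints "was odd: <3>"; assertThat would pair this with
            // the expectation "an even number".
            System.out.println(mismatch);
        }
    }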
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
> Add S3 entropy injection end-to-end/IT test
> -------------------------------------------
>
> Key: FLINK-10764
> URL: https://issues.apache.org/jira/browse/FLINK-10764
> Project: Flink
> Issue Type: Sub-task
> Components: E2E Tests, FileSystem
> Affects Versions: 1.6.2, 1.7.0
> Reporter: Till Rohrmann
> Assignee: Gary Yao
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.6.3, 1.7.0
>
>
> It would be good to add an IT/end-to-end test which verifies the entropy
> injection for the S3 filesystems introduced by FLINK-9061.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)