This is an automated email from the ASF dual-hosted git repository.
chesnay pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.13 by this push:
new 98c49d6 [FLINK-24506][config] Adds checkpoint directory to CheckpointConfig
98c49d6 is described below
commit 98c49d63efa7723ed5394e503f7f0c3f54061456
Author: Matthias Pohl <[email protected]>
AuthorDate: Tue Nov 30 18:02:17 2021 +0100
[FLINK-24506][config] Adds checkpoint directory to CheckpointConfig
This enables the user to pass in the checkpoint directory through the Flink
configuration again. This fixes an issue that was introduced by FLINK-19463.
---
.../api/environment/CheckpointConfig.java | 4 +
.../CheckpointConfigFromConfigurationTest.java | 102 +++++++++++++++++++--
2 files changed, 99 insertions(+), 7 deletions(-)
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
index 6b0275b..3cf2e86 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.Experimental;
import org.apache.flink.annotation.Public;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.CheckpointStorage;
@@ -756,5 +757,8 @@ public class CheckpointConfig implements java.io.Serializable {
configuration
.getOptional(ExecutionCheckpointingOptions.FORCE_UNALIGNED)
.ifPresent(this::setForceUnalignedCheckpoints);
+ configuration
+ .getOptional(CheckpointingOptions.CHECKPOINTS_DIRECTORY)
+ .ifPresent(this::setCheckpointStorage);
}
}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/CheckpointConfigFromConfigurationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/CheckpointConfigFromConfigurationTest.java
index 0bbab99..09ae9af 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/CheckpointConfigFromConfigurationTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/CheckpointConfigFromConfigurationTest.java
@@ -18,10 +18,18 @@
package org.apache.flink.streaming.api.environment;
+import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.runtime.state.CheckpointStorage;
+import org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage;
import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.util.Preconditions;
+import org.hamcrest.CoreMatchers;
+import org.hamcrest.Description;
+import org.hamcrest.Matcher;
+import org.hamcrest.TypeSafeMatcher;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@@ -31,8 +39,7 @@ import java.util.Collection;
import java.util.function.BiConsumer;
import java.util.function.Function;
-import static org.hamcrest.CoreMatchers.equalTo;
-import static org.junit.Assert.assertThat;
+import static org.hamcrest.MatcherAssert.assertThat;
/**
* Tests for configuring {@link CheckpointConfig} via {@link
@@ -42,7 +49,7 @@ import static org.junit.Assert.assertThat;
public class CheckpointConfigFromConfigurationTest {
@Parameterized.Parameters(name = "{0}")
- public static Collection<TestSpec> specs() {
+ public static Collection<TestSpec<?>> specs() {
return Arrays.asList(
TestSpec.testValue(CheckpointingMode.AT_LEAST_ONCE)
.whenSetFromFile("execution.checkpointing.mode",
"AT_LEAST_ONCE")
@@ -96,10 +103,82 @@ public class CheckpointConfigFromConfigurationTest {
.whenSetFromFile("execution.checkpointing.unaligned",
"true")
.viaSetter(CheckpointConfig::enableUnalignedCheckpoints)
.getterVia(CheckpointConfig::isUnalignedCheckpointsEnabled)
- .nonDefaultValue(true));
+ .nonDefaultValue(true),
+ TestSpec.testValue(
+ (CheckpointStorage)
+ new FileSystemCheckpointStorage(
+
"file:///path/to/checkpoint/dir"))
+ .whenSetFromFile(
+
CheckpointingOptions.CHECKPOINTS_DIRECTORY.key(),
+ "file:///path/to/checkpoint/dir")
+ .viaSetter(CheckpointConfig::setCheckpointStorage)
+ .getterVia(CheckpointConfig::getCheckpointStorage)
+ .nonDefaultValue(
+ new
FileSystemCheckpointStorage("file:///path/to/checkpoint/dir"))
+
.customMatcher(FileSystemCheckpointStorageMatcher::new));
}
- @Parameterized.Parameter public TestSpec spec;
+ /**
+ * {@code FileSystemCheckpointStorageMatcher} verifies that the set {@link
CheckpointStorage} is
+ * of type {@link FileSystemCheckpointStorage} pointing to the same
filesystem path.
+ */
+ private static class FileSystemCheckpointStorageMatcher
+ extends TypeSafeMatcher<CheckpointStorage> {
+ private static final Class<FileSystemCheckpointStorage>
EXPECTED_CHECKPOINT_STORAGE_CLASS =
+ FileSystemCheckpointStorage.class;
+ private final FileSystemCheckpointStorage
fileSystemCheckpointStorageFromSetter;
+
+ public FileSystemCheckpointStorageMatcher(CheckpointStorage
checkpointStorageFromSetter) {
+ Preconditions.checkArgument(
+ checkpointStorageFromSetter
+ .getClass()
+ .equals(EXPECTED_CHECKPOINT_STORAGE_CLASS));
+ this.fileSystemCheckpointStorageFromSetter =
+ (FileSystemCheckpointStorage) checkpointStorageFromSetter;
+ }
+
+ @Override
+ public void describeTo(Description description) {
+ describeObject(fileSystemCheckpointStorageFromSetter, description);
+ }
+
+ @Override
+ protected void describeMismatchSafely(
+ CheckpointStorage checkpointStorageFromFile, Description
description) {
+ if
(!checkpointStorageFromFile.getClass().equals(EXPECTED_CHECKPOINT_STORAGE_CLASS))
{
+ description.appendText(
+ "Passed object is not of type "
+ +
EXPECTED_CHECKPOINT_STORAGE_CLASS.getCanonicalName());
+ } else {
+ describeObject(
+ (FileSystemCheckpointStorage)
checkpointStorageFromFile, description);
+ }
+ super.describeMismatchSafely(checkpointStorageFromFile,
description);
+ }
+
+ private static void describeObject(
+ FileSystemCheckpointStorage fileSystemCheckpointStorage,
Description description) {
+ description
+
.appendText(EXPECTED_CHECKPOINT_STORAGE_CLASS.getCanonicalName())
+ .appendText("(")
+
.appendText(fileSystemCheckpointStorage.getCheckpointPath().toString())
+ .appendText(")");
+ }
+
+ @Override
+ protected boolean matchesSafely(CheckpointStorage
checkpointStorageFromFile) {
+ if
(!checkpointStorageFromFile.getClass().equals(EXPECTED_CHECKPOINT_STORAGE_CLASS))
{
+ return false;
+ }
+ final FileSystemCheckpointStorage
fileSystemCheckpointStorageFromFile =
+ (FileSystemCheckpointStorage) checkpointStorageFromFile;
+ return fileSystemCheckpointStorageFromFile
+ .getCheckpointPath()
+
.equals(fileSystemCheckpointStorageFromSetter.getCheckpointPath());
+ }
+ }
+
+ @Parameterized.Parameter public TestSpec<?> spec;
@Test
public void testLoadingFromConfiguration() {
@@ -133,6 +212,8 @@ public class CheckpointConfigFromConfigurationTest {
private BiConsumer<CheckpointConfig, T> setter;
private Function<CheckpointConfig, T> getter;
+ private Function<T, Matcher<T>> createMatcher = CoreMatchers::equalTo;
+
private TestSpec(T value) {
this.objectValue = value;
}
@@ -162,6 +243,11 @@ public class CheckpointConfigFromConfigurationTest {
return this;
}
+ public TestSpec<T> customMatcher(Function<T, Matcher<T>>
customMatcher) {
+ this.createMatcher = customMatcher;
+ return this;
+ }
+
public void setValue(CheckpointConfig config) {
setter.accept(config, objectValue);
}
@@ -172,11 +258,13 @@ public class CheckpointConfigFromConfigurationTest {
public void assertEqual(
CheckpointConfig configFromFile, CheckpointConfig
configFromSetters) {
- assertThat(getter.apply(configFromFile),
equalTo(getter.apply(configFromSetters)));
+ assertThat(
+ getter.apply(configFromFile),
+ createMatcher.apply(getter.apply(configFromSetters)));
}
public void assertEqualNonDefault(CheckpointConfig configFromFile) {
- assertThat(getter.apply(configFromFile), equalTo(nonDefaultValue));
+ assertThat(getter.apply(configFromFile),
createMatcher.apply(nonDefaultValue));
}
@Override