This is an automated email from the ASF dual-hosted git repository.
tzulitai pushed a commit to branch release-1.8
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.8 by this push:
new ba42d86 [FLINK-11107] [state] Prevent the MemoryStateBackend from creating
arbitrary folders under the HA path when no checkpoint path is configured
ba42d86 is described below
commit ba42d86267fa3ebe8e439794149a85823e047942
Author: Yun Tang <[email protected]>
AuthorDate: Wed Dec 12 01:37:31 2018 +0800
[FLINK-11107] [state] Prevent the MemoryStateBackend from creating arbitrary folders
under the HA path when no checkpoint path is configured
This closes #7281.
---
.../flink/runtime/state/StateBackendLoader.java | 26 -------
.../runtime/state/StateBackendLoadingTest.java | 79 +++++++++-------------
2 files changed, 32 insertions(+), 73 deletions(-)
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackendLoader.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackendLoader.java
index dbb20b8..7ab572e 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackendLoader.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackendLoader.java
@@ -20,10 +20,8 @@ package org.apache.flink.runtime.state;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.core.fs.Path;
-import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
@@ -35,7 +33,6 @@ import org.slf4j.Logger;
import javax.annotation.Nullable;
import java.io.IOException;
-import java.util.UUID;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -232,29 +229,6 @@ public class StateBackendLoader {
}
}
- // to keep supporting the old behavior where default
(JobManager) Backend + HA mode = checkpoints in HA store
- // we add the HA persistence dir as the checkpoint directory if
none other is set
-
- if (backend instanceof MemoryStateBackend) {
- final MemoryStateBackend memBackend =
(MemoryStateBackend) backend;
-
- if (memBackend.getCheckpointPath() == null &&
HighAvailabilityMode.isHighAvailabilityModeActivated(config)) {
- final String haStoragePath =
config.getString(HighAvailabilityOptions.HA_STORAGE_PATH);
-
- if (haStoragePath != null) {
- try {
- Path checkpointDirPath = new
Path(haStoragePath, UUID.randomUUID().toString());
- if
(checkpointDirPath.toUri().getScheme() == null) {
- checkpointDirPath =
checkpointDirPath.makeQualified(checkpointDirPath.getFileSystem());
- }
- Configuration tempConfig = new
Configuration(config);
-
tempConfig.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY,
checkpointDirPath.toString());
- return
memBackend.configure(tempConfig, classLoader);
- } catch (Exception ignored) {}
- }
- }
- }
-
return backend;
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendLoadingTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendLoadingTest.java
index 04e2bfe..7db140f 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendLoadingTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendLoadingTest.java
@@ -341,56 +341,29 @@ public class StateBackendLoadingTest {
//
------------------------------------------------------------------------
/**
- * This tests that in the case of configured high-availability, the
memory state backend
- * automatically grabs the HA persistence directory.
+ * This tests the default behaviour in the case of configured
high-availability.
+ * Specifically, if no checkpoint directory is configured, the memory state
backend
+ * does not create an arbitrary directory under the HA persistence directory.
*/
@Test
- public void testHighAvailabilityDefaultFallback() throws Exception {
+ public void testHighAvailabilityDefault() throws Exception {
final String haPersistenceDir = new
Path(tmp.newFolder().toURI()).toString();
- final Path expectedCheckpointPath = new Path(haPersistenceDir);
+ testMemoryBackendHighAvailabilityDefault(haPersistenceDir,
null);
- final Configuration config1 = new Configuration();
- config1.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
- config1.setString(HighAvailabilityOptions.HA_CLUSTER_ID,
"myCluster");
- config1.setString(HighAvailabilityOptions.HA_STORAGE_PATH,
haPersistenceDir);
-
- final Configuration config2 = new Configuration();
- config2.setString(backendKey, "jobmanager");
- config2.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
- config2.setString(HighAvailabilityOptions.HA_CLUSTER_ID,
"myCluster");
- config2.setString(HighAvailabilityOptions.HA_STORAGE_PATH,
haPersistenceDir);
-
- final MemoryStateBackend appBackend = new MemoryStateBackend();
-
- final StateBackend loaded1 =
StateBackendLoader.fromApplicationOrConfigOrDefault(appBackend, config1, cl,
null);
- final StateBackend loaded2 =
StateBackendLoader.fromApplicationOrConfigOrDefault(null, config1, cl, null);
- final StateBackend loaded3 =
StateBackendLoader.fromApplicationOrConfigOrDefault(null, config2, cl, null);
-
- assertTrue(loaded1 instanceof MemoryStateBackend);
- assertTrue(loaded2 instanceof MemoryStateBackend);
- assertTrue(loaded3 instanceof MemoryStateBackend);
-
- final MemoryStateBackend memBackend1 = (MemoryStateBackend)
loaded1;
- final MemoryStateBackend memBackend2 = (MemoryStateBackend)
loaded2;
- final MemoryStateBackend memBackend3 = (MemoryStateBackend)
loaded3;
-
- assertNotNull(memBackend1.getCheckpointPath());
- assertNotNull(memBackend2.getCheckpointPath());
- assertNotNull(memBackend3.getCheckpointPath());
- assertNull(memBackend1.getSavepointPath());
- assertNull(memBackend2.getSavepointPath());
- assertNull(memBackend3.getSavepointPath());
-
- assertEquals(expectedCheckpointPath,
memBackend1.getCheckpointPath().getParent());
- assertEquals(expectedCheckpointPath,
memBackend2.getCheckpointPath().getParent());
- assertEquals(expectedCheckpointPath,
memBackend3.getCheckpointPath().getParent());
+ final Path checkpointPath = new
Path(tmp.newFolder().toURI().toString());
+ testMemoryBackendHighAvailabilityDefault(haPersistenceDir,
checkpointPath);
}
@Test
- public void testHighAvailabilityDefaultFallbackLocalPaths() throws
Exception {
+ public void testHighAvailabilityDefaultLocalPaths() throws Exception {
final String haPersistenceDir = new
Path(tmp.newFolder().getAbsolutePath()).toString();
- final Path expectedCheckpointPath = new
Path(haPersistenceDir).makeQualified(FileSystem.getLocalFileSystem());
+ testMemoryBackendHighAvailabilityDefault(haPersistenceDir,
null);
+
+ final Path checkpointPath = new
Path(tmp.newFolder().toURI().toString()).makeQualified(FileSystem.getLocalFileSystem());
+ testMemoryBackendHighAvailabilityDefault(haPersistenceDir,
checkpointPath);
+ }
+ private void testMemoryBackendHighAvailabilityDefault(String
haPersistenceDir, Path checkpointPath) throws Exception {
final Configuration config1 = new Configuration();
config1.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
config1.setString(HighAvailabilityOptions.HA_CLUSTER_ID,
"myCluster");
@@ -402,6 +375,11 @@ public class StateBackendLoadingTest {
config2.setString(HighAvailabilityOptions.HA_CLUSTER_ID,
"myCluster");
config2.setString(HighAvailabilityOptions.HA_STORAGE_PATH,
haPersistenceDir);
+ if (checkpointPath != null) {
+
config1.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY,
checkpointPath.toUri().toString());
+
config2.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY,
checkpointPath.toUri().toString());
+ }
+
final MemoryStateBackend appBackend = new MemoryStateBackend();
final StateBackend loaded1 =
StateBackendLoader.fromApplicationOrConfigOrDefault(appBackend, config1, cl,
null);
@@ -416,16 +394,23 @@ public class StateBackendLoadingTest {
final MemoryStateBackend memBackend2 = (MemoryStateBackend)
loaded2;
final MemoryStateBackend memBackend3 = (MemoryStateBackend)
loaded3;
- assertNotNull(memBackend1.getCheckpointPath());
- assertNotNull(memBackend2.getCheckpointPath());
- assertNotNull(memBackend3.getCheckpointPath());
assertNull(memBackend1.getSavepointPath());
assertNull(memBackend2.getSavepointPath());
assertNull(memBackend3.getSavepointPath());
- assertEquals(expectedCheckpointPath,
memBackend1.getCheckpointPath().getParent());
- assertEquals(expectedCheckpointPath,
memBackend2.getCheckpointPath().getParent());
- assertEquals(expectedCheckpointPath,
memBackend3.getCheckpointPath().getParent());
+ if (checkpointPath != null) {
+ assertNotNull(memBackend1.getCheckpointPath());
+ assertNotNull(memBackend2.getCheckpointPath());
+ assertNotNull(memBackend3.getCheckpointPath());
+
+ assertEquals(checkpointPath,
memBackend1.getCheckpointPath());
+ assertEquals(checkpointPath,
memBackend2.getCheckpointPath());
+ assertEquals(checkpointPath,
memBackend3.getCheckpointPath());
+ } else {
+ assertNull(memBackend1.getCheckpointPath());
+ assertNull(memBackend2.getCheckpointPath());
+ assertNull(memBackend3.getCheckpointPath());
+ }
}
//
------------------------------------------------------------------------