This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch release-1.8
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.8 by this push:
     new ba42d86  [FLINK-11107] [state] Avoid memory stateBackend to create arbitrary folders under HA path when no checkpoint path configured
ba42d86 is described below

commit ba42d86267fa3ebe8e439794149a85823e047942
Author: Yun Tang <[email protected]>
AuthorDate: Wed Dec 12 01:37:31 2018 +0800

    [FLINK-11107] [state] Avoid memory stateBackend to create arbitrary folders under HA path when no checkpoint path configured
    
    This closes #7281.
---
 .../flink/runtime/state/StateBackendLoader.java    | 26 -------
 .../runtime/state/StateBackendLoadingTest.java     | 79 +++++++++-------------
 2 files changed, 32 insertions(+), 73 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackendLoader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackendLoader.java
index dbb20b8..7ab572e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackendLoader.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackendLoader.java
@@ -20,10 +20,8 @@ package org.apache.flink.runtime.state;
 
 import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 import org.apache.flink.runtime.state.filesystem.FsStateBackend;
 import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
@@ -35,7 +33,6 @@ import org.slf4j.Logger;
 import javax.annotation.Nullable;
 
 import java.io.IOException;
-import java.util.UUID;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -232,29 +229,6 @@ public class StateBackendLoader {
                        }
                }
 
-               // to keep supporting the old behavior where default 
(JobManager) Backend + HA mode = checkpoints in HA store
-               // we add the HA persistence dir as the checkpoint directory if 
none other is set
-
-               if (backend instanceof MemoryStateBackend) {
-                       final MemoryStateBackend memBackend = 
(MemoryStateBackend) backend;
-
-                       if (memBackend.getCheckpointPath() == null && 
HighAvailabilityMode.isHighAvailabilityModeActivated(config)) {
-                               final String haStoragePath = 
config.getString(HighAvailabilityOptions.HA_STORAGE_PATH);
-
-                               if (haStoragePath != null) {
-                                       try {
-                                               Path checkpointDirPath = new 
Path(haStoragePath, UUID.randomUUID().toString());
-                                               if 
(checkpointDirPath.toUri().getScheme() == null) {
-                                                       checkpointDirPath = 
checkpointDirPath.makeQualified(checkpointDirPath.getFileSystem());
-                                               }
-                                               Configuration tempConfig = new 
Configuration(config);
-                                               
tempConfig.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, 
checkpointDirPath.toString());
-                                               return 
memBackend.configure(tempConfig, classLoader);
-                                       } catch (Exception ignored) {}
-                               }
-                       }
-               }
-
                return backend;
        }
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendLoadingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendLoadingTest.java
index 04e2bfe..7db140f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendLoadingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendLoadingTest.java
@@ -341,56 +341,29 @@ public class StateBackendLoadingTest {
        // 
------------------------------------------------------------------------
 
        /**
-        * This tests that in the case of configured high-availability, the 
memory state backend
-        * automatically grabs the HA persistence directory.
+        * This tests the default behaviour in the case of configured high-availability.
+        * Specifically, if no checkpoint directory is configured, the memory state backend
+        * does not create an arbitrary directory under the HA persistence directory.
         */
        @Test
-       public void testHighAvailabilityDefaultFallback() throws Exception {
+       public void testHighAvailabilityDefault() throws Exception {
                final String haPersistenceDir = new 
Path(tmp.newFolder().toURI()).toString();
-               final Path expectedCheckpointPath = new Path(haPersistenceDir);
+               testMemoryBackendHighAvailabilityDefault(haPersistenceDir, 
null);
 
-               final Configuration config1 = new Configuration();
-               config1.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
-               config1.setString(HighAvailabilityOptions.HA_CLUSTER_ID, 
"myCluster");
-               config1.setString(HighAvailabilityOptions.HA_STORAGE_PATH, 
haPersistenceDir);
-
-               final Configuration config2 = new Configuration();
-               config2.setString(backendKey, "jobmanager");
-               config2.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
-               config2.setString(HighAvailabilityOptions.HA_CLUSTER_ID, 
"myCluster");
-               config2.setString(HighAvailabilityOptions.HA_STORAGE_PATH, 
haPersistenceDir);
-
-               final MemoryStateBackend appBackend = new MemoryStateBackend();
-
-               final StateBackend loaded1 = 
StateBackendLoader.fromApplicationOrConfigOrDefault(appBackend, config1, cl, 
null);
-               final StateBackend loaded2 = 
StateBackendLoader.fromApplicationOrConfigOrDefault(null, config1, cl, null);
-               final StateBackend loaded3 = 
StateBackendLoader.fromApplicationOrConfigOrDefault(null, config2, cl, null);
-
-               assertTrue(loaded1 instanceof MemoryStateBackend);
-               assertTrue(loaded2 instanceof MemoryStateBackend);
-               assertTrue(loaded3 instanceof MemoryStateBackend);
-
-               final MemoryStateBackend memBackend1 = (MemoryStateBackend) 
loaded1;
-               final MemoryStateBackend memBackend2 = (MemoryStateBackend) 
loaded2;
-               final MemoryStateBackend memBackend3 = (MemoryStateBackend) 
loaded3;
-
-               assertNotNull(memBackend1.getCheckpointPath());
-               assertNotNull(memBackend2.getCheckpointPath());
-               assertNotNull(memBackend3.getCheckpointPath());
-               assertNull(memBackend1.getSavepointPath());
-               assertNull(memBackend2.getSavepointPath());
-               assertNull(memBackend3.getSavepointPath());
-
-               assertEquals(expectedCheckpointPath, 
memBackend1.getCheckpointPath().getParent());
-               assertEquals(expectedCheckpointPath, 
memBackend2.getCheckpointPath().getParent());
-               assertEquals(expectedCheckpointPath, 
memBackend3.getCheckpointPath().getParent());
+               final Path checkpointPath = new 
Path(tmp.newFolder().toURI().toString());
+               testMemoryBackendHighAvailabilityDefault(haPersistenceDir, 
checkpointPath);
        }
 
        @Test
-       public void testHighAvailabilityDefaultFallbackLocalPaths() throws 
Exception {
+       public void testHighAvailabilityDefaultLocalPaths() throws Exception {
                final String haPersistenceDir = new 
Path(tmp.newFolder().getAbsolutePath()).toString();
-               final Path expectedCheckpointPath = new 
Path(haPersistenceDir).makeQualified(FileSystem.getLocalFileSystem());
+               testMemoryBackendHighAvailabilityDefault(haPersistenceDir, 
null);
+
+               final Path checkpointPath = new 
Path(tmp.newFolder().toURI().toString()).makeQualified(FileSystem.getLocalFileSystem());
+               testMemoryBackendHighAvailabilityDefault(haPersistenceDir, 
checkpointPath);
+       }
 
+       private void testMemoryBackendHighAvailabilityDefault(String 
haPersistenceDir, Path checkpointPath) throws Exception {
                final Configuration config1 = new Configuration();
                config1.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
                config1.setString(HighAvailabilityOptions.HA_CLUSTER_ID, 
"myCluster");
@@ -402,6 +375,11 @@ public class StateBackendLoadingTest {
                config2.setString(HighAvailabilityOptions.HA_CLUSTER_ID, 
"myCluster");
                config2.setString(HighAvailabilityOptions.HA_STORAGE_PATH, 
haPersistenceDir);
 
+               if (checkpointPath != null) {
+                       
config1.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, 
checkpointPath.toUri().toString());
+                       
config2.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, 
checkpointPath.toUri().toString());
+               }
+
                final MemoryStateBackend appBackend = new MemoryStateBackend();
 
                final StateBackend loaded1 = 
StateBackendLoader.fromApplicationOrConfigOrDefault(appBackend, config1, cl, 
null);
@@ -416,16 +394,23 @@ public class StateBackendLoadingTest {
                final MemoryStateBackend memBackend2 = (MemoryStateBackend) 
loaded2;
                final MemoryStateBackend memBackend3 = (MemoryStateBackend) 
loaded3;
 
-               assertNotNull(memBackend1.getCheckpointPath());
-               assertNotNull(memBackend2.getCheckpointPath());
-               assertNotNull(memBackend3.getCheckpointPath());
                assertNull(memBackend1.getSavepointPath());
                assertNull(memBackend2.getSavepointPath());
                assertNull(memBackend3.getSavepointPath());
 
-               assertEquals(expectedCheckpointPath, 
memBackend1.getCheckpointPath().getParent());
-               assertEquals(expectedCheckpointPath, 
memBackend2.getCheckpointPath().getParent());
-               assertEquals(expectedCheckpointPath, 
memBackend3.getCheckpointPath().getParent());
+               if (checkpointPath != null) {
+                       assertNotNull(memBackend1.getCheckpointPath());
+                       assertNotNull(memBackend2.getCheckpointPath());
+                       assertNotNull(memBackend3.getCheckpointPath());
+
+                       assertEquals(checkpointPath, 
memBackend1.getCheckpointPath());
+                       assertEquals(checkpointPath, 
memBackend2.getCheckpointPath());
+                       assertEquals(checkpointPath, 
memBackend3.getCheckpointPath());
+               } else {
+                       assertNull(memBackend1.getCheckpointPath());
+                       assertNull(memBackend2.getCheckpointPath());
+                       assertNull(memBackend3.getCheckpointPath());
+               }
        }
 
        // 
------------------------------------------------------------------------

Reply via email to