[FLINK-9355][checkpointing] Simplify configuration of local recovery to a simple on/off switch

This closes #6006.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7f422598
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7f422598
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7f422598

Branch: refs/heads/master
Commit: 7f4225987a690e85284bb356dd5e63a996f136d0
Parents: 1451806
Author: Stefan Richter <[email protected]>
Authored: Mon May 14 11:14:47 2018 +0200
Committer: Stefan Richter <[email protected]>
Committed: Mon May 14 17:50:52 2018 +0200

----------------------------------------------------------------------
 .../generated/checkpointing_configuration.html  |  2 +-
 .../configuration/CheckpointingOptions.java     |  6 +--
 .../runtime/state/LocalRecoveryConfig.java      | 52 +++-----------------
 .../TaskExecutorLocalStateStoresManager.java    | 12 ++---
 .../state/heap/HeapKeyedStateBackend.java       |  3 +-
 .../taskexecutor/TaskManagerServices.java       |  8 +--
 .../TaskManagerServicesConfiguration.java       | 17 ++++---
 ...TaskExecutorLocalStateStoresManagerTest.java | 16 +++---
 .../state/TaskLocalStateStoreImplTest.java      |  3 +-
 .../runtime/state/TaskStateManagerImplTest.java |  6 +--
 .../runtime/state/TestLocalRecoveryConfig.java  |  2 +-
 .../NetworkBufferCalculationTest.java           |  3 +-
 .../taskexecutor/TaskExecutorITCase.java        |  3 +-
 .../runtime/taskexecutor/TaskExecutorTest.java  | 27 +++++-----
 ...askManagerComponentsStartupShutdownTest.java |  3 +-
 .../state/RocksDBKeyedStateBackend.java         | 17 ++-----
 .../StreamOperatorSnapshotRestoreTest.java      |  4 +-
 .../runtime/tasks/LocalStateForwardingTest.java |  4 +-
 .../AbstractLocalRecoveryITCase.java            | 11 ++---
 .../checkpointing/LocalRecoveryHeapITCase.java  |  5 +-
 .../LocalRecoveryRocksDBFullITCase.java         |  5 +-
 .../LocalRecoveryRocksDBIncrementalITCase.java  |  5 +-
 .../ResumeCheckpointManuallyITCase.java         |  8 +--
 23 files changed, 72 insertions(+), 150 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7f422598/docs/_includes/generated/checkpointing_configuration.html
----------------------------------------------------------------------
diff --git a/docs/_includes/generated/checkpointing_configuration.html 
b/docs/_includes/generated/checkpointing_configuration.html
index 8b4233f..2894c16 100644
--- a/docs/_includes/generated/checkpointing_configuration.html
+++ b/docs/_includes/generated/checkpointing_configuration.html
@@ -29,7 +29,7 @@
         </tr>
         <tr>
             <td><h5>state.backend.local-recovery</h5></td>
-            <td style="word-wrap: break-word;">"DISABLED"</td>
+            <td style="word-wrap: break-word;">false</td>
             <td></td>
         </tr>
         <tr>

http://git-wip-us.apache.org/repos/asf/flink/blob/7f422598/flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java
 
b/flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java
index c6af7dd..596e8dd 100644
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java
@@ -67,11 +67,11 @@ public class CheckpointingOptions {
                                " this option.");
 
        /**
-        * This option configures local recovery for this state backend.
+        * This option configures local recovery for this state backend. By 
default, local recovery is deactivated.
         */
-       public static final ConfigOption<String> LOCAL_RECOVERY = ConfigOptions
+       public static final ConfigOption<Boolean> LOCAL_RECOVERY = ConfigOptions
                .key("state.backend.local-recovery")
-               .defaultValue("DISABLED");
+               .defaultValue(false);
 
        /**
         * The config parameter defining the root directories for storing 
file-based state for local recovery.

http://git-wip-us.apache.org/repos/asf/flink/blob/7f422598/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalRecoveryConfig.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalRecoveryConfig.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalRecoveryConfig.java
index c97fa0b..fc15f5d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalRecoveryConfig.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalRecoveryConfig.java
@@ -18,11 +18,6 @@
 
 package org.apache.flink.runtime.state;
 
-import org.apache.flink.configuration.CheckpointingOptions;
-import org.apache.flink.configuration.Configuration;
-
-import org.slf4j.LoggerFactory;
-
 import javax.annotation.Nonnull;
 
 /**
@@ -31,57 +26,22 @@ import javax.annotation.Nonnull;
  */
 public class LocalRecoveryConfig {
 
-       /**
-        * Enum over modes of local recovery:
-        * <p><ul>
-        * <li>DISABLED: disables local recovery.
-        * <li>ENABLE_FILE_BASED: enables local recovery in a variant that is 
based on local files.
-        * </ul>
-        */
-       public enum LocalRecoveryMode {
-               DISABLED,
-               ENABLE_FILE_BASED;
-
-               /**
-                * Extracts the {@link LocalRecoveryMode} from the given 
configuration. Defaults to LocalRecoveryMode.DISABLED
-                * if no configuration value is specified or parsing the value 
resulted in an exception.
-                *
-                * @param configuration the configuration that specifies the 
value for the local recovery mode.
-                * @return the local recovery mode as found in the config, or 
LocalRecoveryMode.DISABLED if no mode was
-                * configured or the specified mode could not be parsed.
-                */
-               @Nonnull
-               public static LocalRecoveryMode fromConfig(@Nonnull 
Configuration configuration) {
-                       String localRecoveryConfString = 
configuration.getString(CheckpointingOptions.LOCAL_RECOVERY);
-                       try {
-                               return 
LocalRecoveryConfig.LocalRecoveryMode.valueOf(localRecoveryConfString);
-                       } catch (IllegalArgumentException ex) {
-                               
LoggerFactory.getLogger(LocalRecoveryConfig.class).warn(
-                                       "Exception while parsing configuration 
of local recovery mode. Local recovery will be disabled.",
-                                       ex);
-                               return 
LocalRecoveryConfig.LocalRecoveryMode.DISABLED;
-                       }
-               }
-       }
-
        /** The local recovery mode. */
-       @Nonnull
-       private final LocalRecoveryMode localRecoveryMode;
+       private final boolean localRecoveryEnabled;
 
        /** Encapsulates the root directories and the subtask-specific path. */
        @Nonnull
        private final LocalRecoveryDirectoryProvider localStateDirectories;
 
        public LocalRecoveryConfig(
-               @Nonnull LocalRecoveryMode localRecoveryMode,
+               boolean localRecoveryEnabled,
                @Nonnull LocalRecoveryDirectoryProvider directoryProvider) {
-               this.localRecoveryMode = localRecoveryMode;
+               this.localRecoveryEnabled = localRecoveryEnabled;
                this.localStateDirectories = directoryProvider;
        }
 
-       @Nonnull
-       public LocalRecoveryMode getLocalRecoveryMode() {
-               return localRecoveryMode;
+       public boolean isLocalRecoveryEnabled() {
+               return localRecoveryEnabled;
        }
 
        @Nonnull
@@ -92,7 +52,7 @@ public class LocalRecoveryConfig {
        @Override
        public String toString() {
                return "LocalRecoveryConfig{" +
-                       "localRecoveryMode=" + localRecoveryMode +
+                       "localRecoveryMode=" + localRecoveryEnabled +
                        ", localStateDirectories=" + localStateDirectories +
                        '}';
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/7f422598/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java
index 518ad81..4919f80 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java
@@ -54,7 +54,7 @@ public class TaskExecutorLocalStateStoresManager {
        private final Map<AllocationID, Map<JobVertexSubtaskKey, 
TaskLocalStateStoreImpl>> taskStateStoresByAllocationID;
 
        /** The configured mode for local recovery on this task manager. */
-       private final LocalRecoveryConfig.LocalRecoveryMode localRecoveryMode;
+       private final boolean localRecoveryEnabled;
 
        /** This is the root directory for all local state of this task manager 
/ executor. */
        private final File[] localStateRootDirectories;
@@ -71,12 +71,12 @@ public class TaskExecutorLocalStateStoresManager {
        private boolean closed;
 
        public TaskExecutorLocalStateStoresManager(
-               @Nonnull LocalRecoveryConfig.LocalRecoveryMode 
localRecoveryMode,
+               boolean localRecoveryEnabled,
                @Nonnull File[] localStateRootDirectories,
                @Nonnull Executor discardExecutor) throws IOException {
 
                this.taskStateStoresByAllocationID = new HashMap<>();
-               this.localRecoveryMode = localRecoveryMode;
+               this.localRecoveryEnabled = localRecoveryEnabled;
                this.localStateRootDirectories = localStateRootDirectories;
                this.discardExecutor = discardExecutor;
                this.lock = new Object();
@@ -140,7 +140,7 @@ public class TaskExecutorLocalStateStoresManager {
                                        subtaskIndex);
 
                                LocalRecoveryConfig localRecoveryConfig =
-                                       new 
LocalRecoveryConfig(localRecoveryMode, directoryProvider);
+                                       new 
LocalRecoveryConfig(localRecoveryEnabled, directoryProvider);
 
                                taskLocalStateStore = new 
TaskLocalStateStoreImpl(
                                        jobId,
@@ -217,8 +217,8 @@ public class TaskExecutorLocalStateStoresManager {
        }
 
        @VisibleForTesting
-       public LocalRecoveryConfig.LocalRecoveryMode getLocalRecoveryMode() {
-               return localRecoveryMode;
+       boolean isLocalRecoveryEnabled() {
+               return localRecoveryEnabled;
        }
 
        @VisibleForTesting

http://git-wip-us.apache.org/repos/asf/flink/blob/7f422598/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
index a02edb0..ab91ee1 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
@@ -622,8 +622,7 @@ public class HeapKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
 
                        final 
SupplierWithException<CheckpointStreamWithResultProvider, Exception> 
checkpointStreamSupplier =
 
-                               
LocalRecoveryConfig.LocalRecoveryMode.ENABLE_FILE_BASED.equals(
-                                       
localRecoveryConfig.getLocalRecoveryMode()) ?
+                               localRecoveryConfig.isLocalRecoveryEnabled() ?
 
                                        () -> 
CheckpointStreamWithResultProvider.createDuplicatingStream(
                                                checkpointId,

http://git-wip-us.apache.org/repos/asf/flink/blob/7f422598/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
index ad5e22d..ad19a57 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
@@ -42,7 +42,6 @@ import org.apache.flink.runtime.query.KvStateClientProxy;
 import org.apache.flink.runtime.query.KvStateRegistry;
 import org.apache.flink.runtime.query.KvStateServer;
 import org.apache.flink.runtime.query.QueryableStateUtils;
-import org.apache.flink.runtime.state.LocalRecoveryConfig;
 import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager;
 import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable;
 import org.apache.flink.runtime.taskexecutor.slot.TimerService;
@@ -255,7 +254,6 @@ public class TaskManagerServices {
 
                final JobLeaderService jobLeaderService = new 
JobLeaderService(taskManagerLocation);
 
-               LocalRecoveryConfig.LocalRecoveryMode localRecoveryMode = 
taskManagerServicesConfiguration.getLocalRecoveryMode();
 
                final String[] stateRootDirectoryStrings = 
taskManagerServicesConfiguration.getLocalRecoveryStateRootDirectories();
 
@@ -265,8 +263,10 @@ public class TaskManagerServices {
                        stateRootDirectoryFiles[i] = new 
File(stateRootDirectoryStrings[i], LOCAL_STATE_SUB_DIRECTORY_ROOT);
                }
 
-               final TaskExecutorLocalStateStoresManager taskStateManager =
-                       new 
TaskExecutorLocalStateStoresManager(localRecoveryMode, stateRootDirectoryFiles, 
taskIOExecutor);
+               final TaskExecutorLocalStateStoresManager taskStateManager = 
new TaskExecutorLocalStateStoresManager(
+                       
taskManagerServicesConfiguration.isLocalRecoveryEnabled(),
+                       stateRootDirectoryFiles,
+                       taskIOExecutor);
 
                return new TaskManagerServices(
                        taskManagerLocation,

http://git-wip-us.apache.org/repos/asf/flink/blob/7f422598/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
index b80320c..bf1494e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.taskexecutor;
 
+import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ConfigurationUtils;
@@ -29,7 +30,6 @@ import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.netty.NettyConfig;
 import org.apache.flink.runtime.memory.MemoryManager;
-import org.apache.flink.runtime.state.LocalRecoveryConfig;
 import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
 import org.apache.flink.util.MathUtils;
 import org.apache.flink.util.NetUtils;
@@ -78,13 +78,13 @@ public class TaskManagerServicesConfiguration {
 
        private final long timerServiceShutdownTimeout;
 
-       private final LocalRecoveryConfig.LocalRecoveryMode localRecoveryMode;
+       private final boolean localRecoveryEnabled;
 
        public TaskManagerServicesConfiguration(
                        InetAddress taskManagerAddress,
                        String[] tmpDirPaths,
                        String[] localRecoveryStateRootDirectories,
-                       LocalRecoveryConfig.LocalRecoveryMode localRecoveryMode,
+                       boolean localRecoveryEnabled,
                        NetworkEnvironmentConfiguration networkConfig,
                        QueryableStateConfiguration queryableStateConfig,
                        int numberOfSlots,
@@ -97,7 +97,7 @@ public class TaskManagerServicesConfiguration {
                this.taskManagerAddress = checkNotNull(taskManagerAddress);
                this.tmpDirPaths = checkNotNull(tmpDirPaths);
                this.localRecoveryStateRootDirectories = 
checkNotNull(localRecoveryStateRootDirectories);
-               this.localRecoveryMode = checkNotNull(localRecoveryMode);
+               this.localRecoveryEnabled = checkNotNull(localRecoveryEnabled);
                this.networkConfig = checkNotNull(networkConfig);
                this.queryableStateConfig = checkNotNull(queryableStateConfig);
                this.numberOfSlots = checkNotNull(numberOfSlots);
@@ -128,8 +128,8 @@ public class TaskManagerServicesConfiguration {
                return localRecoveryStateRootDirectories;
        }
 
-       public LocalRecoveryConfig.LocalRecoveryMode getLocalRecoveryMode() {
-               return localRecoveryMode;
+       public boolean isLocalRecoveryEnabled() {
+               return localRecoveryEnabled;
        }
 
        public NetworkEnvironmentConfiguration getNetworkConfig() {
@@ -209,8 +209,9 @@ public class TaskManagerServicesConfiguration {
                        localStateRootDir = tmpDirs;
                }
 
-               LocalRecoveryConfig.LocalRecoveryMode localRecoveryMode =
-                       
LocalRecoveryConfig.LocalRecoveryMode.fromConfig(configuration);
+               boolean localRecoveryMode = configuration.getBoolean(
+                       CheckpointingOptions.LOCAL_RECOVERY.key(),
+                       CheckpointingOptions.LOCAL_RECOVERY.defaultValue());
 
                final NetworkEnvironmentConfiguration networkConfig = 
parseNetworkEnvironmentConfiguration(
                        configuration,

http://git-wip-us.apache.org/repos/asf/flink/blob/7f422598/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManagerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManagerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManagerTest.java
index 2e9b107..97539bd 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManagerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManagerTest.java
@@ -62,7 +62,7 @@ public class TaskExecutorLocalStateStoresManagerTest extends 
TestLogger {
                
config.setString(CheckpointingOptions.LOCAL_RECOVERY_TASK_MANAGER_STATE_ROOT_DIRS,
 rootDirString);
 
                // test configuration of the local state mode
-               config.setString(CheckpointingOptions.LOCAL_RECOVERY, 
"ENABLE_FILE_BASED");
+               config.setBoolean(CheckpointingOptions.LOCAL_RECOVERY, true);
 
                final ResourceID tmResourceID = ResourceID.generate();
 
@@ -88,9 +88,7 @@ public class TaskExecutorLocalStateStoresManagerTest extends 
TestLogger {
                }
 
                // verify local recovery mode
-               Assert.assertEquals(
-                       LocalRecoveryConfig.LocalRecoveryMode.ENABLE_FILE_BASED,
-                       taskStateManager.getLocalRecoveryMode());
+               Assert.assertTrue(taskStateManager.isLocalRecoveryEnabled());
 
                Assert.assertEquals("localState", 
TaskManagerServices.LOCAL_STATE_SUB_DIRECTORY_ROOT);
                for (File rootDirectory : rootDirectories) {
@@ -130,9 +128,7 @@ public class TaskExecutorLocalStateStoresManagerTest 
extends TestLogger {
                                localStateRootDirectories[i]);
                }
 
-               Assert.assertEquals(
-                       LocalRecoveryConfig.LocalRecoveryMode.DISABLED,
-                       taskStateManager.getLocalRecoveryMode());
+               Assert.assertFalse(taskStateManager.isLocalRecoveryEnabled());
        }
 
        /**
@@ -150,7 +146,7 @@ public class TaskExecutorLocalStateStoresManagerTest 
extends TestLogger {
 
                File[] rootDirs = {temporaryFolder.newFolder(), 
temporaryFolder.newFolder(), temporaryFolder.newFolder()};
                TaskExecutorLocalStateStoresManager storesManager = new 
TaskExecutorLocalStateStoresManager(
-                       LocalRecoveryConfig.LocalRecoveryMode.ENABLE_FILE_BASED,
+                       true,
                        rootDirs,
                        Executors.directExecutor());
 
@@ -187,8 +183,8 @@ public class TaskExecutorLocalStateStoresManagerTest 
extends TestLogger {
 
                // test that local recovery mode is forwarded to the created 
store
                Assert.assertEquals(
-                       storesManager.getLocalRecoveryMode(),
-                       
taskLocalStateStore.getLocalRecoveryConfig().getLocalRecoveryMode());
+                       storesManager.isLocalRecoveryEnabled(),
+                       
taskLocalStateStore.getLocalRecoveryConfig().isLocalRecoveryEnabled());
 
                Assert.assertTrue(testFile.exists());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7f422598/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskLocalStateStoreImplTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskLocalStateStoreImplTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskLocalStateStoreImplTest.java
index 618320e..7531783 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskLocalStateStoreImplTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskLocalStateStoreImplTest.java
@@ -64,8 +64,7 @@ public class TaskLocalStateStoreImplTest {
                LocalRecoveryDirectoryProviderImpl directoryProvider =
                        new 
LocalRecoveryDirectoryProviderImpl(allocationBaseDirs, jobID, jobVertexID, 
subtaskIdx);
 
-               LocalRecoveryConfig localRecoveryConfig =
-                       new 
LocalRecoveryConfig(LocalRecoveryConfig.LocalRecoveryMode.DISABLED, 
directoryProvider);
+               LocalRecoveryConfig localRecoveryConfig = new 
LocalRecoveryConfig(false, directoryProvider);
 
                this.taskLocalStateStore = new TaskLocalStateStoreImpl(
                        jobID,

http://git-wip-us.apache.org/repos/asf/flink/blob/7f422598/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskStateManagerImplTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskStateManagerImplTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskStateManagerImplTest.java
index f58f3f4..71038c1 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskStateManagerImplTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskStateManagerImplTest.java
@@ -193,7 +193,7 @@ public class TaskStateManagerImplTest extends TestLogger {
                                new 
LocalRecoveryDirectoryProviderImpl(allocBaseDirs, jobID, jobVertexID, 0);
 
                        LocalRecoveryConfig localRecoveryConfig =
-                               new 
LocalRecoveryConfig(LocalRecoveryConfig.LocalRecoveryMode.ENABLE_FILE_BASED, 
directoryProvider);
+                               new LocalRecoveryConfig(true, 
directoryProvider);
 
                        TaskLocalStateStore taskLocalStateStore =
                                new TaskLocalStateStoreImpl(jobID, 
allocationID, jobVertexID, 13, localRecoveryConfig, directExecutor);
@@ -220,8 +220,8 @@ public class TaskStateManagerImplTest extends TestLogger {
                        }
 
                        Assert.assertEquals(
-                               
localRecoveryConfFromTaskLocalStateStore.getLocalRecoveryMode(),
-                               
localRecoveryConfFromTaskStateManager.getLocalRecoveryMode());
+                               
localRecoveryConfFromTaskLocalStateStore.isLocalRecoveryEnabled(),
+                               
localRecoveryConfFromTaskStateManager.isLocalRecoveryEnabled());
                } finally {
                        tmpFolder.delete();
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/7f422598/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestLocalRecoveryConfig.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestLocalRecoveryConfig.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestLocalRecoveryConfig.java
index 7801720..58affc5 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestLocalRecoveryConfig.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestLocalRecoveryConfig.java
@@ -28,7 +28,7 @@ public class TestLocalRecoveryConfig {
        private static final LocalRecoveryDirectoryProvider INSTANCE = new 
TestDummyLocalDirectoryProvider();
 
        public static LocalRecoveryConfig disabled() {
-               return new 
LocalRecoveryConfig(LocalRecoveryConfig.LocalRecoveryMode.DISABLED, INSTANCE);
+               return new LocalRecoveryConfig(false, INSTANCE);
        }
 
        public static class TestDummyLocalDirectoryProvider implements 
LocalRecoveryDirectoryProvider {

http://git-wip-us.apache.org/repos/asf/flink/blob/7f422598/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/NetworkBufferCalculationTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/NetworkBufferCalculationTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/NetworkBufferCalculationTest.java
index 2116c2f..e88f9da 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/NetworkBufferCalculationTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/NetworkBufferCalculationTest.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.taskexecutor;
 
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.core.memory.MemoryType;
-import org.apache.flink.runtime.state.LocalRecoveryConfig;
 import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
 import org.apache.flink.testutils.category.LegacyAndNew;
 import org.apache.flink.util.TestLogger;
@@ -102,7 +101,7 @@ public class NetworkBufferCalculationTest extends 
TestLogger {
                        InetAddress.getLoopbackAddress(),
                        new String[] {},
                        new String[] {},
-                       LocalRecoveryConfig.LocalRecoveryMode.DISABLED,
+                       false,
                        networkConfig,
                        QueryableStateConfiguration.disabled(),
                        1,

http://git-wip-us.apache.org/repos/asf/flink/blob/7f422598/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
index 885d99f..a740bff 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
@@ -50,7 +50,6 @@ import 
org.apache.flink.runtime.resourcemanager.StandaloneResourceManager;
 import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
 import org.apache.flink.runtime.rpc.RpcUtils;
 import org.apache.flink.runtime.rpc.TestingRpcService;
-import org.apache.flink.runtime.state.LocalRecoveryConfig;
 import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager;
 import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
 import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable;
@@ -138,7 +137,7 @@ public class TaskExecutorITCase extends TestLogger {
                        new File[]{new 
File(System.getProperty("java.io.tmpdir"), "localRecovery")};
 
                final TaskExecutorLocalStateStoresManager taskStateManager = 
new TaskExecutorLocalStateStoresManager(
-                       LocalRecoveryConfig.LocalRecoveryMode.DISABLED,
+                       false,
                        taskExecutorLocalStateRootDirs,
                        rpcService.getExecutor());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7f422598/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
index 465619e..7dedb9a 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -59,9 +59,9 @@ import 
org.apache.flink.runtime.jobmaster.JMTMRegistrationSuccess;
 import org.apache.flink.runtime.jobmaster.JobMasterGateway;
 import org.apache.flink.runtime.jobmaster.JobMasterId;
 import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGateway;
-import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
 import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
@@ -75,7 +75,6 @@ import 
org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGate
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.RpcUtils;
 import org.apache.flink.runtime.rpc.TestingRpcService;
-import org.apache.flink.runtime.state.LocalRecoveryConfig;
 import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager;
 import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
 import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable;
@@ -236,7 +235,7 @@ public class TaskExecutorTest extends TestLogger {
                        CompletableFuture.completedFuture(new 
JMTMRegistrationSuccess(jmResourceId)));
 
                TaskExecutorLocalStateStoresManager localStateStoresManager = 
new TaskExecutorLocalStateStoresManager(
-                       LocalRecoveryConfig.LocalRecoveryMode.DISABLED,
+                       false,
                        new File[]{tmp.newFolder()},
                        Executors.directExecutor());
 
@@ -325,7 +324,7 @@ public class TaskExecutorTest extends TestLogger {
                HeartbeatServices heartbeatServices = new 
HeartbeatServices(heartbeatInterval, heartbeatTimeout);
 
                TaskExecutorLocalStateStoresManager localStateStoresManager = 
new TaskExecutorLocalStateStoresManager(
-                       LocalRecoveryConfig.LocalRecoveryMode.DISABLED,
+                       false,
                        new File[]{tmp.newFolder()},
                        Executors.directExecutor());
 
@@ -460,7 +459,7 @@ public class TaskExecutorTest extends TestLogger {
                );
 
                TaskExecutorLocalStateStoresManager localStateStoresManager = 
new TaskExecutorLocalStateStoresManager(
-                       LocalRecoveryConfig.LocalRecoveryMode.DISABLED,
+                       false,
                        new File[]{tmp.newFolder()},
                        Executors.directExecutor());
 
@@ -545,7 +544,7 @@ public class TaskExecutorTest extends TestLogger {
                
when(taskSlotTable.createSlotReport(any(ResourceID.class))).thenReturn(slotReport);
 
                TaskExecutorLocalStateStoresManager localStateStoresManager = 
new TaskExecutorLocalStateStoresManager(
-                       LocalRecoveryConfig.LocalRecoveryMode.DISABLED,
+                       false,
                        new File[]{tmp.newFolder()},
                        Executors.directExecutor());
 
@@ -608,7 +607,7 @@ public class TaskExecutorTest extends TestLogger {
                
when(taskSlotTable.createSlotReport(any(ResourceID.class))).thenReturn(slotReport);
 
                TaskExecutorLocalStateStoresManager localStateStoresManager = 
new TaskExecutorLocalStateStoresManager(
-                       LocalRecoveryConfig.LocalRecoveryMode.DISABLED,
+                       false,
                        new File[]{tmp.newFolder()},
                        Executors.directExecutor());
 
@@ -729,7 +728,7 @@ public class TaskExecutorTest extends TestLogger {
                
when(networkEnvironment.getTaskEventDispatcher()).thenReturn(taskEventDispatcher);
 
                TaskExecutorLocalStateStoresManager localStateStoresManager = 
new TaskExecutorLocalStateStoresManager(
-                       LocalRecoveryConfig.LocalRecoveryMode.DISABLED,
+                       false,
                        new File[]{tmp.newFolder()},
                        Executors.directExecutor());
 
@@ -832,7 +831,7 @@ public class TaskExecutorTest extends TestLogger {
                final SlotOffer slotOffer = new SlotOffer(allocationId, 0, 
ResourceProfile.UNKNOWN);
 
                TaskExecutorLocalStateStoresManager localStateStoresManager = 
new TaskExecutorLocalStateStoresManager(
-                       LocalRecoveryConfig.LocalRecoveryMode.DISABLED,
+                       false,
                        new File[]{tmp.newFolder()},
                        Executors.directExecutor());
 
@@ -942,7 +941,7 @@ public class TaskExecutorTest extends TestLogger {
                rpc.registerGateway(jobManagerAddress, jobMasterGateway);
 
                TaskExecutorLocalStateStoresManager localStateStoresManager = 
new TaskExecutorLocalStateStoresManager(
-                       LocalRecoveryConfig.LocalRecoveryMode.DISABLED,
+                       false,
                        new File[]{tmp.newFolder()},
                        Executors.directExecutor());
 
@@ -1074,7 +1073,7 @@ public class TaskExecutorTest extends TestLogger {
                final NetworkEnvironment networkMock = 
mock(NetworkEnvironment.class, Mockito.RETURNS_MOCKS);
 
                TaskExecutorLocalStateStoresManager localStateStoresManager = 
new TaskExecutorLocalStateStoresManager(
-                       LocalRecoveryConfig.LocalRecoveryMode.DISABLED,
+                       false,
                        new File[]{tmp.newFolder()},
                        Executors.directExecutor());
 
@@ -1193,7 +1192,7 @@ public class TaskExecutorTest extends TestLogger {
                final JobManagerTable jobManagerTableMock = spy(new 
JobManagerTable());
 
                TaskExecutorLocalStateStoresManager localStateStoresManager = 
new TaskExecutorLocalStateStoresManager(
-                       LocalRecoveryConfig.LocalRecoveryMode.DISABLED,
+                       false,
                        new File[]{tmp.newFolder()},
                        Executors.directExecutor());
 
@@ -1267,7 +1266,7 @@ public class TaskExecutorTest extends TestLogger {
                rpc.registerGateway(rmAddress, rmGateway);
 
                TaskExecutorLocalStateStoresManager localStateStoresManager = 
new TaskExecutorLocalStateStoresManager(
-                       LocalRecoveryConfig.LocalRecoveryMode.DISABLED,
+                       false,
                        new File[]{tmp.newFolder()},
                        Executors.directExecutor());
 
@@ -1323,7 +1322,7 @@ public class TaskExecutorTest extends TestLogger {
                        timerService);
 
                TaskExecutorLocalStateStoresManager localStateStoresManager = 
new TaskExecutorLocalStateStoresManager(
-                       LocalRecoveryConfig.LocalRecoveryMode.DISABLED,
+                       false,
                        new File[]{tmp.newFolder()},
                        Executors.directExecutor());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7f422598/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
index 581d8ed..8289930 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
@@ -46,7 +46,6 @@ import 
org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
 import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
 import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
 import org.apache.flink.runtime.query.KvStateRegistry;
-import org.apache.flink.runtime.state.LocalRecoveryConfig;
 import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager;
 import org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
@@ -162,7 +161,7 @@ public class TaskManagerComponentsStartupShutdownTest 
extends TestLogger {
                        network.start();
 
                        TaskExecutorLocalStateStoresManager storesManager = new 
TaskExecutorLocalStateStoresManager(
-                               LocalRecoveryConfig.LocalRecoveryMode.DISABLED,
+                               false,
                                ioManager.getSpillingDirectories(),
                                Executors.directExecutor());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7f422598/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 90d0fc6..0ec2ef0 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -1663,9 +1663,8 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
 
                        final 
SupplierWithException<CheckpointStreamWithResultProvider, Exception> supplier =
 
-                               isWithLocalRecovery(
-                                       checkpointOptions.getCheckpointType(),
-                                       
localRecoveryConfig.getLocalRecoveryMode()) ?
+                               localRecoveryConfig.isLocalRecoveryEnabled() &&
+                                       (CheckpointType.SAVEPOINT != 
checkpointOptions.getCheckpointType()) ?
 
                                        () -> 
CheckpointStreamWithResultProvider.createDuplicatingStream(
                                                checkpointId,
@@ -1745,14 +1744,6 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                                primaryStreamFactory, Thread.currentThread(), 
(System.currentTimeMillis() - startTime));
                        return AsyncStoppableTaskWithCallback.from(ioCallable);
                }
-
-               private boolean isWithLocalRecovery(
-                       CheckpointType checkpointType,
-                       LocalRecoveryConfig.LocalRecoveryMode recoveryMode) {
-                       // we use local recovery when it is activated and we 
are not taking a savepoint.
-                       return 
LocalRecoveryConfig.LocalRecoveryMode.ENABLE_FILE_BASED == recoveryMode
-                               && CheckpointType.SAVEPOINT != checkpointType;
-               }
        }
 
        private class IncrementalSnapshotStrategy implements 
SnapshotStrategy<SnapshotResult<KeyedStateHandle>> {
@@ -1792,7 +1783,7 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
 
                        SnapshotDirectory snapshotDirectory;
 
-                       if 
(LocalRecoveryConfig.LocalRecoveryMode.ENABLE_FILE_BASED == 
localRecoveryConfig.getLocalRecoveryMode()) {
+                       if (localRecoveryConfig.isLocalRecoveryEnabled()) {
                                // create a "permanent" snapshot directory for 
local recovery.
                                LocalRecoveryDirectoryProvider 
directoryProvider = localRecoveryConfig.getLocalStateDirectoryProvider();
                                File directory = 
directoryProvider.subtaskSpecificCheckpointDirectory(checkpointId);
@@ -2299,7 +2290,7 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
 
                        CheckpointStreamWithResultProvider 
streamWithResultProvider =
 
-                               
LocalRecoveryConfig.LocalRecoveryMode.ENABLE_FILE_BASED == 
localRecoveryConfig.getLocalRecoveryMode() ?
+                               localRecoveryConfig.isLocalRecoveryEnabled() ?
 
                                        
CheckpointStreamWithResultProvider.createDuplicatingStream(
                                                checkpointId,

http://git-wip-us.apache.org/repos/asf/flink/blob/7f422598/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorSnapshotRestoreTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorSnapshotRestoreTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorSnapshotRestoreTest.java
index b2b568e..6d011a3 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorSnapshotRestoreTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorSnapshotRestoreTest.java
@@ -146,9 +146,7 @@ public class StreamOperatorSnapshotRestoreTest extends 
TestLogger {
                        new 
LocalRecoveryDirectoryProviderImpl(temporaryFolder.newFolder(), jobID, 
jobVertexID, subtaskIdx);
 
                LocalRecoveryConfig localRecoveryConfig =
-                       mode != ONLY_JM_RECOVERY ?
-                               new 
LocalRecoveryConfig(LocalRecoveryConfig.LocalRecoveryMode.ENABLE_FILE_BASED, 
directoryProvider) :
-                               new 
LocalRecoveryConfig(LocalRecoveryConfig.LocalRecoveryMode.DISABLED, 
directoryProvider);
+                       new LocalRecoveryConfig(mode != ONLY_JM_RECOVERY, 
directoryProvider);
 
                MockEnvironment mockEnvironment = new MockEnvironment(
                        jobID,

http://git-wip-us.apache.org/repos/asf/flink/blob/7f422598/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/LocalStateForwardingTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/LocalStateForwardingTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/LocalStateForwardingTest.java
index e35f97c..bc864a2 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/LocalStateForwardingTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/LocalStateForwardingTest.java
@@ -177,9 +177,7 @@ public class LocalStateForwardingTest extends TestLogger {
                        jobVertexID,
                        subtaskIdx);
 
-               LocalRecoveryConfig localRecoveryConfig = new 
LocalRecoveryConfig(
-                       LocalRecoveryConfig.LocalRecoveryMode.ENABLE_FILE_BASED,
-                       directoryProvider);
+               LocalRecoveryConfig localRecoveryConfig = new 
LocalRecoveryConfig(true, directoryProvider);
 
                TaskLocalStateStore taskLocalStateStore =
                        new TaskLocalStateStoreImpl(jobID, allocationID, 
jobVertexID, subtaskIdx, localRecoveryConfig, executor) {

http://git-wip-us.apache.org/repos/asf/flink/blob/7f422598/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractLocalRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractLocalRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractLocalRecoveryITCase.java
index 13040c9..4e454d7 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractLocalRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractLocalRecoveryITCase.java
@@ -28,7 +28,6 @@ import org.junit.rules.TestName;
 
 import java.io.IOException;
 
-import static 
org.apache.flink.runtime.state.LocalRecoveryConfig.LocalRecoveryMode;
 import static 
org.apache.flink.test.checkpointing.AbstractEventTimeWindowCheckpointingITCase.StateBackendEnum;
 
 /**
@@ -40,14 +39,14 @@ import static 
org.apache.flink.test.checkpointing.AbstractEventTimeWindowCheckpo
 public abstract class AbstractLocalRecoveryITCase extends TestLogger {
 
        private final StateBackendEnum backendEnum;
-       private final LocalRecoveryMode recoveryMode;
+       private final boolean localRecoveryEnabled;
 
        @Rule
        public TestName testName = new TestName();
 
-       AbstractLocalRecoveryITCase(StateBackendEnum backendEnum, 
LocalRecoveryMode recoveryMode) {
+       AbstractLocalRecoveryITCase(StateBackendEnum backendEnum, boolean 
localRecoveryEnabled) {
                this.backendEnum = backendEnum;
-               this.recoveryMode = recoveryMode;
+               this.localRecoveryEnabled = localRecoveryEnabled;
        }
 
        @Test
@@ -64,9 +63,9 @@ public abstract class AbstractLocalRecoveryITCase extends 
TestLogger {
                                protected Configuration createClusterConfig() 
throws IOException {
                                        Configuration config = 
super.createClusterConfig();
 
-                                       config.setString(
+                                       config.setBoolean(
                                                
CheckpointingOptions.LOCAL_RECOVERY,
-                                               recoveryMode.toString());
+                                               localRecoveryEnabled);
 
                                        return config;
                                }

http://git-wip-us.apache.org/repos/asf/flink/blob/7f422598/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LocalRecoveryHeapITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LocalRecoveryHeapITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LocalRecoveryHeapITCase.java
index 2c0c294..6749366 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LocalRecoveryHeapITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LocalRecoveryHeapITCase.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.test.checkpointing;
 
-import static 
org.apache.flink.runtime.state.LocalRecoveryConfig.LocalRecoveryMode.ENABLE_FILE_BASED;
 import static 
org.apache.flink.test.checkpointing.AbstractEventTimeWindowCheckpointingITCase.StateBackendEnum.FILE_ASYNC;
 
 /**
@@ -26,8 +25,6 @@ import static 
org.apache.flink.test.checkpointing.AbstractEventTimeWindowCheckpo
  */
 public class LocalRecoveryHeapITCase extends AbstractLocalRecoveryITCase {
        public LocalRecoveryHeapITCase() {
-               super(
-                       FILE_ASYNC,
-                       ENABLE_FILE_BASED);
+               super(FILE_ASYNC, true);
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7f422598/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LocalRecoveryRocksDBFullITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LocalRecoveryRocksDBFullITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LocalRecoveryRocksDBFullITCase.java
index 16bbbfc..2d12ae2 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LocalRecoveryRocksDBFullITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LocalRecoveryRocksDBFullITCase.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.test.checkpointing;
 
-import static 
org.apache.flink.runtime.state.LocalRecoveryConfig.LocalRecoveryMode.ENABLE_FILE_BASED;
 import static 
org.apache.flink.test.checkpointing.AbstractEventTimeWindowCheckpointingITCase.StateBackendEnum.ROCKSDB_FULLY_ASYNC;
 
 /**
@@ -26,8 +25,6 @@ import static 
org.apache.flink.test.checkpointing.AbstractEventTimeWindowCheckpo
  */
 public class LocalRecoveryRocksDBFullITCase extends 
AbstractLocalRecoveryITCase {
        public LocalRecoveryRocksDBFullITCase() {
-               super(
-                       ROCKSDB_FULLY_ASYNC,
-                       ENABLE_FILE_BASED);
+               super(ROCKSDB_FULLY_ASYNC, true);
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7f422598/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LocalRecoveryRocksDBIncrementalITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LocalRecoveryRocksDBIncrementalITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LocalRecoveryRocksDBIncrementalITCase.java
index fa8e139..718d4a3 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LocalRecoveryRocksDBIncrementalITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LocalRecoveryRocksDBIncrementalITCase.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.test.checkpointing;
 
-import static 
org.apache.flink.runtime.state.LocalRecoveryConfig.LocalRecoveryMode.ENABLE_FILE_BASED;
 import static 
org.apache.flink.test.checkpointing.AbstractEventTimeWindowCheckpointingITCase.StateBackendEnum.ROCKSDB_INCREMENTAL_ZK;
 
 /**
@@ -26,8 +25,6 @@ import static 
org.apache.flink.test.checkpointing.AbstractEventTimeWindowCheckpo
  */
 public class LocalRecoveryRocksDBIncrementalITCase extends 
AbstractLocalRecoveryITCase {
        public LocalRecoveryRocksDBIncrementalITCase() {
-               super(
-                       ROCKSDB_INCREMENTAL_ZK,
-                       ENABLE_FILE_BASED);
+               super(ROCKSDB_INCREMENTAL_ZK, true);
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7f422598/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java
index 6f5bbac..aebaa63 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java
@@ -28,7 +28,6 @@ import 
org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
-import org.apache.flink.runtime.state.LocalRecoveryConfig;
 import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.runtime.state.filesystem.FsStateBackend;
 import org.apache.flink.streaming.api.TimeCharacteristic;
@@ -253,12 +252,7 @@ public class ResumeCheckpointManuallyITCase extends 
TestLogger {
 
                config.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, 
checkpointDir.toURI().toString());
                config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, 
savepointDir.toURI().toString());
-
-               if (localRecovery) {
-                       config.setString(
-                               CheckpointingOptions.LOCAL_RECOVERY,
-                               
LocalRecoveryConfig.LocalRecoveryMode.ENABLE_FILE_BASED.toString());
-               }
+               config.setBoolean(CheckpointingOptions.LOCAL_RECOVERY, 
localRecovery);
 
                // ZooKeeper recovery mode?
                if (zooKeeperQuorum != null) {

Reply via email to