[FLINK-9355][checkpointing] Simplify configuration of local recovery to a simple on/off switch
This closes #6006. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7f422598 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7f422598 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7f422598 Branch: refs/heads/master Commit: 7f4225987a690e85284bb356dd5e63a996f136d0 Parents: 1451806 Author: Stefan Richter <[email protected]> Authored: Mon May 14 11:14:47 2018 +0200 Committer: Stefan Richter <[email protected]> Committed: Mon May 14 17:50:52 2018 +0200 ---------------------------------------------------------------------- .../generated/checkpointing_configuration.html | 2 +- .../configuration/CheckpointingOptions.java | 6 +-- .../runtime/state/LocalRecoveryConfig.java | 52 +++----------------- .../TaskExecutorLocalStateStoresManager.java | 12 ++--- .../state/heap/HeapKeyedStateBackend.java | 3 +- .../taskexecutor/TaskManagerServices.java | 8 +-- .../TaskManagerServicesConfiguration.java | 17 ++++--- ...TaskExecutorLocalStateStoresManagerTest.java | 16 +++--- .../state/TaskLocalStateStoreImplTest.java | 3 +- .../runtime/state/TaskStateManagerImplTest.java | 6 +-- .../runtime/state/TestLocalRecoveryConfig.java | 2 +- .../NetworkBufferCalculationTest.java | 3 +- .../taskexecutor/TaskExecutorITCase.java | 3 +- .../runtime/taskexecutor/TaskExecutorTest.java | 27 +++++----- ...askManagerComponentsStartupShutdownTest.java | 3 +- .../state/RocksDBKeyedStateBackend.java | 17 ++----- .../StreamOperatorSnapshotRestoreTest.java | 4 +- .../runtime/tasks/LocalStateForwardingTest.java | 4 +- .../AbstractLocalRecoveryITCase.java | 11 ++--- .../checkpointing/LocalRecoveryHeapITCase.java | 5 +- .../LocalRecoveryRocksDBFullITCase.java | 5 +- .../LocalRecoveryRocksDBIncrementalITCase.java | 5 +- .../ResumeCheckpointManuallyITCase.java | 8 +-- 23 files changed, 72 insertions(+), 150 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/7f422598/docs/_includes/generated/checkpointing_configuration.html ---------------------------------------------------------------------- diff --git a/docs/_includes/generated/checkpointing_configuration.html b/docs/_includes/generated/checkpointing_configuration.html index 8b4233f..2894c16 100644 --- a/docs/_includes/generated/checkpointing_configuration.html +++ b/docs/_includes/generated/checkpointing_configuration.html @@ -29,7 +29,7 @@ </tr> <tr> <td><h5>state.backend.local-recovery</h5></td> - <td style="word-wrap: break-word;">"DISABLED"</td> + <td style="word-wrap: break-word;">false</td> <td></td> </tr> <tr> http://git-wip-us.apache.org/repos/asf/flink/blob/7f422598/flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java index c6af7dd..596e8dd 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java @@ -67,11 +67,11 @@ public class CheckpointingOptions { " this option."); /** - * This option configures local recovery for this state backend. + * This option configures local recovery for this state backend. By default, local recovery is deactivated. */ - public static final ConfigOption<String> LOCAL_RECOVERY = ConfigOptions + public static final ConfigOption<Boolean> LOCAL_RECOVERY = ConfigOptions .key("state.backend.local-recovery") - .defaultValue("DISABLED"); + .defaultValue(false); /** * The config parameter defining the root directories for storing file-based state for local recovery. http://git-wip-us.apache.org/repos/asf/flink/blob/7f422598/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalRecoveryConfig.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalRecoveryConfig.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalRecoveryConfig.java index c97fa0b..fc15f5d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalRecoveryConfig.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalRecoveryConfig.java @@ -18,11 +18,6 @@ package org.apache.flink.runtime.state; -import org.apache.flink.configuration.CheckpointingOptions; -import org.apache.flink.configuration.Configuration; - -import org.slf4j.LoggerFactory; - import javax.annotation.Nonnull; /** @@ -31,57 +26,22 @@ import javax.annotation.Nonnull; */ public class LocalRecoveryConfig { - /** - * Enum over modes of local recovery: - * <p><ul> - * <li>DISABLED: disables local recovery. - * <li>ENABLE_FILE_BASED: enables local recovery in a variant that is based on local files. - * </ul> - */ - public enum LocalRecoveryMode { - DISABLED, - ENABLE_FILE_BASED; - - /** - * Extracts the {@link LocalRecoveryMode} from the given configuration. Defaults to LocalRecoveryMode.DISABLED - * if no configuration value is specified or parsing the value resulted in an exception. - * - * @param configuration the configuration that specifies the value for the local recovery mode. - * @return the local recovery mode as found in the config, or LocalRecoveryMode.DISABLED if no mode was - * configured or the specified mode could not be parsed. - */ - @Nonnull - public static LocalRecoveryMode fromConfig(@Nonnull Configuration configuration) { - String localRecoveryConfString = configuration.getString(CheckpointingOptions.LOCAL_RECOVERY); - try { - return LocalRecoveryConfig.LocalRecoveryMode.valueOf(localRecoveryConfString); - } catch (IllegalArgumentException ex) { - LoggerFactory.getLogger(LocalRecoveryConfig.class).warn( - "Exception while parsing configuration of local recovery mode. Local recovery will be disabled.", - ex); - return LocalRecoveryConfig.LocalRecoveryMode.DISABLED; - } - } - } - /** The local recovery mode. */ - @Nonnull - private final LocalRecoveryMode localRecoveryMode; + private final boolean localRecoveryEnabled; /** Encapsulates the root directories and the subtask-specific path. */ @Nonnull private final LocalRecoveryDirectoryProvider localStateDirectories; public LocalRecoveryConfig( - @Nonnull LocalRecoveryMode localRecoveryMode, + boolean localRecoveryEnabled, @Nonnull LocalRecoveryDirectoryProvider directoryProvider) { - this.localRecoveryMode = localRecoveryMode; + this.localRecoveryEnabled = localRecoveryEnabled; this.localStateDirectories = directoryProvider; } - @Nonnull - public LocalRecoveryMode getLocalRecoveryMode() { - return localRecoveryMode; + public boolean isLocalRecoveryEnabled() { + return localRecoveryEnabled; } @Nonnull @@ -92,7 +52,7 @@ public class LocalRecoveryConfig { @Override public String toString() { return "LocalRecoveryConfig{" + - "localRecoveryMode=" + localRecoveryMode + + "localRecoveryMode=" + localRecoveryEnabled + ", localStateDirectories=" + localStateDirectories + '}'; } http://git-wip-us.apache.org/repos/asf/flink/blob/7f422598/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java index 518ad81..4919f80 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java @@ -54,7 +54,7 @@ public class TaskExecutorLocalStateStoresManager { private final Map<AllocationID, Map<JobVertexSubtaskKey, TaskLocalStateStoreImpl>> taskStateStoresByAllocationID; /** The configured mode for local recovery on this task manager. */ - private final LocalRecoveryConfig.LocalRecoveryMode localRecoveryMode; + private final boolean localRecoveryEnabled; /** This is the root directory for all local state of this task manager / executor. */ private final File[] localStateRootDirectories; @@ -71,12 +71,12 @@ public class TaskExecutorLocalStateStoresManager { private boolean closed; public TaskExecutorLocalStateStoresManager( - @Nonnull LocalRecoveryConfig.LocalRecoveryMode localRecoveryMode, + boolean localRecoveryEnabled, @Nonnull File[] localStateRootDirectories, @Nonnull Executor discardExecutor) throws IOException { this.taskStateStoresByAllocationID = new HashMap<>(); - this.localRecoveryMode = localRecoveryMode; + this.localRecoveryEnabled = localRecoveryEnabled; this.localStateRootDirectories = localStateRootDirectories; this.discardExecutor = discardExecutor; this.lock = new Object(); @@ -140,7 +140,7 @@ public class TaskExecutorLocalStateStoresManager { subtaskIndex); LocalRecoveryConfig localRecoveryConfig = - new LocalRecoveryConfig(localRecoveryMode, directoryProvider); + new LocalRecoveryConfig(localRecoveryEnabled, directoryProvider); taskLocalStateStore = new TaskLocalStateStoreImpl( jobId, @@ -217,8 +217,8 @@ public class TaskExecutorLocalStateStoresManager { } @VisibleForTesting - public LocalRecoveryConfig.LocalRecoveryMode getLocalRecoveryMode() { - return localRecoveryMode; + boolean isLocalRecoveryEnabled() { + return localRecoveryEnabled; } @VisibleForTesting http://git-wip-us.apache.org/repos/asf/flink/blob/7f422598/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java index a02edb0..ab91ee1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java @@ -622,8 +622,7 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { final SupplierWithException<CheckpointStreamWithResultProvider, Exception> checkpointStreamSupplier = - LocalRecoveryConfig.LocalRecoveryMode.ENABLE_FILE_BASED.equals( - localRecoveryConfig.getLocalRecoveryMode()) ? + localRecoveryConfig.isLocalRecoveryEnabled() ? () -> CheckpointStreamWithResultProvider.createDuplicatingStream( checkpointId, http://git-wip-us.apache.org/repos/asf/flink/blob/7f422598/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java index ad5e22d..ad19a57 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java @@ -42,7 +42,6 @@ import org.apache.flink.runtime.query.KvStateClientProxy; import org.apache.flink.runtime.query.KvStateRegistry; import org.apache.flink.runtime.query.KvStateServer; import org.apache.flink.runtime.query.QueryableStateUtils; -import org.apache.flink.runtime.state.LocalRecoveryConfig; import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager; import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable; import org.apache.flink.runtime.taskexecutor.slot.TimerService; @@ -255,7 +254,6 @@ public class TaskManagerServices { final JobLeaderService jobLeaderService = new JobLeaderService(taskManagerLocation); - LocalRecoveryConfig.LocalRecoveryMode localRecoveryMode = taskManagerServicesConfiguration.getLocalRecoveryMode(); final String[] stateRootDirectoryStrings = taskManagerServicesConfiguration.getLocalRecoveryStateRootDirectories(); @@ -265,8 +263,10 @@ public class TaskManagerServices { stateRootDirectoryFiles[i] = new File(stateRootDirectoryStrings[i], LOCAL_STATE_SUB_DIRECTORY_ROOT); } - final TaskExecutorLocalStateStoresManager taskStateManager = - new TaskExecutorLocalStateStoresManager(localRecoveryMode, stateRootDirectoryFiles, taskIOExecutor); + final TaskExecutorLocalStateStoresManager taskStateManager = new TaskExecutorLocalStateStoresManager( + taskManagerServicesConfiguration.isLocalRecoveryEnabled(), + stateRootDirectoryFiles, + taskIOExecutor); return new TaskManagerServices( taskManagerLocation, http://git-wip-us.apache.org/repos/asf/flink/blob/7f422598/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java index b80320c..bf1494e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.taskexecutor; +import org.apache.flink.configuration.CheckpointingOptions; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.ConfigurationUtils; @@ -29,7 +30,6 @@ import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.io.network.netty.NettyConfig; import org.apache.flink.runtime.memory.MemoryManager; -import org.apache.flink.runtime.state.LocalRecoveryConfig; import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration; import org.apache.flink.util.MathUtils; import org.apache.flink.util.NetUtils; @@ -78,13 +78,13 @@ public class TaskManagerServicesConfiguration { private final long timerServiceShutdownTimeout; - private final LocalRecoveryConfig.LocalRecoveryMode localRecoveryMode; + private final boolean localRecoveryEnabled; public TaskManagerServicesConfiguration( InetAddress taskManagerAddress, String[] tmpDirPaths, String[] localRecoveryStateRootDirectories, - LocalRecoveryConfig.LocalRecoveryMode localRecoveryMode, + boolean localRecoveryEnabled, NetworkEnvironmentConfiguration networkConfig, QueryableStateConfiguration queryableStateConfig, int numberOfSlots, @@ -97,7 +97,7 @@ public class TaskManagerServicesConfiguration { this.taskManagerAddress = checkNotNull(taskManagerAddress); this.tmpDirPaths = checkNotNull(tmpDirPaths); this.localRecoveryStateRootDirectories = checkNotNull(localRecoveryStateRootDirectories); - this.localRecoveryMode = checkNotNull(localRecoveryMode); + this.localRecoveryEnabled = checkNotNull(localRecoveryEnabled); this.networkConfig = checkNotNull(networkConfig); this.queryableStateConfig = checkNotNull(queryableStateConfig); this.numberOfSlots = checkNotNull(numberOfSlots); @@ -128,8 +128,8 @@ public class TaskManagerServicesConfiguration { return localRecoveryStateRootDirectories; } - public LocalRecoveryConfig.LocalRecoveryMode getLocalRecoveryMode() { - return localRecoveryMode; + public boolean isLocalRecoveryEnabled() { + return localRecoveryEnabled; } public NetworkEnvironmentConfiguration getNetworkConfig() { @@ -209,8 +209,9 @@ public class TaskManagerServicesConfiguration { localStateRootDir = tmpDirs; } - LocalRecoveryConfig.LocalRecoveryMode localRecoveryMode = - LocalRecoveryConfig.LocalRecoveryMode.fromConfig(configuration); + boolean localRecoveryMode = configuration.getBoolean( + CheckpointingOptions.LOCAL_RECOVERY.key(), + CheckpointingOptions.LOCAL_RECOVERY.defaultValue()); final NetworkEnvironmentConfiguration networkConfig = parseNetworkEnvironmentConfiguration( configuration, http://git-wip-us.apache.org/repos/asf/flink/blob/7f422598/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManagerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManagerTest.java index 2e9b107..97539bd 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManagerTest.java @@ -62,7 +62,7 @@ public class TaskExecutorLocalStateStoresManagerTest extends TestLogger { config.setString(CheckpointingOptions.LOCAL_RECOVERY_TASK_MANAGER_STATE_ROOT_DIRS, rootDirString); // test configuration of the local state mode - config.setString(CheckpointingOptions.LOCAL_RECOVERY, "ENABLE_FILE_BASED"); + config.setBoolean(CheckpointingOptions.LOCAL_RECOVERY, true); final ResourceID tmResourceID = ResourceID.generate(); @@ -88,9 +88,7 @@ public class TaskExecutorLocalStateStoresManagerTest extends TestLogger { } // verify local recovery mode - Assert.assertEquals( - LocalRecoveryConfig.LocalRecoveryMode.ENABLE_FILE_BASED, - taskStateManager.getLocalRecoveryMode()); + Assert.assertTrue(taskStateManager.isLocalRecoveryEnabled()); Assert.assertEquals("localState", TaskManagerServices.LOCAL_STATE_SUB_DIRECTORY_ROOT); for (File rootDirectory : rootDirectories) { @@ -130,9 +128,7 @@ public class TaskExecutorLocalStateStoresManagerTest extends TestLogger { localStateRootDirectories[i]); } - Assert.assertEquals( - LocalRecoveryConfig.LocalRecoveryMode.DISABLED, - taskStateManager.getLocalRecoveryMode()); + Assert.assertFalse(taskStateManager.isLocalRecoveryEnabled()); } /** @@ -150,7 +146,7 @@ public class TaskExecutorLocalStateStoresManagerTest extends TestLogger { File[] rootDirs = {temporaryFolder.newFolder(), temporaryFolder.newFolder(), temporaryFolder.newFolder()}; TaskExecutorLocalStateStoresManager storesManager = new TaskExecutorLocalStateStoresManager( - LocalRecoveryConfig.LocalRecoveryMode.ENABLE_FILE_BASED, + true, rootDirs, Executors.directExecutor()); @@ -187,8 +183,8 @@ public class TaskExecutorLocalStateStoresManagerTest extends TestLogger { // test that local recovery mode is forwarded to the created store Assert.assertEquals( - storesManager.getLocalRecoveryMode(), - taskLocalStateStore.getLocalRecoveryConfig().getLocalRecoveryMode()); + storesManager.isLocalRecoveryEnabled(), + taskLocalStateStore.getLocalRecoveryConfig().isLocalRecoveryEnabled()); Assert.assertTrue(testFile.exists()); http://git-wip-us.apache.org/repos/asf/flink/blob/7f422598/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskLocalStateStoreImplTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskLocalStateStoreImplTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskLocalStateStoreImplTest.java index 618320e..7531783 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskLocalStateStoreImplTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskLocalStateStoreImplTest.java @@ -64,8 +64,7 @@ public class TaskLocalStateStoreImplTest { LocalRecoveryDirectoryProviderImpl directoryProvider = new LocalRecoveryDirectoryProviderImpl(allocationBaseDirs, jobID, jobVertexID, subtaskIdx); - LocalRecoveryConfig localRecoveryConfig = - new LocalRecoveryConfig(LocalRecoveryConfig.LocalRecoveryMode.DISABLED, directoryProvider); + LocalRecoveryConfig localRecoveryConfig = new LocalRecoveryConfig(false, directoryProvider); this.taskLocalStateStore = new TaskLocalStateStoreImpl( jobID, http://git-wip-us.apache.org/repos/asf/flink/blob/7f422598/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskStateManagerImplTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskStateManagerImplTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskStateManagerImplTest.java index f58f3f4..71038c1 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskStateManagerImplTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskStateManagerImplTest.java @@ -193,7 +193,7 @@ public class TaskStateManagerImplTest extends TestLogger { new LocalRecoveryDirectoryProviderImpl(allocBaseDirs, jobID, jobVertexID, 0); LocalRecoveryConfig localRecoveryConfig = - new LocalRecoveryConfig(LocalRecoveryConfig.LocalRecoveryMode.ENABLE_FILE_BASED, directoryProvider); + new LocalRecoveryConfig(true, directoryProvider); TaskLocalStateStore taskLocalStateStore = new TaskLocalStateStoreImpl(jobID, allocationID, jobVertexID, 13, localRecoveryConfig, directExecutor); @@ -220,8 +220,8 @@ public class TaskStateManagerImplTest extends TestLogger { } Assert.assertEquals( - localRecoveryConfFromTaskLocalStateStore.getLocalRecoveryMode(), - localRecoveryConfFromTaskStateManager.getLocalRecoveryMode()); + localRecoveryConfFromTaskLocalStateStore.isLocalRecoveryEnabled(), + localRecoveryConfFromTaskStateManager.isLocalRecoveryEnabled()); } finally { tmpFolder.delete(); } http://git-wip-us.apache.org/repos/asf/flink/blob/7f422598/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestLocalRecoveryConfig.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestLocalRecoveryConfig.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestLocalRecoveryConfig.java index 7801720..58affc5 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestLocalRecoveryConfig.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestLocalRecoveryConfig.java @@ -28,7 +28,7 @@ public class TestLocalRecoveryConfig { private static final LocalRecoveryDirectoryProvider INSTANCE = new TestDummyLocalDirectoryProvider(); public static LocalRecoveryConfig disabled() { - return new LocalRecoveryConfig(LocalRecoveryConfig.LocalRecoveryMode.DISABLED, INSTANCE); + return new LocalRecoveryConfig(false, INSTANCE); } public static class TestDummyLocalDirectoryProvider implements LocalRecoveryDirectoryProvider { http://git-wip-us.apache.org/repos/asf/flink/blob/7f422598/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/NetworkBufferCalculationTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/NetworkBufferCalculationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/NetworkBufferCalculationTest.java index 2116c2f..e88f9da 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/NetworkBufferCalculationTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/NetworkBufferCalculationTest.java @@ -20,7 +20,6 @@ package org.apache.flink.runtime.taskexecutor; import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.core.memory.MemoryType; -import org.apache.flink.runtime.state.LocalRecoveryConfig; import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration; import org.apache.flink.testutils.category.LegacyAndNew; import org.apache.flink.util.TestLogger; @@ -102,7 +101,7 @@ public class NetworkBufferCalculationTest extends TestLogger { InetAddress.getLoopbackAddress(), new String[] {}, new String[] {}, - LocalRecoveryConfig.LocalRecoveryMode.DISABLED, + false, networkConfig, QueryableStateConfiguration.disabled(), 1, http://git-wip-us.apache.org/repos/asf/flink/blob/7f422598/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java index 885d99f..a740bff 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java @@ -50,7 +50,6 @@ import org.apache.flink.runtime.resourcemanager.StandaloneResourceManager; import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager; import org.apache.flink.runtime.rpc.RpcUtils; import org.apache.flink.runtime.rpc.TestingRpcService; -import org.apache.flink.runtime.state.LocalRecoveryConfig; import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager; import org.apache.flink.runtime.taskexecutor.slot.SlotOffer; import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable; @@ -138,7 +137,7 @@ public class TaskExecutorITCase extends TestLogger { new File[]{new File(System.getProperty("java.io.tmpdir"), "localRecovery")}; final TaskExecutorLocalStateStoresManager taskStateManager = new TaskExecutorLocalStateStoresManager( - LocalRecoveryConfig.LocalRecoveryMode.DISABLED, + false, taskExecutorLocalStateRootDirs, rpcService.getExecutor()); http://git-wip-us.apache.org/repos/asf/flink/blob/7f422598/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java index 465619e..7dedb9a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java @@ -59,9 +59,9 @@ import org.apache.flink.runtime.jobmaster.JMTMRegistrationSuccess; import org.apache.flink.runtime.jobmaster.JobMasterGateway; import org.apache.flink.runtime.jobmaster.JobMasterId; import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGateway; -import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; +import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService; import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup; import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup; @@ -75,7 +75,6 @@ import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGate import org.apache.flink.runtime.rpc.RpcService; import org.apache.flink.runtime.rpc.RpcUtils; import org.apache.flink.runtime.rpc.TestingRpcService; -import org.apache.flink.runtime.state.LocalRecoveryConfig; import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager; import org.apache.flink.runtime.taskexecutor.slot.SlotOffer; import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable; @@ -236,7 +235,7 @@ public class TaskExecutorTest extends TestLogger { CompletableFuture.completedFuture(new JMTMRegistrationSuccess(jmResourceId))); TaskExecutorLocalStateStoresManager localStateStoresManager = new TaskExecutorLocalStateStoresManager( - LocalRecoveryConfig.LocalRecoveryMode.DISABLED, + false, new File[]{tmp.newFolder()}, Executors.directExecutor()); @@ -325,7 +324,7 @@ public class TaskExecutorTest extends TestLogger { HeartbeatServices heartbeatServices = new HeartbeatServices(heartbeatInterval, heartbeatTimeout); TaskExecutorLocalStateStoresManager localStateStoresManager = new TaskExecutorLocalStateStoresManager( - LocalRecoveryConfig.LocalRecoveryMode.DISABLED, + false, new File[]{tmp.newFolder()}, Executors.directExecutor()); @@ -460,7 +459,7 @@ public class TaskExecutorTest extends TestLogger { ); TaskExecutorLocalStateStoresManager localStateStoresManager = new TaskExecutorLocalStateStoresManager( - LocalRecoveryConfig.LocalRecoveryMode.DISABLED, + false, new File[]{tmp.newFolder()}, Executors.directExecutor()); @@ -545,7 +544,7 @@ public class TaskExecutorTest extends TestLogger { when(taskSlotTable.createSlotReport(any(ResourceID.class))).thenReturn(slotReport); TaskExecutorLocalStateStoresManager localStateStoresManager = new TaskExecutorLocalStateStoresManager( - LocalRecoveryConfig.LocalRecoveryMode.DISABLED, + false, new File[]{tmp.newFolder()}, Executors.directExecutor()); @@ -608,7 +607,7 @@ public class TaskExecutorTest extends TestLogger { when(taskSlotTable.createSlotReport(any(ResourceID.class))).thenReturn(slotReport); TaskExecutorLocalStateStoresManager localStateStoresManager = new TaskExecutorLocalStateStoresManager( - LocalRecoveryConfig.LocalRecoveryMode.DISABLED, + false, new File[]{tmp.newFolder()}, Executors.directExecutor()); @@ -729,7 +728,7 @@ public class TaskExecutorTest extends TestLogger { when(networkEnvironment.getTaskEventDispatcher()).thenReturn(taskEventDispatcher); TaskExecutorLocalStateStoresManager localStateStoresManager = new TaskExecutorLocalStateStoresManager( - LocalRecoveryConfig.LocalRecoveryMode.DISABLED, + false, new File[]{tmp.newFolder()}, Executors.directExecutor()); @@ -832,7 +831,7 @@ public class TaskExecutorTest extends TestLogger { final SlotOffer slotOffer = new SlotOffer(allocationId, 0, ResourceProfile.UNKNOWN); TaskExecutorLocalStateStoresManager localStateStoresManager = new TaskExecutorLocalStateStoresManager( - LocalRecoveryConfig.LocalRecoveryMode.DISABLED, + false, new File[]{tmp.newFolder()}, Executors.directExecutor()); @@ -942,7 +941,7 @@ public class TaskExecutorTest extends TestLogger { rpc.registerGateway(jobManagerAddress, jobMasterGateway); TaskExecutorLocalStateStoresManager localStateStoresManager = new TaskExecutorLocalStateStoresManager( - LocalRecoveryConfig.LocalRecoveryMode.DISABLED, + false, new File[]{tmp.newFolder()}, Executors.directExecutor()); @@ -1074,7 +1073,7 @@ public class TaskExecutorTest extends TestLogger { final NetworkEnvironment networkMock = mock(NetworkEnvironment.class, Mockito.RETURNS_MOCKS); TaskExecutorLocalStateStoresManager localStateStoresManager = new TaskExecutorLocalStateStoresManager( - LocalRecoveryConfig.LocalRecoveryMode.DISABLED, + false, new File[]{tmp.newFolder()}, Executors.directExecutor()); @@ -1193,7 +1192,7 @@ public class TaskExecutorTest extends TestLogger { final JobManagerTable jobManagerTableMock = spy(new JobManagerTable()); TaskExecutorLocalStateStoresManager localStateStoresManager = new TaskExecutorLocalStateStoresManager( - LocalRecoveryConfig.LocalRecoveryMode.DISABLED, + false, new File[]{tmp.newFolder()}, Executors.directExecutor()); @@ -1267,7 +1266,7 @@ public class TaskExecutorTest extends TestLogger { rpc.registerGateway(rmAddress, rmGateway); TaskExecutorLocalStateStoresManager localStateStoresManager = new TaskExecutorLocalStateStoresManager( - LocalRecoveryConfig.LocalRecoveryMode.DISABLED, + false, new File[]{tmp.newFolder()}, Executors.directExecutor()); @@ -1323,7 +1322,7 @@ public class TaskExecutorTest extends TestLogger { timerService); TaskExecutorLocalStateStoresManager localStateStoresManager = new TaskExecutorLocalStateStoresManager( - LocalRecoveryConfig.LocalRecoveryMode.DISABLED, + false, new File[]{tmp.newFolder()}, Executors.directExecutor()); http://git-wip-us.apache.org/repos/asf/flink/blob/7f422598/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java index 581d8ed..8289930 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java @@ -46,7 +46,6 @@ import org.apache.flink.runtime.metrics.MetricRegistryConfiguration; import org.apache.flink.runtime.metrics.NoOpMetricRegistry; import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup; import org.apache.flink.runtime.query.KvStateRegistry; -import org.apache.flink.runtime.state.LocalRecoveryConfig; import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager; import org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration; import org.apache.flink.runtime.testingUtils.TestingUtils; @@ -162,7 +161,7 @@ public class TaskManagerComponentsStartupShutdownTest extends TestLogger { network.start(); TaskExecutorLocalStateStoresManager storesManager = new TaskExecutorLocalStateStoresManager( - LocalRecoveryConfig.LocalRecoveryMode.DISABLED, + false, ioManager.getSpillingDirectories(), Executors.directExecutor()); http://git-wip-us.apache.org/repos/asf/flink/blob/7f422598/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java index 90d0fc6..0ec2ef0 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java @@ -1663,9 +1663,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { final SupplierWithException<CheckpointStreamWithResultProvider, Exception> supplier = - isWithLocalRecovery( - checkpointOptions.getCheckpointType(), - localRecoveryConfig.getLocalRecoveryMode()) ? + localRecoveryConfig.isLocalRecoveryEnabled() && + (CheckpointType.SAVEPOINT != checkpointOptions.getCheckpointType()) ? () -> CheckpointStreamWithResultProvider.createDuplicatingStream( checkpointId, @@ -1745,14 +1744,6 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { primaryStreamFactory, Thread.currentThread(), (System.currentTimeMillis() - startTime)); return AsyncStoppableTaskWithCallback.from(ioCallable); } - - private boolean isWithLocalRecovery( - CheckpointType checkpointType, - LocalRecoveryConfig.LocalRecoveryMode recoveryMode) { - // we use local recovery when it is activated and we are not taking a savepoint. - return LocalRecoveryConfig.LocalRecoveryMode.ENABLE_FILE_BASED == recoveryMode - && CheckpointType.SAVEPOINT != checkpointType; - } } private class IncrementalSnapshotStrategy implements SnapshotStrategy<SnapshotResult<KeyedStateHandle>> { @@ -1792,7 +1783,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { SnapshotDirectory snapshotDirectory; - if (LocalRecoveryConfig.LocalRecoveryMode.ENABLE_FILE_BASED == localRecoveryConfig.getLocalRecoveryMode()) { + if (localRecoveryConfig.isLocalRecoveryEnabled()) { // create a "permanent" snapshot directory for local recovery. LocalRecoveryDirectoryProvider directoryProvider = localRecoveryConfig.getLocalStateDirectoryProvider(); File directory = directoryProvider.subtaskSpecificCheckpointDirectory(checkpointId); @@ -2299,7 +2290,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { CheckpointStreamWithResultProvider streamWithResultProvider = - LocalRecoveryConfig.LocalRecoveryMode.ENABLE_FILE_BASED == localRecoveryConfig.getLocalRecoveryMode() ? + localRecoveryConfig.isLocalRecoveryEnabled() ? CheckpointStreamWithResultProvider.createDuplicatingStream( checkpointId, http://git-wip-us.apache.org/repos/asf/flink/blob/7f422598/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorSnapshotRestoreTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorSnapshotRestoreTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorSnapshotRestoreTest.java index b2b568e..6d011a3 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorSnapshotRestoreTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorSnapshotRestoreTest.java @@ -146,9 +146,7 @@ public class StreamOperatorSnapshotRestoreTest extends TestLogger { new LocalRecoveryDirectoryProviderImpl(temporaryFolder.newFolder(), jobID, jobVertexID, subtaskIdx); LocalRecoveryConfig localRecoveryConfig = - mode != ONLY_JM_RECOVERY ? - new LocalRecoveryConfig(LocalRecoveryConfig.LocalRecoveryMode.ENABLE_FILE_BASED, directoryProvider) : - new LocalRecoveryConfig(LocalRecoveryConfig.LocalRecoveryMode.DISABLED, directoryProvider); + new LocalRecoveryConfig(mode != ONLY_JM_RECOVERY, directoryProvider); MockEnvironment mockEnvironment = new MockEnvironment( jobID, http://git-wip-us.apache.org/repos/asf/flink/blob/7f422598/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/LocalStateForwardingTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/LocalStateForwardingTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/LocalStateForwardingTest.java index e35f97c..bc864a2 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/LocalStateForwardingTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/LocalStateForwardingTest.java @@ -177,9 +177,7 @@ public class LocalStateForwardingTest extends TestLogger { jobVertexID, subtaskIdx); - LocalRecoveryConfig localRecoveryConfig = new LocalRecoveryConfig( - LocalRecoveryConfig.LocalRecoveryMode.ENABLE_FILE_BASED, - directoryProvider); + LocalRecoveryConfig localRecoveryConfig = new LocalRecoveryConfig(true, directoryProvider); TaskLocalStateStore taskLocalStateStore = new TaskLocalStateStoreImpl(jobID, allocationID, jobVertexID, subtaskIdx, localRecoveryConfig, executor) { http://git-wip-us.apache.org/repos/asf/flink/blob/7f422598/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractLocalRecoveryITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractLocalRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractLocalRecoveryITCase.java index 13040c9..4e454d7 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractLocalRecoveryITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractLocalRecoveryITCase.java @@ -28,7 +28,6 @@ import org.junit.rules.TestName; import java.io.IOException; -import static org.apache.flink.runtime.state.LocalRecoveryConfig.LocalRecoveryMode; import static org.apache.flink.test.checkpointing.AbstractEventTimeWindowCheckpointingITCase.StateBackendEnum; /** @@ -40,14 +39,14 @@ import static org.apache.flink.test.checkpointing.AbstractEventTimeWindowCheckpo public abstract class AbstractLocalRecoveryITCase extends TestLogger { private final StateBackendEnum backendEnum; - private final LocalRecoveryMode recoveryMode; + private final boolean localRecoveryEnabled; @Rule public TestName testName = new TestName(); - AbstractLocalRecoveryITCase(StateBackendEnum backendEnum, LocalRecoveryMode recoveryMode) { + AbstractLocalRecoveryITCase(StateBackendEnum backendEnum, boolean localRecoveryEnabled) { this.backendEnum = backendEnum; - this.recoveryMode = recoveryMode; + this.localRecoveryEnabled = localRecoveryEnabled; } @Test @@ -64,9 +63,9 @@ public abstract class AbstractLocalRecoveryITCase extends TestLogger { protected Configuration createClusterConfig() throws IOException { Configuration config = super.createClusterConfig(); - config.setString( + config.setBoolean( CheckpointingOptions.LOCAL_RECOVERY, - recoveryMode.toString()); + localRecoveryEnabled); return config; } http://git-wip-us.apache.org/repos/asf/flink/blob/7f422598/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LocalRecoveryHeapITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LocalRecoveryHeapITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LocalRecoveryHeapITCase.java index 2c0c294..6749366 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LocalRecoveryHeapITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LocalRecoveryHeapITCase.java @@ -18,7 +18,6 @@ package org.apache.flink.test.checkpointing; -import static org.apache.flink.runtime.state.LocalRecoveryConfig.LocalRecoveryMode.ENABLE_FILE_BASED; import static org.apache.flink.test.checkpointing.AbstractEventTimeWindowCheckpointingITCase.StateBackendEnum.FILE_ASYNC; /** @@ -26,8 +25,6 @@ import static org.apache.flink.test.checkpointing.AbstractEventTimeWindowCheckpo */ public class LocalRecoveryHeapITCase extends AbstractLocalRecoveryITCase { public LocalRecoveryHeapITCase() { - super( - FILE_ASYNC, - ENABLE_FILE_BASED); + super(FILE_ASYNC, true); } } http://git-wip-us.apache.org/repos/asf/flink/blob/7f422598/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LocalRecoveryRocksDBFullITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LocalRecoveryRocksDBFullITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LocalRecoveryRocksDBFullITCase.java index 16bbbfc..2d12ae2 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LocalRecoveryRocksDBFullITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LocalRecoveryRocksDBFullITCase.java @@ -18,7 +18,6 @@ package org.apache.flink.test.checkpointing; -import static org.apache.flink.runtime.state.LocalRecoveryConfig.LocalRecoveryMode.ENABLE_FILE_BASED; import static org.apache.flink.test.checkpointing.AbstractEventTimeWindowCheckpointingITCase.StateBackendEnum.ROCKSDB_FULLY_ASYNC; /** @@ -26,8 +25,6 @@ import static org.apache.flink.test.checkpointing.AbstractEventTimeWindowCheckpo */ public class LocalRecoveryRocksDBFullITCase extends AbstractLocalRecoveryITCase { public LocalRecoveryRocksDBFullITCase() { - super( - ROCKSDB_FULLY_ASYNC, - ENABLE_FILE_BASED); + super(ROCKSDB_FULLY_ASYNC, true); } } http://git-wip-us.apache.org/repos/asf/flink/blob/7f422598/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LocalRecoveryRocksDBIncrementalITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LocalRecoveryRocksDBIncrementalITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LocalRecoveryRocksDBIncrementalITCase.java index fa8e139..718d4a3 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LocalRecoveryRocksDBIncrementalITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LocalRecoveryRocksDBIncrementalITCase.java @@ -18,7 +18,6 @@ package org.apache.flink.test.checkpointing; -import static org.apache.flink.runtime.state.LocalRecoveryConfig.LocalRecoveryMode.ENABLE_FILE_BASED; import static org.apache.flink.test.checkpointing.AbstractEventTimeWindowCheckpointingITCase.StateBackendEnum.ROCKSDB_INCREMENTAL_ZK; /** @@ -26,8 +25,6 @@ import static org.apache.flink.test.checkpointing.AbstractEventTimeWindowCheckpo */ public class LocalRecoveryRocksDBIncrementalITCase extends AbstractLocalRecoveryITCase { public LocalRecoveryRocksDBIncrementalITCase() { - super( - ROCKSDB_INCREMENTAL_ZK, - ENABLE_FILE_BASED); + super(ROCKSDB_INCREMENTAL_ZK, true); } } http://git-wip-us.apache.org/repos/asf/flink/blob/7f422598/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java index 6f5bbac..aebaa63 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java @@ -28,7 +28,6 @@ import org.apache.flink.contrib.streaming.state.RocksDBStateBackend; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; -import org.apache.flink.runtime.state.LocalRecoveryConfig; import org.apache.flink.runtime.state.StateBackend; import org.apache.flink.runtime.state.filesystem.FsStateBackend; import org.apache.flink.streaming.api.TimeCharacteristic; @@ -253,12 +252,7 @@ public class ResumeCheckpointManuallyITCase extends TestLogger { config.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir.toURI().toString()); config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir.toURI().toString()); - - if (localRecovery) { - config.setString( - CheckpointingOptions.LOCAL_RECOVERY, - LocalRecoveryConfig.LocalRecoveryMode.ENABLE_FILE_BASED.toString()); - } + config.setBoolean(CheckpointingOptions.LOCAL_RECOVERY, localRecovery); // ZooKeeper recovery mode? if (zooKeeperQuorum != null) {
