[ https://issues.apache.org/jira/browse/FLINK-8360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16365618#comment-16365618 ]
ASF GitHub Bot commented on FLINK-8360:
---------------------------------------
Github user StefanRRichter commented on a diff in the pull request:
https://github.com/apache/flink/pull/5239#discussion_r168490322
--- Diff: flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java ---
@@ -482,40 +480,40 @@ public void testSharedIncrementalStateDeRegistration() throws Exception {
 	@Test
 	public void testLocalRecoveryConfigurationForwarding() throws Exception {
-		RocksDBStateBackend stateBackend = getStateBackend();
-		Assert.assertEquals(RocksDBStateBackend.LocalRecoveryMode.DISABLED, stateBackend.getLocalRecoveryMode());
-		stateBackend.setLocalRecoveryMode(RocksDBStateBackend.LocalRecoveryMode.ENABLE_FILE_BASED);
-		Assert.assertEquals(RocksDBStateBackend.LocalRecoveryMode.ENABLE_FILE_BASED, stateBackend.getLocalRecoveryMode());
-
-		DummyEnvironment environment = new DummyEnvironment();
-		File tmpDir = tempFolder.newFolder();
-		TestTaskStateManager taskStateManager = new TestTaskStateManager();
-		taskStateManager.setSubtaskLocalStateBaseDirectory(tmpDir);
-		environment.setTaskStateManager(taskStateManager);
-
-		RocksDBKeyedStateBackend<Integer> keyedBackend =
-			(RocksDBKeyedStateBackend<Integer>) stateBackend.createKeyedStateBackend(
-				environment,
-				new JobID(),
-				"test",
-				IntSerializer.INSTANCE,
-				1,
-				new KeyGroupRange(0, 0),
-				null);
-
-		try {
-			RocksDBStateBackend.LocalRecoveryConfig localRecoveryConfig = keyedBackend.getLocalRecoveryConfig();
-			Assert.assertEquals(
-				RocksDBStateBackend.LocalRecoveryMode.ENABLE_FILE_BASED,
-				localRecoveryConfig.getLocalRecoveryMode());
-
-			Assert.assertEquals(
-				tmpDir,
-				localRecoveryConfig.getLocalStateDirectory());
-		} finally {
-			IOUtils.closeQuietly(keyedBackend);
-			keyedBackend.dispose();
-		}
+//		RocksDBStateBackend stateBackend = getStateBackend();
+//		Assert.assertEquals(RocksDBStateBackend.LocalRecoveryMode.DISABLED, stateBackend.getLocalRecoveryMode());
+//		stateBackend.setLocalRecoveryMode(RocksDBStateBackend.LocalRecoveryMode.ENABLE_FILE_BASED);
+//		Assert.assertEquals(RocksDBStateBackend.LocalRecoveryMode.ENABLE_FILE_BASED, stateBackend.getLocalRecoveryMode());
+//
+//		DummyEnvironment environment = new DummyEnvironment();
+//		File tmpDir = tempFolder.newFolder();
+//		TestTaskStateManager taskStateManager = new TestTaskStateManager();
+//		taskStateManager.setLocalRecoveryDirectoryProvider(tmpDir);
+//		environment.setTaskStateManager(taskStateManager);
+//
+//		RocksDBKeyedStateBackend<Integer> keyedBackend =
+//			(RocksDBKeyedStateBackend<Integer>) stateBackend.createKeyedStateBackend(
+//				environment,
+//				new JobID(),
+//				"test",
+//				IntSerializer.INSTANCE,
+//				1,
+//				new KeyGroupRange(0, 0),
+//				null);
+//
+//		try {
+//			RocksDBStateBackend.LocalRecoveryConfig localRecoveryConfig = keyedBackend.getLocalRecoveryConfig();
+//			Assert.assertEquals(
+//				RocksDBStateBackend.LocalRecoveryMode.ENABLE_FILE_BASED,
+//				localRecoveryConfig.getLocalRecoveryMode());
+//
+//			Assert.assertEquals(
+//				tmpDir,
+//				localRecoveryConfig.getLocalStateDirectory());
+//		} finally {
+//			IOUtils.closeQuietly(keyedBackend);
+//			keyedBackend.dispose();
+//		}
--- End diff --
👍
> Implement task-local state recovery
> -----------------------------------
>
> Key: FLINK-8360
> URL: https://issues.apache.org/jira/browse/FLINK-8360
> Project: Flink
> Issue Type: New Feature
> Components: State Backends, Checkpointing
> Reporter: Stefan Richter
> Assignee: Stefan Richter
> Priority: Major
> Fix For: 1.5.0
>
>
> This issue tracks the development of recovery from task-local state. The main
> idea is to have a secondary, local copy of the checkpointed state, while
> there is still a primary copy in DFS that we report to the checkpoint
> coordinator.
> Recovery can attempt to restore from the secondary local copy, if available,
> to save network bandwidth. This requires that the assignment from tasks to
> slots is as sticky as possible.
> For starters, we will implement this feature for all managed keyed states;
> it can later be extended to all other state types (e.g. operator state).
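
The recovery order described in the issue (try the secondary local copy first, fall back to the primary copy in DFS) can be sketched roughly as below. This is only an illustration under stated assumptions: the class LocalFirstRestore, its Source enum, and the restore method are hypothetical names and are not part of Flink's API or of this pull request.

```java
import java.util.Optional;
import java.util.function.Supplier;

/**
 * Hypothetical sketch (not Flink's API): prefer a task-local copy of the
 * checkpointed state and fall back to the primary copy in DFS.
 */
public class LocalFirstRestore {

    /** Where the restored state was read from. */
    public enum Source { LOCAL, PRIMARY }

    /** Restored state together with its origin. */
    public static final class Result<S> {
        public final S state;
        public final Source source;

        Result(S state, Source source) {
            this.state = state;
            this.source = source;
        }
    }

    /**
     * Returns the local copy if one is available. Otherwise invokes the
     * loader for the primary copy, which stands in for the read from DFS
     * and is only called when no local copy exists.
     */
    public static <S> Result<S> restore(Optional<S> localCopy, Supplier<S> primaryLoader) {
        if (localCopy.isPresent()) {
            return new Result<>(localCopy.get(), Source.LOCAL);
        }
        return new Result<>(primaryLoader.get(), Source.PRIMARY);
    }
}
```

Passing the primary copy as a Supplier keeps the remote read lazy, which is the point of the feature: when the task is rescheduled to the same slot and the local copy survives, nothing is read over the network.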