[ 
https://issues.apache.org/jira/browse/FLINK-8360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16365618#comment-16365618
 ] 

ASF GitHub Bot commented on FLINK-8360:
---------------------------------------

Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5239#discussion_r168490322
  
    --- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
 ---
    @@ -482,40 +480,40 @@ public void 
testSharedIncrementalStateDeRegistration() throws Exception {
        @Test
        public void testLocalRecoveryConfigurationForwarding() throws Exception 
{
     
    -           RocksDBStateBackend stateBackend = getStateBackend();
    -           
Assert.assertEquals(RocksDBStateBackend.LocalRecoveryMode.DISABLED, 
stateBackend.getLocalRecoveryMode());
    -           
stateBackend.setLocalRecoveryMode(RocksDBStateBackend.LocalRecoveryMode.ENABLE_FILE_BASED);
    -           
Assert.assertEquals(RocksDBStateBackend.LocalRecoveryMode.ENABLE_FILE_BASED, 
stateBackend.getLocalRecoveryMode());
    -
    -           DummyEnvironment environment = new DummyEnvironment();
    -           File tmpDir = tempFolder.newFolder();
    -           TestTaskStateManager taskStateManager = new 
TestTaskStateManager();
    -           taskStateManager.setSubtaskLocalStateBaseDirectory(tmpDir);
    -           environment.setTaskStateManager(taskStateManager);
    -
    -           RocksDBKeyedStateBackend<Integer> keyedBackend =
    -                   (RocksDBKeyedStateBackend<Integer>) 
stateBackend.createKeyedStateBackend(
    -                           environment,
    -                           new JobID(),
    -                           "test",
    -                           IntSerializer.INSTANCE,
    -                           1,
    -                           new KeyGroupRange(0, 0),
    -                           null);
    -
    -           try {
    -                   RocksDBStateBackend.LocalRecoveryConfig 
localRecoveryConfig = keyedBackend.getLocalRecoveryConfig();
    -                   Assert.assertEquals(
    -                           
RocksDBStateBackend.LocalRecoveryMode.ENABLE_FILE_BASED,
    -                           localRecoveryConfig.getLocalRecoveryMode());
    -
    -                   Assert.assertEquals(
    -                           tmpDir,
    -                           localRecoveryConfig.getLocalStateDirectory());
    -           } finally {
    -                   IOUtils.closeQuietly(keyedBackend);
    -                   keyedBackend.dispose();
    -           }
    +//         RocksDBStateBackend stateBackend = getStateBackend();
    +//         
Assert.assertEquals(RocksDBStateBackend.LocalRecoveryMode.DISABLED, 
stateBackend.getLocalRecoveryMode());
    +//         
stateBackend.setLocalRecoveryMode(RocksDBStateBackend.LocalRecoveryMode.ENABLE_FILE_BASED);
    +//         
Assert.assertEquals(RocksDBStateBackend.LocalRecoveryMode.ENABLE_FILE_BASED, 
stateBackend.getLocalRecoveryMode());
    +//
    +//         DummyEnvironment environment = new DummyEnvironment();
    +//         File tmpDir = tempFolder.newFolder();
    +//         TestTaskStateManager taskStateManager = new 
TestTaskStateManager();
    +//         taskStateManager.setLocalRecoveryDirectoryProvider(tmpDir);
    +//         environment.setTaskStateManager(taskStateManager);
    +//
    +//         RocksDBKeyedStateBackend<Integer> keyedBackend =
    +//                 (RocksDBKeyedStateBackend<Integer>) 
stateBackend.createKeyedStateBackend(
    +//                         environment,
    +//                         new JobID(),
    +//                         "test",
    +//                         IntSerializer.INSTANCE,
    +//                         1,
    +//                         new KeyGroupRange(0, 0),
    +//                         null);
    +//
    +//         try {
    +//                 RocksDBStateBackend.LocalRecoveryConfig 
localRecoveryConfig = keyedBackend.getLocalRecoveryConfig();
    +//                 Assert.assertEquals(
    +//                         
RocksDBStateBackend.LocalRecoveryMode.ENABLE_FILE_BASED,
    +//                         localRecoveryConfig.getLocalRecoveryMode());
    +//
    +//                 Assert.assertEquals(
    +//                         tmpDir,
    +//                         localRecoveryConfig.getLocalStateDirectory());
    +//         } finally {
    +//                 IOUtils.closeQuietly(keyedBackend);
    +//                 keyedBackend.dispose();
    +//         }
    --- End diff --
    
    👍 


> Implement task-local state recovery
> -----------------------------------
>
>                 Key: FLINK-8360
>                 URL: https://issues.apache.org/jira/browse/FLINK-8360
>             Project: Flink
>          Issue Type: New Feature
>          Components: State Backends, Checkpointing
>            Reporter: Stefan Richter
>            Assignee: Stefan Richter
>            Priority: Major
>             Fix For: 1.5.0
>
>
> This issue tracks the development of recovery from task-local state. The main 
> idea is to have a secondary, local copy of the checkpointed state, while 
> there is still a primary copy in DFS that we report to the checkpoint 
> coordinator.
> Recovery can attempt to restore from the secondary local copy, if available, 
> to save network bandwidth. This requires that the assignment from tasks to 
> slots is as sticky as possible.
> For starters, we will implement this feature for all managed keyed states and 
> can easily enhance it to all other state types (e.g. operator state) later.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to