[ 
https://issues.apache.org/jira/browse/FLINK-8360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16365752#comment-16365752
 ] 

ASF GitHub Bot commented on FLINK-8360:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5239#discussion_r168510973
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java ---
    @@ -223,155 +223,110 @@ public StreamOperatorStateContext streamOperatorStateContext(
     
        protected OperatorStateBackend operatorStateBackend(
                String operatorIdentifierText,
    -           OperatorSubtaskState operatorSubtaskStateFromJobManager,
    +           PrioritizedOperatorSubtaskState prioritizedOperatorSubtaskStates,
                CloseableRegistry backendCloseableRegistry) throws Exception {
     
    -           //TODO search in local state for a local recovery opportunity.
    +           BackendRestorerProcedure<OperatorStateBackend, OperatorStateHandle> backendRestorer =
    +                   new BackendRestorerProcedure<>(
    +                           () -> stateBackend.createOperatorStateBackend(environment, operatorIdentifierText),
    +                           backendCloseableRegistry);
     
    -           return createOperatorStateBackendFromJobManagerState(
    -                   operatorIdentifierText,
    -                   operatorSubtaskStateFromJobManager,
    -                   backendCloseableRegistry);
    +           return backendRestorer.createAndRestore(prioritizedOperatorSubtaskStates.getPrioritizedManagedOperatorState());
        }
     
        protected <K> AbstractKeyedStateBackend<K> keyedStatedBackend(
                TypeSerializer<K> keySerializer,
                String operatorIdentifierText,
    -           OperatorSubtaskState operatorSubtaskStateFromJobManager,
    +           PrioritizedOperatorSubtaskState prioritizedOperatorSubtaskStates,
                CloseableRegistry backendCloseableRegistry) throws Exception {
     
                if (keySerializer == null) {
                        return null;
                }
     
    -           //TODO search in local state for a local recovery opportunity.
    +           TaskInfo taskInfo = environment.getTaskInfo();
    +
    +           final KeyGroupRange keyGroupRange = KeyGroupRangeAssignment.computeKeyGroupRangeForOperatorIndex(
    +                   taskInfo.getMaxNumberOfParallelSubtasks(),
    +                   taskInfo.getNumberOfParallelSubtasks(),
    +                   taskInfo.getIndexOfThisSubtask());
     
    -           return createKeyedStatedBackendFromJobManagerState(
    -                   keySerializer,
    -                   operatorIdentifierText,
    -                   operatorSubtaskStateFromJobManager,
    -                   backendCloseableRegistry);
    +           BackendRestorerProcedure<AbstractKeyedStateBackend<K>, KeyedStateHandle> backendRestorer =
    +                   new BackendRestorerProcedure<>(
    +                           () -> stateBackend.createKeyedStateBackend(
    +                                   environment,
    +                                   environment.getJobID(),
    +                                   operatorIdentifierText,
    +                                   keySerializer,
    +                                   taskInfo.getMaxNumberOfParallelSubtasks(),
    +                                   keyGroupRange,
    +                                   environment.getTaskKvStateRegistry()),
    +                           backendCloseableRegistry);
    +
    +           return backendRestorer.createAndRestore(prioritizedOperatorSubtaskStates.getPrioritizedManagedKeyedState());
        }
     
        protected CloseableIterable<StatePartitionStreamProvider> rawOperatorStateInputs(
    -           OperatorSubtaskState operatorSubtaskStateFromJobManager) {
    +           Iterator<StateObjectCollection<OperatorStateHandle>> restoreStateAlternatives) {
     
    -           if (operatorSubtaskStateFromJobManager != null) {
    +           if (restoreStateAlternatives.hasNext()) {
     
                        final CloseableRegistry closeableRegistry = new CloseableRegistry();
     
    -                   Collection<OperatorStateHandle> rawOperatorState =
    -                           operatorSubtaskStateFromJobManager.getRawOperatorState();
    -
    -                   return new CloseableIterable<StatePartitionStreamProvider>() {
    -                           @Override
    -                           public void close() throws IOException {
    -                                   closeableRegistry.close();
    -                           }
    -
    -                           @Nonnull
    -                           @Override
    -                           public Iterator<StatePartitionStreamProvider> iterator() {
    -                                   return new OperatorStateStreamIterator(
    -                                           DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME,
    -                                           rawOperatorState.iterator(), closeableRegistry);
    -                           }
    -                   };
    -           }
    -
    -           return CloseableIterable.empty();
    -   }
    -
    -   protected CloseableIterable<KeyGroupStatePartitionStreamProvider> rawKeyedStateInputs(
    -           OperatorSubtaskState operatorSubtaskStateFromJobManager) {
    +                   Collection<OperatorStateHandle> rawOperatorState = restoreStateAlternatives.next();
    +                   // TODO currently this does not support local state recovery, so we expect there is only one handle.
    +                   Preconditions.checkState(!restoreStateAlternatives.hasNext());
    --- End diff --
    
    Maybe we could add a proper failure message here.
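
    As a rough illustration of what such a message could look like, here is a
    minimal, self-contained sketch. The class name, the helper
    `takeOnlyAlternative`, the local `checkState` stand-in, and the message
    wording are all assumptions made for this example; they are not taken from
    the pull request.

```java
import java.util.Arrays;
import java.util.Iterator;

public class SingleAlternativeCheck {

    // Hypothetical stand-in for a checkState(condition, message) style
    // precondition helper, so the sketch runs without extra dependencies.
    static void checkState(boolean condition, String message) {
        if (!condition) {
            throw new IllegalStateException(message);
        }
    }

    // Takes the first restore alternative and fails with a descriptive
    // message if further alternatives remain in the iterator.
    static <T> T takeOnlyAlternative(Iterator<T> alternatives) {
        T first = alternatives.next();
        checkState(
            !alternatives.hasNext(),
            "Local recovery is not supported for raw operator state; "
                + "expected exactly one restore alternative.");
        return first;
    }

    public static void main(String[] args) {
        // A single alternative passes the check.
        System.out.println(takeOnlyAlternative(Arrays.asList("remote").iterator()));

        // Two alternatives trigger the descriptive failure.
        try {
            takeOnlyAlternative(Arrays.asList("local", "remote").iterator());
        } catch (IllegalStateException e) {
            System.out.println("failed: " + e.getMessage());
        }
    }
}
```

    With a message attached, a violated expectation points directly at the
    unsupported case instead of surfacing as a bare IllegalStateException.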


> Implement task-local state recovery
> -----------------------------------
>
>                 Key: FLINK-8360
>                 URL: https://issues.apache.org/jira/browse/FLINK-8360
>             Project: Flink
>          Issue Type: New Feature
>          Components: State Backends, Checkpointing
>            Reporter: Stefan Richter
>            Assignee: Stefan Richter
>            Priority: Major
>             Fix For: 1.5.0
>
>
> This issue tracks the development of recovery from task-local state. The main 
> idea is to have a secondary, local copy of the checkpointed state, while 
> there is still a primary copy in DFS that we report to the checkpoint 
> coordinator.
> Recovery can attempt to restore from the secondary local copy, if available, 
> to save network bandwidth. This requires that the assignment from tasks to 
> slots is as sticky as possible.
> For starters, we will implement this feature for all managed keyed states and 
> can easily extend it to all other state types (e.g. operator state) later.
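
The recovery order described above (try the secondary local copy first, fall back to the primary copy in DFS) can be sketched roughly as follows. This is only an illustration under stated assumptions: `PrioritizedRestoreSketch`, `restoreFromFirstUsable`, and the string-based "state handles" are hypothetical names invented for this example and do not reproduce Flink's actual classes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

public class PrioritizedRestoreSketch {

    // Tries each alternative in priority order and returns the result of the
    // first restore attempt that succeeds. If all attempts fail, the failures
    // are attached as suppressed exceptions to the exception that is thrown.
    static <S, B> B restoreFromFirstUsable(List<S> alternatives, Function<S, B> restore) {
        List<RuntimeException> failures = new ArrayList<>();
        for (S alternative : alternatives) {
            try {
                return restore.apply(alternative);
            } catch (RuntimeException e) {
                failures.add(e); // remember the failure, then try the next copy
            }
        }
        IllegalStateException ex = new IllegalStateException(
            "Could not restore from any of the " + alternatives.size() + " alternatives.");
        failures.forEach(ex::addSuppressed);
        throw ex;
    }

    public static void main(String[] args) {
        // Highest priority first: the local copy, then the copy in DFS.
        List<String> alternatives = Arrays.asList("local-copy", "dfs-copy");

        String restored = restoreFromFirstUsable(alternatives, source -> {
            if (source.startsWith("local")) {
                // Simulates a task that was not rescheduled to its old slot.
                throw new RuntimeException("local copy missing");
            }
            return "backend restored from " + source;
        });

        System.out.println(restored);
    }
}
```

The ordering of the list carries the priority, which is why sticky slot assignment matters: the local alternative is only usable when the task is restarted where its local copy lives.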



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
