Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5239#discussion_r168717057
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java ---
    @@ -223,155 +223,110 @@ public StreamOperatorStateContext streamOperatorStateContext(
     
        protected OperatorStateBackend operatorStateBackend(
                String operatorIdentifierText,
    -           OperatorSubtaskState operatorSubtaskStateFromJobManager,
    +           PrioritizedOperatorSubtaskState 
prioritizedOperatorSubtaskStates,
                CloseableRegistry backendCloseableRegistry) throws Exception {
     
    -           //TODO search in local state for a local recovery opportunity.
    +           BackendRestorerProcedure<OperatorStateBackend, 
OperatorStateHandle> backendRestorer =
    +                   new BackendRestorerProcedure<>(
    +                           () -> 
stateBackend.createOperatorStateBackend(environment, operatorIdentifierText),
    +                           backendCloseableRegistry);
     
    -           return createOperatorStateBackendFromJobManagerState(
    -                   operatorIdentifierText,
    -                   operatorSubtaskStateFromJobManager,
    -                   backendCloseableRegistry);
    +           return 
backendRestorer.createAndRestore(prioritizedOperatorSubtaskStates.getPrioritizedManagedOperatorState());
        }
     
        protected <K> AbstractKeyedStateBackend<K> keyedStatedBackend(
                TypeSerializer<K> keySerializer,
                String operatorIdentifierText,
    -           OperatorSubtaskState operatorSubtaskStateFromJobManager,
    +           PrioritizedOperatorSubtaskState 
prioritizedOperatorSubtaskStates,
                CloseableRegistry backendCloseableRegistry) throws Exception {
     
                if (keySerializer == null) {
                        return null;
                }
     
    -           //TODO search in local state for a local recovery opportunity.
    +           TaskInfo taskInfo = environment.getTaskInfo();
    +
    +           final KeyGroupRange keyGroupRange = 
KeyGroupRangeAssignment.computeKeyGroupRangeForOperatorIndex(
    +                   taskInfo.getMaxNumberOfParallelSubtasks(),
    +                   taskInfo.getNumberOfParallelSubtasks(),
    +                   taskInfo.getIndexOfThisSubtask());
     
    -           return createKeyedStatedBackendFromJobManagerState(
    -                   keySerializer,
    -                   operatorIdentifierText,
    -                   operatorSubtaskStateFromJobManager,
    -                   backendCloseableRegistry);
    +           BackendRestorerProcedure<AbstractKeyedStateBackend<K>, 
KeyedStateHandle> backendRestorer =
    +                   new BackendRestorerProcedure<>(
    +                           () -> stateBackend.createKeyedStateBackend(
    +                                   environment,
    +                                   environment.getJobID(),
    +                                   operatorIdentifierText,
    +                                   keySerializer,
    +                                   
taskInfo.getMaxNumberOfParallelSubtasks(),
    +                                   keyGroupRange,
    +                                   environment.getTaskKvStateRegistry()),
    +                           backendCloseableRegistry);
    +
    +           return 
backendRestorer.createAndRestore(prioritizedOperatorSubtaskStates.getPrioritizedManagedKeyedState());
        }
     
        protected CloseableIterable<StatePartitionStreamProvider> 
rawOperatorStateInputs(
    -           OperatorSubtaskState operatorSubtaskStateFromJobManager) {
    +           Iterator<StateObjectCollection<OperatorStateHandle>> 
restoreStateAlternatives) {
     
    -           if (operatorSubtaskStateFromJobManager != null) {
    +           if (restoreStateAlternatives.hasNext()) {
     
                        final CloseableRegistry closeableRegistry = new 
CloseableRegistry();
     
    -                   Collection<OperatorStateHandle> rawOperatorState =
    -                           
operatorSubtaskStateFromJobManager.getRawOperatorState();
    -
    -                   return new 
CloseableIterable<StatePartitionStreamProvider>() {
    -                           @Override
    -                           public void close() throws IOException {
    -                                   closeableRegistry.close();
    -                           }
    -
    -                           @Nonnull
    -                           @Override
    -                           public Iterator<StatePartitionStreamProvider> 
iterator() {
    -                                   return new OperatorStateStreamIterator(
    -                                           
DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME,
    -                                           rawOperatorState.iterator(), 
closeableRegistry);
    -                           }
    -                   };
    -           }
    -
    -           return CloseableIterable.empty();
    -   }
    -
    -   protected CloseableIterable<KeyGroupStatePartitionStreamProvider> 
rawKeyedStateInputs(
    -           OperatorSubtaskState operatorSubtaskStateFromJobManager) {
    +                   Collection<OperatorStateHandle> rawOperatorState = 
restoreStateAlternatives.next();
    +                   // TODO currently this does not support local state recovery, so we expect there is only one handle.
    +                   
Preconditions.checkState(!restoreStateAlternatives.hasNext());
    --- End diff --
    
    👍 


---

Reply via email to