Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/5239#discussion_r168510973
--- Diff:
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java
---
@@ -223,155 +223,110 @@ public StreamOperatorStateContext
streamOperatorStateContext(
protected OperatorStateBackend operatorStateBackend(
String operatorIdentifierText,
- OperatorSubtaskState operatorSubtaskStateFromJobManager,
+ PrioritizedOperatorSubtaskState
prioritizedOperatorSubtaskStates,
CloseableRegistry backendCloseableRegistry) throws Exception {
- //TODO search in local state for a local recovery opportunity.
+ BackendRestorerProcedure<OperatorStateBackend,
OperatorStateHandle> backendRestorer =
+ new BackendRestorerProcedure<>(
+ () ->
stateBackend.createOperatorStateBackend(environment, operatorIdentifierText),
+ backendCloseableRegistry);
- return createOperatorStateBackendFromJobManagerState(
- operatorIdentifierText,
- operatorSubtaskStateFromJobManager,
- backendCloseableRegistry);
+ return
backendRestorer.createAndRestore(prioritizedOperatorSubtaskStates.getPrioritizedManagedOperatorState());
}
protected <K> AbstractKeyedStateBackend<K> keyedStatedBackend(
TypeSerializer<K> keySerializer,
String operatorIdentifierText,
- OperatorSubtaskState operatorSubtaskStateFromJobManager,
+ PrioritizedOperatorSubtaskState
prioritizedOperatorSubtaskStates,
CloseableRegistry backendCloseableRegistry) throws Exception {
if (keySerializer == null) {
return null;
}
- //TODO search in local state for a local recovery opportunity.
+ TaskInfo taskInfo = environment.getTaskInfo();
+
+ final KeyGroupRange keyGroupRange =
KeyGroupRangeAssignment.computeKeyGroupRangeForOperatorIndex(
+ taskInfo.getMaxNumberOfParallelSubtasks(),
+ taskInfo.getNumberOfParallelSubtasks(),
+ taskInfo.getIndexOfThisSubtask());
- return createKeyedStatedBackendFromJobManagerState(
- keySerializer,
- operatorIdentifierText,
- operatorSubtaskStateFromJobManager,
- backendCloseableRegistry);
+ BackendRestorerProcedure<AbstractKeyedStateBackend<K>,
KeyedStateHandle> backendRestorer =
+ new BackendRestorerProcedure<>(
+ () -> stateBackend.createKeyedStateBackend(
+ environment,
+ environment.getJobID(),
+ operatorIdentifierText,
+ keySerializer,
+
taskInfo.getMaxNumberOfParallelSubtasks(),
+ keyGroupRange,
+ environment.getTaskKvStateRegistry()),
+ backendCloseableRegistry);
+
+ return
backendRestorer.createAndRestore(prioritizedOperatorSubtaskStates.getPrioritizedManagedKeyedState());
}
protected CloseableIterable<StatePartitionStreamProvider>
rawOperatorStateInputs(
- OperatorSubtaskState operatorSubtaskStateFromJobManager) {
+ Iterator<StateObjectCollection<OperatorStateHandle>>
restoreStateAlternatives) {
- if (operatorSubtaskStateFromJobManager != null) {
+ if (restoreStateAlternatives.hasNext()) {
final CloseableRegistry closeableRegistry = new
CloseableRegistry();
- Collection<OperatorStateHandle> rawOperatorState =
-
operatorSubtaskStateFromJobManager.getRawOperatorState();
-
- return new
CloseableIterable<StatePartitionStreamProvider>() {
- @Override
- public void close() throws IOException {
- closeableRegistry.close();
- }
-
- @Nonnull
- @Override
- public Iterator<StatePartitionStreamProvider>
iterator() {
- return new OperatorStateStreamIterator(
-
DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME,
- rawOperatorState.iterator(),
closeableRegistry);
- }
- };
- }
-
- return CloseableIterable.empty();
- }
-
- protected CloseableIterable<KeyGroupStatePartitionStreamProvider>
rawKeyedStateInputs(
- OperatorSubtaskState operatorSubtaskStateFromJobManager) {
+ Collection<OperatorStateHandle> rawOperatorState =
restoreStateAlternatives.next();
+ // TODO currently this does not support local state
recovery, so we expect there is only one handle.
+
Preconditions.checkState(!restoreStateAlternatives.hasNext());
--- End diff --
Maybe we could add a proper failure message here.
---