[
https://issues.apache.org/jira/browse/FLINK-8360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16365752#comment-16365752
]
ASF GitHub Bot commented on FLINK-8360:
---------------------------------------
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/5239#discussion_r168510973
--- Diff:
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java
---
@@ -223,155 +223,110 @@ public StreamOperatorStateContext streamOperatorStateContext(
protected OperatorStateBackend operatorStateBackend(
String operatorIdentifierText,
- OperatorSubtaskState operatorSubtaskStateFromJobManager,
+ PrioritizedOperatorSubtaskState prioritizedOperatorSubtaskStates,
CloseableRegistry backendCloseableRegistry) throws Exception {
- //TODO search in local state for a local recovery opportunity.
+ BackendRestorerProcedure<OperatorStateBackend, OperatorStateHandle> backendRestorer =
+ new BackendRestorerProcedure<>(
+ () -> stateBackend.createOperatorStateBackend(environment, operatorIdentifierText),
+ backendCloseableRegistry);
- return createOperatorStateBackendFromJobManagerState(
- operatorIdentifierText,
- operatorSubtaskStateFromJobManager,
- backendCloseableRegistry);
+ return backendRestorer.createAndRestore(prioritizedOperatorSubtaskStates.getPrioritizedManagedOperatorState());
}
protected <K> AbstractKeyedStateBackend<K> keyedStatedBackend(
TypeSerializer<K> keySerializer,
String operatorIdentifierText,
- OperatorSubtaskState operatorSubtaskStateFromJobManager,
+ PrioritizedOperatorSubtaskState prioritizedOperatorSubtaskStates,
CloseableRegistry backendCloseableRegistry) throws Exception {
if (keySerializer == null) {
return null;
}
- //TODO search in local state for a local recovery opportunity.
+ TaskInfo taskInfo = environment.getTaskInfo();
+
+ final KeyGroupRange keyGroupRange = KeyGroupRangeAssignment.computeKeyGroupRangeForOperatorIndex(
+ taskInfo.getMaxNumberOfParallelSubtasks(),
+ taskInfo.getNumberOfParallelSubtasks(),
+ taskInfo.getIndexOfThisSubtask());
- return createKeyedStatedBackendFromJobManagerState(
- keySerializer,
- operatorIdentifierText,
- operatorSubtaskStateFromJobManager,
- backendCloseableRegistry);
+ BackendRestorerProcedure<AbstractKeyedStateBackend<K>, KeyedStateHandle> backendRestorer =
+ new BackendRestorerProcedure<>(
+ () -> stateBackend.createKeyedStateBackend(
+ environment,
+ environment.getJobID(),
+ operatorIdentifierText,
+ keySerializer,
+ taskInfo.getMaxNumberOfParallelSubtasks(),
+ keyGroupRange,
+ environment.getTaskKvStateRegistry()),
+ backendCloseableRegistry);
+
+ return backendRestorer.createAndRestore(prioritizedOperatorSubtaskStates.getPrioritizedManagedKeyedState());
}
protected CloseableIterable<StatePartitionStreamProvider> rawOperatorStateInputs(
- OperatorSubtaskState operatorSubtaskStateFromJobManager) {
+ Iterator<StateObjectCollection<OperatorStateHandle>> restoreStateAlternatives) {
- if (operatorSubtaskStateFromJobManager != null) {
+ if (restoreStateAlternatives.hasNext()) {
final CloseableRegistry closeableRegistry = new CloseableRegistry();
- Collection<OperatorStateHandle> rawOperatorState =
- operatorSubtaskStateFromJobManager.getRawOperatorState();
-
- return new CloseableIterable<StatePartitionStreamProvider>() {
- @Override
- public void close() throws IOException {
- closeableRegistry.close();
- }
-
- @Nonnull
- @Override
- public Iterator<StatePartitionStreamProvider> iterator() {
- return new OperatorStateStreamIterator(
- DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME,
- rawOperatorState.iterator(), closeableRegistry);
- }
- };
- }
-
- return CloseableIterable.empty();
- }
-
- protected CloseableIterable<KeyGroupStatePartitionStreamProvider> rawKeyedStateInputs(
- OperatorSubtaskState operatorSubtaskStateFromJobManager) {
+ Collection<OperatorStateHandle> rawOperatorState = restoreStateAlternatives.next();
+ // TODO currently this does not support local state recovery, so we expect there is only one handle.
+ Preconditions.checkState(!restoreStateAlternatives.hasNext());
--- End diff --
Maybe we could add a proper failure message here.
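A minimal sketch of what such a message could look like. It is standalone so it runs without Flink on the classpath: the local `checkState` helper is a stand-in that mimics the condition-plus-message style of Flink's `Preconditions.checkState`, and the class name, the method name `singleAlternative`, and the message wording are illustrative assumptions, not the change that was actually merged.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

public class RestoreAlternativesCheck {

    // Stand-in (assumption) for a precondition helper that takes a message.
    static void checkState(boolean condition, Object errorMessage) {
        if (!condition) {
            throw new IllegalStateException(String.valueOf(errorMessage));
        }
    }

    // Hypothetical helper: returns the first restore alternative and fails
    // with a descriptive message if more than one alternative is present.
    static <T> Collection<T> singleAlternative(Iterator<? extends Collection<T>> alternatives) {
        if (!alternatives.hasNext()) {
            return Collections.emptyList();
        }
        Collection<T> first = alternatives.next();
        checkState(
            !alternatives.hasNext(),
            "Local recovery is currently not implemented for raw operator state, "
                + "but found more than one restore alternative.");
        return first;
    }

    public static void main(String[] args) {
        List<List<String>> one = Collections.singletonList(Arrays.asList("handle-1"));
        System.out.println(singleAlternative(one.iterator()));

        List<List<String>> two = Arrays.asList(
            Arrays.asList("local-handle"), Arrays.asList("remote-handle"));
        try {
            singleAlternative(two.iterator());
        } catch (IllegalStateException e) {
            // The message now says why the state check failed.
            System.out.println(e.getMessage());
        }
    }
}
```

With a message attached, a failure in this branch points directly at the unsupported local-recovery case instead of surfacing as a bare IllegalStateException.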
> Implement task-local state recovery
> -----------------------------------
>
> Key: FLINK-8360
> URL: https://issues.apache.org/jira/browse/FLINK-8360
> Project: Flink
> Issue Type: New Feature
> Components: State Backends, Checkpointing
> Reporter: Stefan Richter
> Assignee: Stefan Richter
> Priority: Major
> Fix For: 1.5.0
>
>
> This issue tracks the development of recovery from task-local state. The main
> idea is to have a secondary, local copy of the checkpointed state, while
> there is still a primary copy in DFS that we report to the checkpoint
> coordinator.
> Recovery can attempt to restore from the secondary local copy, if available,
> to save network bandwidth. This requires that the assignment from tasks to
> slots is as sticky as possible.
> For starters, we will implement this feature for all managed keyed states and
> can easily enhance it to all other state types (e.g. operator state) later.