[ https://issues.apache.org/jira/browse/FLINK-8360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16322295#comment-16322295 ]
ASF GitHub Bot commented on FLINK-8360:
---------------------------------------
Github user pnowojski commented on a diff in the pull request:
https://github.com/apache/flink/pull/5239#discussion_r160956609
--- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateInitializationContextImplTest.java ---
@@ -124,14 +140,69 @@ public void setUp() throws Exception {
 			operatorStateHandles.add(operatorStateHandle);
 		}
+		OperatorSubtaskState operatorSubtaskState = new OperatorSubtaskState(
+			Collections.emptyList(),
+			operatorStateHandles,
+			Collections.emptyList(),
+			keyedStateHandles);
+
+		OperatorID operatorID = new OperatorID();
+		TaskStateSnapshot taskStateSnapshot = new TaskStateSnapshot();
+		taskStateSnapshot.putSubtaskStateByOperatorID(operatorID, operatorSubtaskState);
+
+		JobManagerTaskRestore jobManagerTaskRestore = new JobManagerTaskRestore(0L, taskStateSnapshot);
+
+		TaskStateManager manager = new TaskStateManagerImpl(
+			new JobID(),
+			new ExecutionAttemptID(),
+			mock(TaskLocalStateStore.class),
+			jobManagerTaskRestore,
+			mock(CheckpointResponder.class));
+
+		DummyEnvironment environment = new DummyEnvironment(
+			"test",
+			1,
+			0,
+			prev);
+
+		environment.setTaskStateManager(manager);
+
+		StateBackend stateBackend = new MemoryStateBackend(1024);
+		StreamTaskStateManager streamTaskStateManager = new StreamTaskStateManagerImpl(
+			environment,
+			stateBackend,
+			mock(ProcessingTimeService.class)) {
+
+			@Override
+			protected <K> InternalTimeServiceManager<?, K> internalTimeServiceManager(
+				AbstractKeyedStateBackend<K> keyedStatedBackend,
+				KeyContext keyContext,
+				Iterable<KeyGroupStatePartitionStreamProvider> rawKeyedStates) throws Exception {
+
+				// We do not initialize a timer service manager here, because it would already consume the raw keyed
+				// state as part of initialization. For the purpose of this test, we want an unconsumed raw keyed
+				// stream.
+				return null;
+			}
+		};
+
+		AbstractStreamOperator<?> mockOperator = mock(AbstractStreamOperator.class);
+		when(mockOperator.getOperatorID()).thenReturn(operatorID);
+
+		StreamOperatorStateContext stateContext = streamTaskStateManager.streamOperatorStateContext(
+			mockOperator,
--- End diff --
Does `streamOperatorStateContext(...)`, which creates the `StreamOperatorStateContext`,
really need an `AbstractStreamOperator` as a parameter? Couldn't we cut this dependency?
This Mockito usage, which only stubs `getOperatorID()`, suggests that the
`streamOperatorStateContext` method receives more through its parameters than it needs.
To rephrase the concern: is it legal, and does it make sense, for the
`streamOperatorStateContext(operator, ...)` method to invoke any of the ~30 methods of
`operator`, such as `operator.dispose()`, `operator.snapshotState()`, or
`operator.getCurrentKey()`?
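A minimal sketch of the narrower signature this comment points toward. It assumes the state context factory only needs the operator's ID to look up restored state, which matches the test above, where the mock only stubs `getOperatorID()`. Every type and method below (`OperatorIdentity`, `SubtaskState`, `TaskSnapshot`, `StateContextFactory`, `createStateContext`) is a hypothetical stand-in, not Flink's API; Java 16+ is assumed for records.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;

public class NarrowedStateContextSketch {

    // Hypothetical stand-in for Flink's OperatorID: an opaque identifier.
    record OperatorIdentity(UUID value) {}

    // Hypothetical stand-in for OperatorSubtaskState: just a label for the restored handles.
    record SubtaskState(String description) {}

    // Hypothetical stand-in for TaskStateSnapshot: restored state per operator ID.
    static final class TaskSnapshot {
        private final Map<OperatorIdentity, SubtaskState> byOperator = new HashMap<>();

        void put(OperatorIdentity id, SubtaskState state) {
            byOperator.put(id, state);
        }

        Optional<SubtaskState> get(OperatorIdentity id) {
            return Optional.ofNullable(byOperator.get(id));
        }
    }

    // Hypothetical result: the state context handed to one operator.
    record StateContext(OperatorIdentity operatorId, Optional<SubtaskState> restoredState) {
        boolean isRestored() {
            return restoredState.isPresent();
        }
    }

    // Narrowed factory: it receives only the operator's ID, so it has no way to call
    // dispose(), snapshotState(), getCurrentKey(), or any other operator method.
    static final class StateContextFactory {
        private final TaskSnapshot restoreSnapshot;

        StateContextFactory(TaskSnapshot restoreSnapshot) {
            this.restoreSnapshot = restoreSnapshot;
        }

        StateContext createStateContext(OperatorIdentity operatorId) {
            return new StateContext(operatorId, restoreSnapshot.get(operatorId));
        }
    }

    public static void main(String[] args) {
        OperatorIdentity restoredOp = new OperatorIdentity(UUID.randomUUID());
        OperatorIdentity freshOp = new OperatorIdentity(UUID.randomUUID());

        TaskSnapshot snapshot = new TaskSnapshot();
        snapshot.put(restoredOp, new SubtaskState("operator + keyed state handles"));

        // The caller passes a plain ID; no mock of a large operator class is required.
        StateContextFactory factory = new StateContextFactory(snapshot);
        System.out.println(factory.createStateContext(restoredOp).isRestored()); // true
        System.out.println(factory.createStateContext(freshOp).isRestored());    // false
    }
}
```

With an ID-only parameter, the dependency the comment questions disappears by construction: the factory cannot invoke operator lifecycle methods, and a test can supply a plain value instead of `mock(AbstractStreamOperator.class)`.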
> Implement task-local state recovery
> -----------------------------------
>
> Key: FLINK-8360
> URL: https://issues.apache.org/jira/browse/FLINK-8360
> Project: Flink
> Issue Type: New Feature
> Components: State Backends, Checkpointing
> Reporter: Stefan Richter
> Assignee: Stefan Richter
> Fix For: 1.5.0
>
>
> This issue tracks the development of recovery from task-local state. The main
> idea is to keep a secondary, local copy of the checkpointed state, while
> there is still a primary copy in DFS that we report to the checkpoint
> coordinator.
> Recovery can attempt to restore from the secondary local copy, if available,
> to save network bandwidth. This requires that the assignment from tasks to
> slots is as sticky as possible.
> For starters, we will implement this feature for all managed keyed states and
> can easily extend it to all other state types (e.g. operator state) later.
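A minimal sketch of the recovery order the issue describes, under stated assumptions: each checkpoint has a primary handle in DFS (the copy reported to the checkpoint coordinator) and, optionally, a secondary copy held by a store on the task's local machine. The local copy only helps if the task lands back where it ran before, which is why sticky task-to-slot assignment matters. All names (`StateHandle`, `LocalStateStore`, `restore`) and the example paths are hypothetical; this is not Flink's implementation. Java 16+ is assumed for records.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

public class LocalRecoverySketch {

    // Hypothetical handle: where one copy of the checkpointed state lives.
    record StateHandle(String location, boolean local) {}

    // Hypothetical per-machine store of secondary (local) copies, keyed by checkpoint ID.
    static final class LocalStateStore {
        private final Map<Long, StateHandle> copies = new HashMap<>();

        void store(long checkpointId, StateHandle handle) {
            copies.put(checkpointId, handle);
        }

        Optional<StateHandle> retrieve(long checkpointId) {
            return Optional.ofNullable(copies.get(checkpointId));
        }
    }

    // Prefer the local copy to save network bandwidth; otherwise fall back to the
    // primary DFS copy, which is always available because it was reported as complete.
    static StateHandle restore(long checkpointId, StateHandle primaryInDfs, LocalStateStore localStore) {
        return localStore.retrieve(checkpointId).orElse(primaryInDfs);
    }

    public static void main(String[] args) {
        // Hypothetical example locations, for illustration only.
        StateHandle primary = new StateHandle("hdfs:///checkpoints/chk-42/state", false);

        // Sticky assignment: the task returns to the same slot, where checkpoint 42 was kept locally.
        LocalStateStore sameSlot = new LocalStateStore();
        sameSlot.store(42L, new StateHandle("/tmp/local-state/chk-42", true));
        System.out.println(restore(42L, primary, sameSlot).local()); // true

        // Non-sticky assignment: no local copy on the new slot, so recovery reads from DFS.
        LocalStateStore otherSlot = new LocalStateStore();
        System.out.println(restore(42L, primary, otherSlot).local()); // false
    }
}
```

Because the DFS copy remains the source of truth, the local copy is purely an optimization: losing it, or being rescheduled elsewhere, only costs bandwidth, not correctness.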
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)