Github user pnowojski commented on a diff in the pull request:
https://github.com/apache/flink/pull/5239#discussion_r160956609
--- Diff:
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateInitializationContextImplTest.java
---
@@ -124,14 +140,69 @@ public void setUp() throws Exception {
operatorStateHandles.add(operatorStateHandle);
}
+ OperatorSubtaskState operatorSubtaskState = new
OperatorSubtaskState(
+ Collections.emptyList(),
+ operatorStateHandles,
+ Collections.emptyList(),
+ keyedStateHandles);
+
+ OperatorID operatorID = new OperatorID();
+ TaskStateSnapshot taskStateSnapshot = new TaskStateSnapshot();
+ taskStateSnapshot.putSubtaskStateByOperatorID(operatorID,
operatorSubtaskState);
+
+ JobManagerTaskRestore jobManagerTaskRestore = new
JobManagerTaskRestore(0L, taskStateSnapshot);
+
+ TaskStateManager manager = new TaskStateManagerImpl(
+ new JobID(),
+ new ExecutionAttemptID(),
+ mock(TaskLocalStateStore.class),
+ jobManagerTaskRestore,
+ mock(CheckpointResponder.class));
+
+ DummyEnvironment environment = new DummyEnvironment(
+ "test",
+ 1,
+ 0,
+ prev);
+
+ environment.setTaskStateManager(manager);
+
+ StateBackend stateBackend = new MemoryStateBackend(1024);
+ StreamTaskStateManager streamTaskStateManager = new
StreamTaskStateManagerImpl(
+ environment,
+ stateBackend,
+ mock(ProcessingTimeService.class)) {
+
+ @Override
+ protected <K> InternalTimeServiceManager<?, K>
internalTimeServiceManager(
+ AbstractKeyedStateBackend<K> keyedStatedBackend,
+ KeyContext keyContext,
+ Iterable<KeyGroupStatePartitionStreamProvider>
rawKeyedStates) throws Exception {
+
+ // We do not initialize a timer service manager
here, because it would already consume the raw keyed
+ // state as part of initialization. For the
purpose of this test, we want an unconsumed raw keyed
+ // stream.
+ return null;
+ }
+ };
+
+ AbstractStreamOperator<?> mockOperator =
mock(AbstractStreamOperator.class);
+ when(mockOperator.getOperatorID()).thenReturn(operatorID);
+
+ StreamOperatorStateContext stateContext =
streamTaskStateManager.streamOperatorStateContext(
+ mockOperator,
--- End diff --
Does the `StreamOperatorStateContext` needs to have
`AbstractStreamOperator` as a parameter? Couldn't we cut this dependency?
This mockito usage suggests that this `streamOperatorStateContext` method
is receiving more in the parameter compared to what it needs.
Rephrasing this concern, is it legal and does it make sense for the
`streamOperatorStateContext(operator, ...)` method to invoke one of the ~30
`operator` methods? Like `operator.dispose()`, `operator.snapshoState()`,
`operator.getCurrentKey()`, ... ?
---