Github user pnowojski commented on a diff in the pull request:
https://github.com/apache/flink/pull/5239#discussion_r160725404
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java ---
@@ -214,49 +217,55 @@ public MetricGroup getMetricGroup() {
}
@Override
- public final void initializeState(OperatorSubtaskState stateHandles) throws Exception {
+ public final void initializeState() throws Exception {
- Collection<KeyedStateHandle> keyedStateHandlesRaw = null;
- Collection<OperatorStateHandle> operatorStateHandlesRaw = null;
- Collection<OperatorStateHandle> operatorStateHandlesBackend = null;
+ final TypeSerializer<?> keySerializer = config.getStateKeySerializer(getUserCodeClassloader());
- boolean restoring = (null != stateHandles);
+ final StreamTask<?, ?> containingTask =
+ Preconditions.checkNotNull(getContainingTask());
+ final CloseableRegistry streamTaskCloseableRegistry =
+ Preconditions.checkNotNull(containingTask.getCancelables());
+ final StreamTaskStateManager streamTaskStateManager =
+ Preconditions.checkNotNull(containingTask.getStreamTaskStateManager());
- initKeyedState(); //TODO we should move the actual initialization of this from StreamTask to this class
-
- if (getKeyedStateBackend() != null && timeServiceManager == null) {
- timeServiceManager = new InternalTimeServiceManager<>(
- getKeyedStateBackend().getNumberOfKeyGroups(),
- getKeyedStateBackend().getKeyGroupRange(),
+ final StreamOperatorStateContext context =
--- End diff --
Do we need this additional level of indirection in the form of
`StreamOperatorStateContext`? Couldn't `streamTaskStateManager` return the
`StateInitializationContext` directly? Or do you think otherwise? (I'm not sure
about this comment.)
---