Github user StefanRRichter commented on a diff in the pull request:
https://github.com/apache/flink/pull/5239#discussion_r162584293
--- Diff:
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
---
@@ -214,49 +217,55 @@ public MetricGroup getMetricGroup() {
}
@Override
- public final void initializeState(OperatorSubtaskState stateHandles)
throws Exception {
+ public final void initializeState() throws Exception {
- Collection<KeyedStateHandle> keyedStateHandlesRaw = null;
- Collection<OperatorStateHandle> operatorStateHandlesRaw = null;
- Collection<OperatorStateHandle> operatorStateHandlesBackend =
null;
+ final TypeSerializer<?> keySerializer =
config.getStateKeySerializer(getUserCodeClassloader());
- boolean restoring = (null != stateHandles);
+ final StreamTask<?, ?> containingTask =
+ Preconditions.checkNotNull(getContainingTask());
+ final CloseableRegistry streamTaskCloseableRegistry =
+
Preconditions.checkNotNull(containingTask.getCancelables());
+ final StreamTaskStateManager streamTaskStateManager =
+
Preconditions.checkNotNull(containingTask.getStreamTaskStateManager());
- initKeyedState(); //TODO we should move the actual
initialization of this from StreamTask to this class
-
- if (getKeyedStateBackend() != null && timeServiceManager ==
null) {
- timeServiceManager = new InternalTimeServiceManager<>(
- getKeyedStateBackend().getNumberOfKeyGroups(),
- getKeyedStateBackend().getKeyGroupRange(),
+ final StreamOperatorStateContext context =
--- End diff --
As we discussed, this is bridging between internal and user-facing
interfaces. I would prefer to keep it as is, at least for now.
---