Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5239#discussion_r160725404
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java ---
    @@ -214,49 +217,55 @@ public MetricGroup getMetricGroup() {
        }
     
        @Override
    -   public final void initializeState(OperatorSubtaskState stateHandles) throws Exception {
    +   public final void initializeState() throws Exception {
     
    -           Collection<KeyedStateHandle> keyedStateHandlesRaw = null;
    -           Collection<OperatorStateHandle> operatorStateHandlesRaw = null;
    -           Collection<OperatorStateHandle> operatorStateHandlesBackend = null;
    +           final TypeSerializer<?> keySerializer = config.getStateKeySerializer(getUserCodeClassloader());
     
    -           boolean restoring = (null != stateHandles);
    +           final StreamTask<?, ?> containingTask =
    +                   Preconditions.checkNotNull(getContainingTask());
    +           final CloseableRegistry streamTaskCloseableRegistry =
    +                   Preconditions.checkNotNull(containingTask.getCancelables());
    +           final StreamTaskStateManager streamTaskStateManager =
    +                   Preconditions.checkNotNull(containingTask.getStreamTaskStateManager());
     
    -           initKeyedState(); //TODO we should move the actual initialization of this from StreamTask to this class
    -
    -           if (getKeyedStateBackend() != null && timeServiceManager == null) {
    -                   timeServiceManager = new InternalTimeServiceManager<>(
    -                           getKeyedStateBackend().getNumberOfKeyGroups(),
    -                           getKeyedStateBackend().getKeyGroupRange(),
    +           final StreamOperatorStateContext context =
    --- End diff --
    
    Do we need this additional level of indirection in the form of `StreamOperatorStateContext`? Couldn't `streamTaskStateManager` return the `StateInitializationContext` directly? Or do you think otherwise? (I'm not sure about this comment.)

