Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5239#discussion_r162584551
  
    --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateInitializationContextImplTest.java ---
    @@ -124,14 +140,69 @@ public void setUp() throws Exception {
                        operatorStateHandles.add(operatorStateHandle);
                }
     
    +           OperatorSubtaskState operatorSubtaskState = new 
OperatorSubtaskState(
    +                   Collections.emptyList(),
    +                   operatorStateHandles,
    +                   Collections.emptyList(),
    +                   keyedStateHandles);
    +
    +           OperatorID operatorID = new OperatorID();
    +           TaskStateSnapshot taskStateSnapshot = new TaskStateSnapshot();
    +           taskStateSnapshot.putSubtaskStateByOperatorID(operatorID, 
operatorSubtaskState);
    +
    +           JobManagerTaskRestore jobManagerTaskRestore = new 
JobManagerTaskRestore(0L, taskStateSnapshot);
    +
    +           TaskStateManager manager = new TaskStateManagerImpl(
    +                   new JobID(),
    +                   new ExecutionAttemptID(),
    +                   mock(TaskLocalStateStore.class),
    +                   jobManagerTaskRestore,
    +                   mock(CheckpointResponder.class));
    +
    +           DummyEnvironment environment = new DummyEnvironment(
    +                   "test",
    +                   1,
    +                   0,
    +                   prev);
    +
    +           environment.setTaskStateManager(manager);
    +
    +           StateBackend stateBackend = new MemoryStateBackend(1024);
    +           StreamTaskStateManager streamTaskStateManager = new 
StreamTaskStateManagerImpl(
    +                   environment,
    +                   stateBackend,
    +                   mock(ProcessingTimeService.class)) {
    +
    +                   @Override
    +                   protected <K> InternalTimeServiceManager<?, K> 
internalTimeServiceManager(
    +                           AbstractKeyedStateBackend<K> keyedStatedBackend,
    +                           KeyContext keyContext,
    +                           Iterable<KeyGroupStatePartitionStreamProvider> 
rawKeyedStates) throws Exception {
    +
    +                           // We do not initialize a timer service manager 
here, because it would already consume the raw keyed
    +                           // state as part of initialization. For the 
purpose of this test, we want an unconsumed raw keyed
    +                           // stream.
    +                           return null;
    +                   }
    +           };
    +
    +           AbstractStreamOperator<?> mockOperator = 
mock(AbstractStreamOperator.class);
    +           when(mockOperator.getOperatorID()).thenReturn(operatorID);
    +
    +           StreamOperatorStateContext stateContext = 
streamTaskStateManager.streamOperatorStateContext(
    +                   mockOperator,
    --- End diff --
    
    👍 


---

Reply via email to