Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5239#discussion_r168530422 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/state/TestTaskStateManager.java --- @@ -118,12 +121,38 @@ public void reportTaskStateSnapshots( } } + @Nonnull @Override - public OperatorSubtaskState operatorStates(OperatorID operatorID) { - TaskStateSnapshot taskStateSnapshot = getLastJobManagerTaskStateSnapshot(); - return taskStateSnapshot != null ? taskStateSnapshot.getSubtaskStateByOperatorID(operatorID) : null; + public PrioritizedOperatorSubtaskState prioritizedOperatorState(OperatorID operatorID) { + TaskStateSnapshot jmTaskStateSnapshot = getLastJobManagerTaskStateSnapshot(); + TaskStateSnapshot tmTaskStateSnapshot = getLastTaskManagerTaskStateSnapshot(); + + OperatorSubtaskState jmOpState = null; + List<OperatorSubtaskState> tmStateCollection = null; + + if (jmTaskStateSnapshot != null) { + jmOpState = jmTaskStateSnapshot.getSubtaskStateByOperatorID(operatorID); + } + + if (tmTaskStateSnapshot != null) { + OperatorSubtaskState tmOpState = tmTaskStateSnapshot.getSubtaskStateByOperatorID(operatorID); + if (tmOpState != null) { + tmStateCollection = Collections.singletonList(tmOpState); + } + } + + if (jmOpState == null) { + jmOpState = new OperatorSubtaskState(); + } + + if (tmStateCollection == null) { + tmStateCollection = Collections.emptyList(); + } --- End diff -- 👍
---