Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5239#discussion_r168530422
  
    --- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/state/TestTaskStateManager.java
 ---
    @@ -118,12 +121,38 @@ public void reportTaskStateSnapshots(
                }
        }
     
    +   @Nonnull
        @Override
    -   public OperatorSubtaskState operatorStates(OperatorID operatorID) {
    -           TaskStateSnapshot taskStateSnapshot = 
getLastJobManagerTaskStateSnapshot();
    -           return taskStateSnapshot != null ? 
taskStateSnapshot.getSubtaskStateByOperatorID(operatorID) : null;
    +   public PrioritizedOperatorSubtaskState 
prioritizedOperatorState(OperatorID operatorID) {
    +           TaskStateSnapshot jmTaskStateSnapshot = 
getLastJobManagerTaskStateSnapshot();
    +           TaskStateSnapshot tmTaskStateSnapshot = 
getLastTaskManagerTaskStateSnapshot();
    +
    +           OperatorSubtaskState jmOpState = null;
    +           List<OperatorSubtaskState> tmStateCollection = null;
    +
    +           if (jmTaskStateSnapshot != null) {
    +                   jmOpState = 
jmTaskStateSnapshot.getSubtaskStateByOperatorID(operatorID);
    +           }
    +
    +           if (tmTaskStateSnapshot != null) {
    +                   OperatorSubtaskState tmOpState = 
tmTaskStateSnapshot.getSubtaskStateByOperatorID(operatorID);
    +                   if (tmOpState != null) {
    +                           tmStateCollection = 
Collections.singletonList(tmOpState);
    +                   }
    +           }
    +
    +           if (jmOpState == null) {
    +                   jmOpState = new OperatorSubtaskState();
    +           }
    +
    +           if (tmStateCollection == null) {
    +                   tmStateCollection = Collections.emptyList();
    +           }
    --- End diff --
    
    👍 


---

Reply via email to