Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/5239#discussion_r168507106
--- Diff:
flink-runtime/src/test/java/org/apache/flink/runtime/state/TestTaskStateManager.java
---
@@ -118,12 +121,38 @@ public void reportTaskStateSnapshots(
}
}
+ @Nonnull
@Override
- public OperatorSubtaskState operatorStates(OperatorID operatorID) {
- TaskStateSnapshot taskStateSnapshot =
getLastJobManagerTaskStateSnapshot();
- return taskStateSnapshot != null ?
taskStateSnapshot.getSubtaskStateByOperatorID(operatorID) : null;
+ public PrioritizedOperatorSubtaskState
prioritizedOperatorState(OperatorID operatorID) {
+ TaskStateSnapshot jmTaskStateSnapshot =
getLastJobManagerTaskStateSnapshot();
+ TaskStateSnapshot tmTaskStateSnapshot =
getLastTaskManagerTaskStateSnapshot();
+
+ OperatorSubtaskState jmOpState = null;
+ List<OperatorSubtaskState> tmStateCollection = null;
+
+ if (jmTaskStateSnapshot != null) {
+ jmOpState =
jmTaskStateSnapshot.getSubtaskStateByOperatorID(operatorID);
+ }
+
+ if (tmTaskStateSnapshot != null) {
+ OperatorSubtaskState tmOpState =
tmTaskStateSnapshot.getSubtaskStateByOperatorID(operatorID);
+ if (tmOpState != null) {
+ tmStateCollection =
Collections.singletonList(tmOpState);
+ }
+ }
+
+ if (jmOpState == null) {
+ jmOpState = new OperatorSubtaskState();
+ }
--- End diff --
I think it would be clearer to executed this branch as the else branch of
the `if (jmTaskStateSnapshot != null)`.
---