pltbkd commented on code in PR #20296:
URL: https://github.com/apache/flink/pull/20296#discussion_r932851211


##########
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexBackPressureHandler.java:
##########
@@ -100,26 +106,73 @@ private JobVertexBackPressureInfo 
createJobVertexBackPressureInfo(
     }
 
     private List<SubtaskBackPressureInfo> createSubtaskBackPressureInfo(
-            Map<Integer, ComponentMetricStore> subtaskMetricStores) {
+            TaskMetricStore taskMetricStore, Map<Integer, Integer> 
currentExecutionAttempts) {
+        Map<Integer, ComponentMetricStore> subtaskMetricStores =
+                taskMetricStore.getAllSubtaskMetricStores();
         List<SubtaskBackPressureInfo> result = new 
ArrayList<>(subtaskMetricStores.size());
         for (Map.Entry<Integer, ComponentMetricStore> entry : 
subtaskMetricStores.entrySet()) {
             int subtaskIndex = entry.getKey();
-            ComponentMetricStore subtaskMetricStore = entry.getValue();
-            double backPressureRatio = 
getBackPressureRatio(subtaskMetricStore);
-            double idleRatio = getIdleRatio(subtaskMetricStore);
-            double busyRatio = getBusyRatio(subtaskMetricStore);
-            result.add(
-                    new SubtaskBackPressureInfo(
-                            subtaskIndex,
-                            getBackPressureLevel(backPressureRatio),
-                            backPressureRatio,
-                            idleRatio,
-                            busyRatio));
+            SubtaskMetricStore subtaskMetricStore =
+                    taskMetricStore.getSubtaskMetricStore(subtaskIndex);
+            Map<Integer, ComponentMetricStore> allAttemptsMetricStores =
+                    subtaskMetricStore.getAllAttemptsMetricStores();
+            if (allAttemptsMetricStores.isEmpty() || 
allAttemptsMetricStores.size() == 1) {
+                result.add(
+                        createSubtaskAttemptBackpressureInfo(
+                                subtaskIndex, null, subtaskMetricStore, null));
+            } else {
+                int currentAttempt =
+                        currentExecutionAttempts == null
+                                ? -1
+                                : 
currentExecutionAttempts.getOrDefault(subtaskIndex, -1);
+                if (!allAttemptsMetricStores.containsKey(currentAttempt)) {

Review Comment:
   I don't think so. The issue only exists while fetching the metrics of a 
subtask or a task, that the acquired metrics may be not from the latest 
representative attempt. The archived execution graph contains which attempt is 
the representative one, and the metrics are acquired with attempt number. So 
neither the state nor the metrics for each attempt will be wrong. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to