pltbkd commented on code in PR #20296:
URL: https://github.com/apache/flink/pull/20296#discussion_r932469098


##########
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexTaskManagersHandler.java:
##########
@@ -182,30 +188,32 @@ private static JobVertexTaskManagersInfo createJobVertexTaskManagersInfo(
                 allFinished &= state.isTerminal();
                 endTime = Math.max(endTime, vertex.getStateTimestamp(state));
 
-                counts.addIOMetrics(
-                        vertex.getCurrentExecutionAttempt(),
-                        metricFetcher,
-                        jobID.toString(),
-                        jobVertex.getJobVertexId().toString());
-                MutableIOMetrics current = new MutableIOMetrics();
-                current.addIOMetrics(
-                        vertex.getCurrentExecutionAttempt(),
-                        metricFetcher,
-                        jobID.toString(),
-                        jobVertex.getJobVertexId().toString());
-                ioMetricsInfos.add(
-                        new IOMetricsInfo(
-                                current.getNumBytesIn(),
-                                current.isNumBytesInComplete(),
-                                current.getNumBytesOut(),
-                                current.isNumBytesOutComplete(),
-                                current.getNumRecordsIn(),
-                                current.isNumRecordsInComplete(),
-                                current.getNumRecordsOut(),
-                                current.isNumRecordsOutComplete(),
-                                current.getAccumulateBackPressuredTime(),
-                                current.getAccumulateIdleTime(),
-                                current.getAccumulateBusyTime()));
+                for (AccessExecution attempt : vertex.getCurrentExecutions()) {

Review Comment:
   You are right. However, while attempting a further refactor of this handler, I found that we should probably use only the representing execution to calculate the aggregated task manager status; otherwise the state can be confusing, because if any one attempt is CANCELED (even a speculative one), the whole task manager is reported as CANCELED. I don't think that is the expected behavior. I'll revert the changes and add a comment explaining why we use the representing attempt only.
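
   To make the concern concrete, here is a minimal, self-contained sketch of the aggregation problem. The `State` enum and method names below are hypothetical stand-ins, not Flink's actual `ExecutionState` API:

   ```java
   // Sketch (hypothetical State enum, not Flink's ExecutionState) of why
   // folding every current attempt into the task manager state misleads.
   import java.util.List;

   public class RepresentingAttemptSketch {
       enum State { RUNNING, FINISHED, CANCELED }

       // Aggregating over all attempts: one canceled speculative copy taints it.
       static State aggregateOverAllAttempts(List<State> attempts) {
           if (attempts.contains(State.CANCELED)) {
               return State.CANCELED; // surprising for a healthy task manager
           }
           return attempts.contains(State.RUNNING) ? State.RUNNING : State.FINISHED;
       }

       public static void main(String[] args) {
           // The representing attempt is RUNNING; a slow speculative copy was canceled.
           List<State> attempts = List.of(State.RUNNING, State.CANCELED);
           System.out.println(aggregateOverAllAttempts(attempts)); // CANCELED (misleading)
           System.out.println(attempts.get(0));                    // RUNNING (representing attempt only)
       }
   }
   ```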



##########
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptAccumulatorsHandler.java:
##########
@@ -99,25 +99,27 @@ public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph)
         List<ArchivedJson> archive = new ArrayList<>(16);
         for (AccessExecutionJobVertex task : graph.getAllVertices().values()) {
             for (AccessExecutionVertex subtask : task.getTaskVertices()) {
-                ResponseBody curAttemptJson =
-                        createAccumulatorInfo(subtask.getCurrentExecutionAttempt());
-                String curAttemptPath =
-                        getMessageHeaders()
-                                .getTargetRestEndpointURL()
-                                .replace(':' + JobIDPathParameter.KEY, graph.getJobID().toString())
-                                .replace(
-                                        ':' + JobVertexIdPathParameter.KEY,
-                                        task.getJobVertexId().toString())
-                                .replace(
-                                        ':' + SubtaskIndexPathParameter.KEY,
-                                        String.valueOf(subtask.getParallelSubtaskIndex()))
-                                .replace(
-                                        ':' + SubtaskAttemptPathParameter.KEY,
-                                        String.valueOf(
-                                                subtask.getCurrentExecutionAttempt()
-                                                        .getAttemptNumber()));
-
-                archive.add(new ArchivedJson(curAttemptPath, curAttemptJson));
+                for (AccessExecution attempt : subtask.getCurrentExecutions()) {
+                    if (attempt != null) {

Review Comment:
   It refers to the HistoricalExecutions part, but it is indeed redundant, since the attempt can never be null here. I'll remove the check.
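
   For illustration, roughly the shape the loop takes once the guard is dropped. This is a sketch with a hypothetical `Execution` record standing in for `AccessExecution`, not the actual patch:

   ```java
   import java.util.List;

   public class CurrentExecutionsSketch {
       record Execution(int attemptNumber) {}

       // Built only from live attempts, so it never contains null elements.
       static List<Execution> getCurrentExecutions() {
           return List.of(new Execution(0), new Execution(1));
       }

       public static void main(String[] args) {
           for (Execution attempt : getCurrentExecutions()) {
               // No `if (attempt != null)` guard; it would be dead code here.
               System.out.println("archiving attempt " + attempt.attemptNumber());
           }
       }
   }
   ```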



##########
flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/JobVertexThreadInfoTracker.java:
##########
@@ -257,18 +258,19 @@ private Map<TaskManagerLocation, ImmutableSet<ExecutionAttemptID>> groupExecutio
                         executionVertex.getExecutionState());
                 continue;
             }
-            TaskManagerLocation tmLocation = executionVertex.getCurrentAssignedResourceLocation();
-            if (tmLocation == null) {
-                LOG.trace("ExecutionVertex {} is currently not assigned", executionVertex);
-                continue;
-            }
-            Set<ExecutionAttemptID> groupedAttemptIds =
-                    executionAttemptsByLocation.getOrDefault(tmLocation, new HashSet<>());
+            for (AccessExecution execution : executionVertex.getCurrentExecutions()) {

Review Comment:
   An ExecutionAttemptID identifies a specific attempt of a specific execution. The downstream processing doesn't appear to care whether several ExecutionAttemptIDs belong to the same execution; it handles each ExecutionAttemptID independently.
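
   In other words, the grouping simply flattens every attempt of every current execution under its location, roughly like the sketch below, where `String` and `long` are hypothetical stand-ins for `TaskManagerLocation` and `ExecutionAttemptID`:

   ```java
   import java.util.HashMap;
   import java.util.HashSet;
   import java.util.List;
   import java.util.Map;
   import java.util.Set;

   public class GroupByLocationSketch {
       record Attempt(long id, String location) {}

       public static void main(String[] args) {
           // Attempts 1 and 2 belong to the same execution vertex; that is fine.
           List<Attempt> currentExecutions =
                   List.of(new Attempt(1, "tm-1"), new Attempt(2, "tm-1"), new Attempt(3, "tm-2"));

           Map<String, Set<Long>> attemptsByLocation = new HashMap<>();
           for (Attempt execution : currentExecutions) {
               attemptsByLocation
                       .computeIfAbsent(execution.location(), k -> new HashSet<>())
                       .add(execution.id());
           }
           // Prints {tm-1=[1, 2], tm-2=[3]}; each ID is then sampled independently.
           System.out.println(attemptsByLocation);
       }
   }
   ```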



##########
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexBackPressureHandler.java:
##########
@@ -100,26 +106,73 @@ private JobVertexBackPressureInfo createJobVertexBackPressureInfo(
     }
 
     private List<SubtaskBackPressureInfo> createSubtaskBackPressureInfo(
-            Map<Integer, ComponentMetricStore> subtaskMetricStores) {
+            TaskMetricStore taskMetricStore, Map<Integer, Integer> currentExecutionAttempts) {
+        Map<Integer, ComponentMetricStore> subtaskMetricStores =
+                taskMetricStore.getAllSubtaskMetricStores();
         List<SubtaskBackPressureInfo> result = new ArrayList<>(subtaskMetricStores.size());
         for (Map.Entry<Integer, ComponentMetricStore> entry : subtaskMetricStores.entrySet()) {
             int subtaskIndex = entry.getKey();
-            ComponentMetricStore subtaskMetricStore = entry.getValue();
-            double backPressureRatio = getBackPressureRatio(subtaskMetricStore);
-            double idleRatio = getIdleRatio(subtaskMetricStore);
-            double busyRatio = getBusyRatio(subtaskMetricStore);
-            result.add(
-                    new SubtaskBackPressureInfo(
-                            subtaskIndex,
-                            getBackPressureLevel(backPressureRatio),
-                            backPressureRatio,
-                            idleRatio,
-                            busyRatio));
+            SubtaskMetricStore subtaskMetricStore =
+                    taskMetricStore.getSubtaskMetricStore(subtaskIndex);
+            Map<Integer, ComponentMetricStore> allAttemptsMetricStores =
+                    subtaskMetricStore.getAllAttemptsMetricStores();
+            if (allAttemptsMetricStores.isEmpty() || allAttemptsMetricStores.size() == 1) {
+                result.add(
+                        createSubtaskAttemptBackpressureInfo(
+                                subtaskIndex, null, subtaskMetricStore, null));
+            } else {
+                int currentAttempt =
+                        currentExecutionAttempts == null
+                                ? -1
+                                : currentExecutionAttempts.getOrDefault(subtaskIndex, -1);
+                if (!allAttemptsMetricStores.containsKey(currentAttempt)) {
+                    // allAttemptsMetricStores is not empty here
+                    currentAttempt = allAttemptsMetricStores.keySet().iterator().next();
+                }
+                List<SubtaskBackPressureInfo> otherConcurrentAttempts =
+                        new ArrayList<>(allAttemptsMetricStores.size() - 1);
+                for (Map.Entry<Integer, ComponentMetricStore> attemptStore :
+                        allAttemptsMetricStores.entrySet()) {
+                    if (attemptStore.getKey() == currentAttempt) {
+                        continue;
+                    }
+                    otherConcurrentAttempts.add(
+                            createSubtaskAttemptBackpressureInfo(
+                                    subtaskIndex,
+                                    attemptStore.getKey(),
+                                    attemptStore.getValue(),
+                                    null));
+                }
+                result.add(
+                        createSubtaskAttemptBackpressureInfo(
+                                subtaskIndex,
+                                currentAttempt,
+                                allAttemptsMetricStores.get(currentAttempt),
+                                otherConcurrentAttempts));
+            }
         }
         result.sort(Comparator.comparingInt(SubtaskBackPressureInfo::getSubtask));

Review Comment:
   Yes, it is, since there is only one element in the result for each subtask; the other attempts are nested in otherConcurrentAttempts.
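
   To illustrate the resulting shape with a hypothetical minimal record (not the actual SubtaskBackPressureInfo class):

   ```java
   import java.util.List;

   public class BackPressureShapeSketch {
       // attempt may be null when there is a single (unnumbered) attempt.
       record SubtaskInfo(int subtask, Integer attempt, List<SubtaskInfo> otherConcurrentAttempts) {}

       public static void main(String[] args) {
           // Subtask 0 has a speculative attempt 1 nested under its current attempt 0;
           // subtask 1 has only one attempt.
           List<SubtaskInfo> result =
                   List.of(
                           new SubtaskInfo(0, 0, List.of(new SubtaskInfo(0, 1, null))),
                           new SubtaskInfo(1, null, null));
           // One top-level element per subtask, regardless of the attempt count.
           System.out.println(result.size()); // 2
       }
   }
   ```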



##########
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexBackPressureHandler.java:
##########
@@ -100,26 +106,73 @@ private JobVertexBackPressureInfo createJobVertexBackPressureInfo(
     }
 
     private List<SubtaskBackPressureInfo> createSubtaskBackPressureInfo(
-            Map<Integer, ComponentMetricStore> subtaskMetricStores) {
+            TaskMetricStore taskMetricStore, Map<Integer, Integer> currentExecutionAttempts) {
+        Map<Integer, ComponentMetricStore> subtaskMetricStores =
+                taskMetricStore.getAllSubtaskMetricStores();
         List<SubtaskBackPressureInfo> result = new ArrayList<>(subtaskMetricStores.size());
         for (Map.Entry<Integer, ComponentMetricStore> entry : subtaskMetricStores.entrySet()) {
             int subtaskIndex = entry.getKey();
-            ComponentMetricStore subtaskMetricStore = entry.getValue();
-            double backPressureRatio = getBackPressureRatio(subtaskMetricStore);
-            double idleRatio = getIdleRatio(subtaskMetricStore);
-            double busyRatio = getBusyRatio(subtaskMetricStore);
-            result.add(
-                    new SubtaskBackPressureInfo(
-                            subtaskIndex,
-                            getBackPressureLevel(backPressureRatio),
-                            backPressureRatio,
-                            idleRatio,
-                            busyRatio));
+            SubtaskMetricStore subtaskMetricStore =
+                    taskMetricStore.getSubtaskMetricStore(subtaskIndex);
+            Map<Integer, ComponentMetricStore> allAttemptsMetricStores =
+                    subtaskMetricStore.getAllAttemptsMetricStores();
+            if (allAttemptsMetricStores.isEmpty() || allAttemptsMetricStores.size() == 1) {
+                result.add(
+                        createSubtaskAttemptBackpressureInfo(
+                                subtaskIndex, null, subtaskMetricStore, null));
+            } else {
+                int currentAttempt =
+                        currentExecutionAttempts == null

Review Comment:
   currentExecutionAttempts is obtained from the JobDetails. In fact, the currentExecutionAttempts entry for a subtask exists only when the subtask has more than one current attempt, so it is normal for currentExecutionAttempts to be null.
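
   The fallback then behaves roughly like this sketch (plain maps as hypothetical stand-ins for the metric-store classes):

   ```java
   import java.util.LinkedHashMap;
   import java.util.Map;

   public class CurrentAttemptFallbackSketch {
       static int resolveCurrentAttempt(
               Map<Integer, Integer> currentExecutionAttempts, // may be null, per JobDetails
               int subtaskIndex,
               Map<Integer, String> allAttemptsMetricStores) { // attempt -> metrics, non-empty
           int currentAttempt =
                   currentExecutionAttempts == null
                           ? -1
                           : currentExecutionAttempts.getOrDefault(subtaskIndex, -1);
           if (!allAttemptsMetricStores.containsKey(currentAttempt)) {
               // Fall back to an arbitrary attempt that actually reported metrics.
               currentAttempt = allAttemptsMetricStores.keySet().iterator().next();
           }
           return currentAttempt;
       }

       public static void main(String[] args) {
           Map<Integer, String> stores = new LinkedHashMap<>();
           stores.put(2, "metrics-of-attempt-2");
           stores.put(3, "metrics-of-attempt-3");
           System.out.println(resolveCurrentAttempt(null, 0, stores));         // 2 (fallback)
           System.out.println(resolveCurrentAttempt(Map.of(0, 3), 0, stores)); // 3
       }
   }
   ```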


