pltbkd commented on code in PR #20296:
URL: https://github.com/apache/flink/pull/20296#discussion_r932468662


##########
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java:
##########
@@ -177,7 +221,9 @@ public void add(MetricDump metric) {
             TaskManagerMetricStore tm;
             JobMetricStore job;
             TaskMetricStore task;
-            ComponentMetricStore subtask;
+            SubtaskMetricStore subtask;
+            ComponentMetricStore attempt;
+            boolean isCurrentAttempt = true;

Review Comment:
   isRepresentativeAttempt is a good name for explaining the meaning here, but I 
would limit the use of the "representative" naming. CurrentExecutionAttempts in 
JobDetails is collected via Execution#getCurrentExecutionAttempt and passed 
to other places. Maybe we can change the naming when we formally introduce the 
concept of the representative attempt for an Execution.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java:
##########
@@ -54,6 +57,14 @@ public class MetricStore {
     private final Map<String, TaskManagerMetricStore> taskManagers = new ConcurrentHashMap<>();
     private final Map<String, JobMetricStore> jobs = new ConcurrentHashMap<>();
 
+    /**
+     * The map holds the attempt number of the representing execution for each subtask of each
+     * vertex. When a metric of an execution attempt is added, the metric can also be added to the
+     * SubtaskMetricStore when it is of the representing execution.
+     */
+    private final Map<String, Map<String, Map<Integer, Integer>>> currentExecutionAttempts =

Review Comment:
   I'd prefer to keep the current naming; see the reply above.
   Alternatively, could we rename only the map here in MetricStore and leave 
the naming in other classes unchanged?
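
   To make the discussion concrete, here is a minimal sketch of how such a 
nested map could be used to decide whether a metric belongs to the current 
attempt. It is not the code in this PR: the class name, the method names, and 
the assumed key order (job ID, then vertex ID, then subtask index, mapping to 
the attempt number) are all hypothetical, and the default of `true` for 
unknown subtasks is only a guess based on the `isCurrentAttempt = true` 
initializer in the diff.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical sketch only; not the actual MetricStore implementation. */
class CurrentAttemptsSketch {
    // Assumed key order: job ID -> vertex ID -> subtask index -> current attempt number.
    private final Map<String, Map<String, Map<Integer, Integer>>> currentExecutionAttempts =
            new ConcurrentHashMap<>();

    /** Records which attempt is considered current for the given subtask. */
    void setCurrentAttempt(String jobId, String vertexId, int subtaskIndex, int attemptNumber) {
        currentExecutionAttempts
                .computeIfAbsent(jobId, ignored -> new ConcurrentHashMap<>())
                .computeIfAbsent(vertexId, ignored -> new ConcurrentHashMap<>())
                .put(subtaskIndex, attemptNumber);
    }

    /**
     * Returns true if the attempt is the recorded current attempt of the subtask.
     * Assumption: when nothing is recorded for the subtask, the attempt is treated as current.
     */
    boolean isCurrentAttempt(String jobId, String vertexId, int subtaskIndex, int attemptNumber) {
        Map<String, Map<Integer, Integer>> vertices = currentExecutionAttempts.get(jobId);
        if (vertices == null) {
            return true;
        }
        Map<Integer, Integer> subtasks = vertices.get(vertexId);
        if (subtasks == null) {
            return true;
        }
        Integer current = subtasks.get(subtaskIndex);
        return current == null || current == attemptNumber;
    }
}
```

   With a lookup like this, a metric from an attempt would always go to the 
attempt-level store, and additionally to the subtask-level store only when 
`isCurrentAttempt` returns true.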



##########
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexTaskManagersHandler.java:
##########
@@ -131,18 +132,23 @@ private static JobVertexTaskManagersInfo createJobVertexTaskManagersInfo(
         Map<String, String> taskManagerId2Host = new HashMap<>();
         Map<String, List<AccessExecutionVertex>> taskManagerVertices = new HashMap<>();
         for (AccessExecutionVertex vertex : jobVertex.getTaskVertices()) {
-            TaskManagerLocation location = vertex.getCurrentAssignedResourceLocation();
-            String taskManagerHost =
-                    location == null
-                            ? "(unassigned)"
-                            : location.getHostname() + ':' + location.dataPort();
-            String taskmanagerId =
-                    location == null ? "(unassigned)" : location.getResourceID().toString();
-            taskManagerId2Host.put(taskmanagerId, taskManagerHost);
-            List<AccessExecutionVertex> vertices =
-                    taskManagerVertices.computeIfAbsent(
-                            taskmanagerId, ignored -> new ArrayList<>(4));
-            vertices.add(vertex);
+            List<TaskManagerLocation> locations =
+                    vertex.getCurrentExecutions().stream()
+                            .map(AccessExecution::getAssignedResourceLocation)
+                            .collect(Collectors.toList());
+            for (TaskManagerLocation location : locations) {
+                String taskManagerHost =
+                        location == null
+                                ? "(unassigned)"
+                                : location.getHostname() + ':' + location.dataPort();
+                String taskmanagerId =
+                        location == null ? "(unassigned)" : location.getResourceID().toString();
+                taskManagerId2Host.put(taskmanagerId, taskManagerHost);
+                List<AccessExecutionVertex> vertices =
+                        taskManagerVertices.computeIfAbsent(
+                                taskmanagerId, ignored -> new ArrayList<>(4));

Review Comment:
   Though I agree, I'd prefer not to change the existing code. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
