pltbkd commented on code in PR #20296:
URL: https://github.com/apache/flink/pull/20296#discussion_r932468662
##########
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java:
##########
@@ -177,7 +221,9 @@ public void add(MetricDump metric) {
TaskManagerMetricStore tm;
JobMetricStore job;
TaskMetricStore task;
- ComponentMetricStore subtask;
+ SubtaskMetricStore subtask;
+ ComponentMetricStore attempt;
+ boolean isCurrentAttempt = true;
Review Comment:
isRepresentativeAttempt here is a good name to explain the meaning. But I
would limit the "representative" naming here. CurrentExecutionAttempts in the
JobDetails is collected via Execution#getCurrentExecutionAttempt, and passed
to other places. Maybe we can modify the naming when we formally introduce the
concept of the representative attempt for an Execution.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java:
##########
@@ -54,6 +57,14 @@ public class MetricStore {
private final Map<String, TaskManagerMetricStore> taskManagers = new
ConcurrentHashMap<>();
private final Map<String, JobMetricStore> jobs = new ConcurrentHashMap<>();
+ /**
+ * The map holds the attempt number of the representing execution for each
subtask of each
+ * vertex. When a metric of an execution attempt is added, the metric can
also be added to the
+ * SubtaskMetricStore when it is of the representing execution.
+ */
+ private final Map<String, Map<String, Map<Integer, Integer>>>
currentExecutionAttempts =
Review Comment:
I'd prefer to remain the naming. See the reply above.
Or maybe we may only modify the map here in the MetricStore, but do not
modify that in other classes?
##########
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexTaskManagersHandler.java:
##########
@@ -131,18 +132,23 @@ private static JobVertexTaskManagersInfo
createJobVertexTaskManagersInfo(
Map<String, String> taskManagerId2Host = new HashMap<>();
Map<String, List<AccessExecutionVertex>> taskManagerVertices = new
HashMap<>();
for (AccessExecutionVertex vertex : jobVertex.getTaskVertices()) {
- TaskManagerLocation location =
vertex.getCurrentAssignedResourceLocation();
- String taskManagerHost =
- location == null
- ? "(unassigned)"
- : location.getHostname() + ':' +
location.dataPort();
- String taskmanagerId =
- location == null ? "(unassigned)" :
location.getResourceID().toString();
- taskManagerId2Host.put(taskmanagerId, taskManagerHost);
- List<AccessExecutionVertex> vertices =
- taskManagerVertices.computeIfAbsent(
- taskmanagerId, ignored -> new ArrayList<>(4));
- vertices.add(vertex);
+ List<TaskManagerLocation> locations =
+ vertex.getCurrentExecutions().stream()
+ .map(AccessExecution::getAssignedResourceLocation)
+ .collect(Collectors.toList());
+ for (TaskManagerLocation location : locations) {
+ String taskManagerHost =
+ location == null
+ ? "(unassigned)"
+ : location.getHostname() + ':' +
location.dataPort();
+ String taskmanagerId =
+ location == null ? "(unassigned)" :
location.getResourceID().toString();
+ taskManagerId2Host.put(taskmanagerId, taskManagerHost);
+ List<AccessExecutionVertex> vertices =
+ taskManagerVertices.computeIfAbsent(
+ taskmanagerId, ignored -> new ArrayList<>(4));
Review Comment:
Though I agree, I'd prefer not to change the existing code.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]