pltbkd commented on code in PR #20733:
URL: https://github.com/apache/flink/pull/20733#discussion_r963328451
##########
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java:
##########
@@ -82,18 +83,52 @@ synchronized void retainTaskManagers(List<String> activeTaskManagers) {
      */
     synchronized void retainJobs(List<String> activeJobs) {
         jobs.keySet().retainAll(activeJobs);
-        currentExecutionAttempts.keySet().retainAll(activeJobs);
+        representativeAttempts.keySet().retainAll(activeJobs);
     }
     public synchronized void updateCurrentExecutionAttempts(Collection<JobDetails> jobs) {
-        jobs.forEach(
-                job ->
-                        currentExecutionAttempts.put(
-                                job.getJobId().toString(), job.getCurrentExecutionAttempts()));
+        for (JobDetails job : jobs) {
+            String jobId = job.getJobId().toString();
+            Map<String, Map<Integer, List<Integer>>> currentExecutionAttempts =
+                    job.getCurrentExecutionAttempts();
+            Map<String, Map<Integer, Integer>> jobRepresentativeAttempts =
+                    new HashMap<>(currentExecutionAttempts.size());
+            for (Map.Entry<String, Map<Integer, List<Integer>>> vertexEntry :
+                    currentExecutionAttempts.entrySet()) {
+                String vertexId = vertexEntry.getKey();
+                Map<Integer, List<Integer>> subtaskAttempts = vertexEntry.getValue();
+                Map<Integer, Integer> vertexRepresentativeAttempts =
+                        new HashMap<>(subtaskAttempts.size());
+                for (Map.Entry<Integer, List<Integer>> e : subtaskAttempts.entrySet()) {
+                    int subtaskIndex = e.getKey();
+                    List<Integer> currentAttemptNumbers = e.getValue();
+                    retainSubtaskAttemptMetrics(
Review Comment:
I meant to avoid iterating the map twice. The code has been updated; do you
think it is clear enough now?
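
For illustration only, here is a minimal standalone sketch of the single-pass idea: one loop over the subtask map both prunes stale attempt metrics and records a representative attempt. The class `SinglePassAttempts`, the method `pruneAndPickRepresentatives`, the `attemptMetrics` map, and the rule "the highest current attempt number is the representative" are all assumptions made up for this example; they are not the code in this PR.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch, not the MetricStore implementation.
class SinglePassAttempts {

    /**
     * Walks subtaskAttempts once. For each subtask it drops stored metrics of
     * attempts that are no longer current and records one representative attempt.
     *
     * @param subtaskAttempts subtask index -> current attempt numbers
     * @param attemptMetrics  subtask index -> (attempt number -> stored metric), mutated in place
     * @return subtask index -> representative attempt number
     */
    static Map<Integer, Integer> pruneAndPickRepresentatives(
            Map<Integer, List<Integer>> subtaskAttempts,
            Map<Integer, Map<Integer, String>> attemptMetrics) {
        Map<Integer, Integer> representatives = new HashMap<>(subtaskAttempts.size());
        for (Map.Entry<Integer, List<Integer>> e : subtaskAttempts.entrySet()) {
            int subtaskIndex = e.getKey();
            List<Integer> currentAttemptNumbers = e.getValue();
            if (currentAttemptNumbers.isEmpty()) {
                continue; // nothing current for this subtask, so no representative
            }
            // Step 1: prune metrics of attempts that are no longer current.
            Map<Integer, String> stored = attemptMetrics.get(subtaskIndex);
            if (stored != null) {
                stored.keySet().retainAll(currentAttemptNumbers);
            }
            // Step 2 (same iteration): pick a representative.
            // Assumption for this sketch: the highest attempt number wins.
            representatives.put(subtaskIndex, Collections.max(currentAttemptNumbers));
        }
        return representatives;
    }
}
```

Doing both steps in the same loop body means the entry set is traversed once, at the cost of a loop that has two responsibilities.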