gaoyunhaii commented on code in PR #20296:
URL: https://github.com/apache/flink/pull/20296#discussion_r931909030


##########
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java:
##########
@@ -391,4 +476,36 @@ private static TaskMetricStore unmodifiable(TaskMetricStore source) {
                     unmodifiableMap(source.metrics), unmodifiableMap(source.subtasks));
         }
     }
+
+    /** Sub-structure containing metrics of a single subtask. */
+    @ThreadSafe
+    public static class SubtaskMetricStore extends ComponentMetricStore {
+        private final Map<Integer, ComponentMetricStore> attempts;
+
+        private SubtaskMetricStore() {
+            this(new ConcurrentHashMap<>(), new ConcurrentHashMap<>());
+        }
+
+        private SubtaskMetricStore(
+                Map<String, String> metrics, Map<Integer, ComponentMetricStore> attempts) {
+            super(metrics);
+            this.attempts = checkNotNull(attempts);
+        }
+
+        private static SubtaskMetricStore unmodifiable(SubtaskMetricStore source) {

Review Comment:
   Is this method used anywhere? It does not seem to be called.
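One way to give the new helper a caller is to return it from a read accessor, the way `TaskMetricStore.unmodifiable` wraps its maps. The sketch below uses simplified stand-ins (`ComponentStore`, `SubtaskStore`, `TaskStore`, and the `getSubtaskStore` accessor are hypothetical, not Flink's classes), and the `null` check is an assumption. It shows a shallow read-only view built with `Collections.unmodifiableMap`.

```java
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class UnmodifiableSubtaskStoreSketch {

    /** Simplified stand-in for ComponentMetricStore: metric name -> value. */
    static class ComponentStore {
        final Map<String, String> metrics;

        ComponentStore(Map<String, String> metrics) {
            this.metrics = metrics;
        }
    }

    /** Simplified stand-in for SubtaskMetricStore: adds one store per execution attempt. */
    static class SubtaskStore extends ComponentStore {
        final Map<Integer, ComponentStore> attempts;

        SubtaskStore() {
            this(new ConcurrentHashMap<>(), new ConcurrentHashMap<>());
        }

        SubtaskStore(Map<String, String> metrics, Map<Integer, ComponentStore> attempts) {
            super(metrics);
            this.attempts = attempts;
        }

        /** Shallow read-only view, in the style of TaskMetricStore.unmodifiable. */
        static SubtaskStore unmodifiable(SubtaskStore source) {
            if (source == null) {
                // Assumption: a missing subtask maps to null rather than an empty store.
                return null;
            }
            return new SubtaskStore(
                    Collections.unmodifiableMap(source.metrics),
                    Collections.unmodifiableMap(source.attempts));
        }
    }

    /** Simplified stand-in for TaskMetricStore. */
    static class TaskStore {
        final Map<Integer, SubtaskStore> subtasks = new ConcurrentHashMap<>();

        /** Hypothetical accessor that gives SubtaskStore.unmodifiable a caller. */
        SubtaskStore getSubtaskStore(int subtaskIndex) {
            return SubtaskStore.unmodifiable(subtasks.get(subtaskIndex));
        }
    }

    public static void main(String[] args) {
        TaskStore task = new TaskStore();
        task.subtasks.computeIfAbsent(0, k -> new SubtaskStore()).metrics.put("numRecordsIn", "42");

        SubtaskStore view = task.getSubtaskStore(0);
        System.out.println(view.metrics.get("numRecordsIn"));
        try {
            view.metrics.put("numRecordsOut", "7");
        } catch (UnsupportedOperationException e) {
            System.out.println("view is read-only");
        }
    }
}
```

Because the maps are wrapped rather than copied, the view still reflects metrics added later to the underlying store.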



##########
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java:
##########
@@ -214,15 +260,33 @@ public void add(MetricDump metric) {
                     task = job.tasks.computeIfAbsent(taskInfo.vertexID, k -> new TaskMetricStore());
                     subtask =
                             task.subtasks.computeIfAbsent(
-                                    taskInfo.subtaskIndex, k -> new ComponentMetricStore());
+                                    taskInfo.subtaskIndex, k -> new SubtaskMetricStore());
+
+                    if (taskInfo.attemptNumber >= 0) {
+                        // Consider as the current attempt if current attempt id is not set, which
+                        // means there should be only one execution
+                        isCurrentAttempt =
+                                Optional.of(currentExecutionAttempts)
+                                                .map(m -> m.get(taskInfo.jobID))
+                                                .map(m -> m.get(taskInfo.vertexID))
+                                                .map(m -> m.get(taskInfo.subtaskIndex))
+                                                .orElse(taskInfo.attemptNumber)
+                                        == taskInfo.attemptNumber;
+                        attempt =
+                                subtask.attempts.computeIfAbsent(
+                                        taskInfo.attemptNumber, k -> new ComponentMetricStore());
+                        addMetric(attempt.metrics, name, metric);
+                    }
                     /**
                      * The duplication is intended. Metrics scoped by subtask are useful for several
                      * job/task handlers, while the WebInterface task metric queries currently do
                      * not account for subtasks, so we don't divide by subtask and instead use the
                      * concatenation of subtask index and metric name as the name for those.
                      */
-                    addMetric(subtask.metrics, name, metric);
-                    addMetric(task.metrics, taskInfo.subtaskIndex + "." + name, metric);
+                    if (isCurrentAttempt) {

Review Comment:
   Might we move the comment above into the `if` block?
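A minimal sketch of the suggested placement, assuming simplified maps in place of Flink's `SubtaskMetricStore`/`TaskMetricStore` (the class and the `add` signature here are hypothetical). The per-attempt store is always updated. The comment then sits next to the only code it explains, the duplicated subtask/task writes that happen for the current attempt.

```java
import java.util.HashMap;
import java.util.Map;

public class CurrentAttemptCommentSketch {

    /** attempt number -> (metric name -> value); every attempt is recorded. */
    final Map<Integer, Map<String, String>> attemptMetrics = new HashMap<>();
    /** Subtask-scoped metrics: only the current attempt is reflected here. */
    final Map<String, String> subtaskMetrics = new HashMap<>();
    /** Task-scoped metrics, keyed by "<subtaskIndex>.<name>". */
    final Map<String, String> taskMetrics = new HashMap<>();

    void add(int subtaskIndex, int attemptNumber, boolean isCurrentAttempt, String name, String value) {
        attemptMetrics.computeIfAbsent(attemptNumber, k -> new HashMap<>()).put(name, value);

        if (isCurrentAttempt) {
            /*
             * The duplication is intended. Metrics scoped by subtask are useful for several
             * job/task handlers, while the WebInterface task metric queries currently do not
             * account for subtasks, so we don't divide by subtask and instead use the
             * concatenation of subtask index and metric name as the name for those.
             */
            subtaskMetrics.put(name, value);
            taskMetrics.put(subtaskIndex + "." + name, value);
        }
    }

    public static void main(String[] args) {
        CurrentAttemptCommentSketch store = new CurrentAttemptCommentSketch();
        store.add(0, 0, true, "numRecordsIn", "10");
        store.add(0, 1, false, "numRecordsIn", "3");
        System.out.println(store.taskMetrics);
    }
}
```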



##########
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexBackPressureHandler.java:
##########
@@ -100,26 +106,73 @@ private JobVertexBackPressureInfo createJobVertexBackPressureInfo(
     }
 
     private List<SubtaskBackPressureInfo> createSubtaskBackPressureInfo(
-            Map<Integer, ComponentMetricStore> subtaskMetricStores) {
+            TaskMetricStore taskMetricStore, Map<Integer, Integer> currentExecutionAttempts) {
+        Map<Integer, ComponentMetricStore> subtaskMetricStores =
+                taskMetricStore.getAllSubtaskMetricStores();
         List<SubtaskBackPressureInfo> result = new ArrayList<>(subtaskMetricStores.size());
         for (Map.Entry<Integer, ComponentMetricStore> entry : subtaskMetricStores.entrySet()) {
             int subtaskIndex = entry.getKey();
-            ComponentMetricStore subtaskMetricStore = entry.getValue();
-            double backPressureRatio = getBackPressureRatio(subtaskMetricStore);
-            double idleRatio = getIdleRatio(subtaskMetricStore);
-            double busyRatio = getBusyRatio(subtaskMetricStore);
-            result.add(
-                    new SubtaskBackPressureInfo(
-                            subtaskIndex,
-                            getBackPressureLevel(backPressureRatio),
-                            backPressureRatio,
-                            idleRatio,
-                            busyRatio));
+            SubtaskMetricStore subtaskMetricStore =
+                    taskMetricStore.getSubtaskMetricStore(subtaskIndex);

Review Comment:
   We could make `subtaskMetricStores` a `Map<Integer, SubtaskMetricStore>` and use `entry.getValue()` directly.
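A sketch of the suggestion using simplified stand-ins (hypothetical, not Flink's classes). If the getter is typed with the subclass, the loop reads the `SubtaskStore` from the entry instead of looking it up again by index.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class TypedSubtaskMapSketch {

    /** Simplified stand-in for ComponentMetricStore. */
    static class ComponentStore {
        final Map<String, String> metrics = new TreeMap<>();
    }

    /** Simplified stand-in for SubtaskMetricStore. */
    static class SubtaskStore extends ComponentStore {
        final Map<Integer, ComponentStore> attempts = new TreeMap<>();
    }

    /** Simplified stand-in for TaskMetricStore. */
    static class TaskStore {
        final Map<Integer, SubtaskStore> subtasks = new TreeMap<>();

        /** Typed with the subclass, so callers get SubtaskStore values without a second lookup. */
        Map<Integer, SubtaskStore> getAllSubtaskMetricStores() {
            return subtasks;
        }
    }

    /** Walks the subtasks once, using entry.getValue() instead of re-fetching by index. */
    static List<String> attemptCounts(TaskStore task) {
        Map<Integer, SubtaskStore> subtaskStores = task.getAllSubtaskMetricStores();
        List<String> result = new ArrayList<>(subtaskStores.size());
        for (Map.Entry<Integer, SubtaskStore> entry : subtaskStores.entrySet()) {
            int subtaskIndex = entry.getKey();
            SubtaskStore subtaskStore = entry.getValue();
            result.add(subtaskIndex + ":" + subtaskStore.attempts.size());
        }
        return result;
    }

    public static void main(String[] args) {
        TaskStore task = new TaskStore();
        SubtaskStore first = new SubtaskStore();
        first.attempts.put(0, new ComponentStore());
        first.attempts.put(1, new ComponentStore());
        task.subtasks.put(0, first);
        task.subtasks.put(1, new SubtaskStore());
        System.out.println(attemptCounts(task));
    }
}
```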



##########
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java:
##########
@@ -233,21 +297,42 @@ public void add(MetricDump metric) {
                                     operatorInfo.vertexID, k -> new TaskMetricStore());
                     subtask =
                             task.subtasks.computeIfAbsent(
-                                    operatorInfo.subtaskIndex, k -> new ComponentMetricStore());
+                                    operatorInfo.subtaskIndex, k -> new SubtaskMetricStore());
+
+                    if (operatorInfo.attemptNumber >= 0) {

Review Comment:
   Could we extract a helper method shared by the subtask and operator scopes?
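One possible shape for such a helper, sketched with simplified stand-ins. `addAttemptMetric`, the String-typed IDs, and the `SubtaskStore` class are hypothetical. It also assumes that a negative attempt number means "treat as current", which the visible diff does not show. The current-attempt check reuses the `Optional` chain from the diff, so the task and operator branches would each reduce to one call.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

public class AttemptScopeHelperSketch {

    /** Simplified stand-in for SubtaskMetricStore. */
    static class SubtaskStore {
        final Map<String, String> metrics = new ConcurrentHashMap<>();
        final Map<Integer, Map<String, String>> attempts = new ConcurrentHashMap<>();
    }

    /** jobID -> vertexID -> subtaskIndex -> current attempt number (IDs simplified to String). */
    final Map<String, Map<String, Map<Integer, Integer>>> currentExecutionAttempts =
            new ConcurrentHashMap<>();

    /**
     * Hypothetical helper shared by the task and operator branches of add(): stores the metric
     * under its attempt and returns whether that attempt is the current one.
     */
    boolean addAttemptMetric(
            SubtaskStore subtask,
            String jobID,
            String vertexID,
            int subtaskIndex,
            int attemptNumber,
            String name,
            String value) {
        if (attemptNumber < 0) {
            // Assumption: no attempt number reported, so treat the metric as current.
            return true;
        }
        // If no current attempt is recorded, there should be only one execution.
        boolean isCurrentAttempt =
                Optional.of(currentExecutionAttempts)
                                .map(m -> m.get(jobID))
                                .map(m -> m.get(vertexID))
                                .map(m -> m.get(subtaskIndex))
                                .orElse(attemptNumber)
                        == attemptNumber;
        subtask.attempts
                .computeIfAbsent(attemptNumber, k -> new ConcurrentHashMap<>())
                .put(name, value);
        return isCurrentAttempt;
    }

    public static void main(String[] args) {
        AttemptScopeHelperSketch store = new AttemptScopeHelperSketch();
        store.currentExecutionAttempts
                .computeIfAbsent("job", k -> new ConcurrentHashMap<>())
                .computeIfAbsent("vertex", k -> new ConcurrentHashMap<>())
                .put(0, 1);
        SubtaskStore subtask = new SubtaskStore();
        // The task scope and the operator scope would both call the same helper.
        System.out.println(store.addAttemptMetric(subtask, "job", "vertex", 0, 0, "busyTimeMsPerSecond", "900"));
        System.out.println(store.addAttemptMetric(subtask, "job", "vertex", 0, 1, "busyTimeMsPerSecond", "100"));
    }
}
```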



##########
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java:
##########
@@ -127,22 +128,24 @@ private static JobExceptionsInfoWithHistory createJobExceptionsInfo(
         List<JobExceptionsInfo.ExecutionExceptionInfo> taskExceptionList = new ArrayList<>();
         boolean truncated = false;
         for (AccessExecutionVertex task : executionGraph.getAllExecutionVertices()) {
-            Optional<ErrorInfo> failure = task.getFailureInfo();
-            if (failure.isPresent()) {
-                if (taskExceptionList.size() >= exceptionToReportMaxSize) {
-                    truncated = true;
-                    break;
+            for (AccessExecution execution : task.getCurrentExecutions()) {
+                Optional<ErrorInfo> failure = execution.getFailureInfo();
+                if (failure.isPresent()) {
+                    if (taskExceptionList.size() >= exceptionToReportMaxSize) {
+                        truncated = true;
+                        break;
+                    }
+
+                    TaskManagerLocation location = execution.getAssignedResourceLocation();
+                    String locationString = toString(location);
+                    long timestamp = execution.getStateTimestamp(ExecutionState.FAILED);
+                    taskExceptionList.add(
+                            new JobExceptionsInfo.ExecutionExceptionInfo(
+                                    failure.get().getExceptionAsString(),
+                                    task.getTaskNameWithSubtaskIndex(),

Review Comment:
   It seems the page currently does not show the attempt information. Should we modify this code to distinguish the errors of different attempts?
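A sketch of one way to keep attempts apart, assuming simplified stand-ins for `AccessExecutionVertex`/`AccessExecution` and a hypothetical `ExceptionEntry` that carries the attempt number next to the task name. This is not the actual `JobExceptionsInfo.ExecutionExceptionInfo` type. The sketch also uses a labeled break. In the new nested loop, a plain `break` only leaves the attempt loop. The labeled form keeps the early exit of the single-loop version.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

public class AttemptExceptionsSketch {

    /** Simplified stand-in for AccessExecution. */
    static final class Execution {
        final int attemptNumber;
        final Optional<String> failure;

        Execution(int attemptNumber, String failure) {
            this.attemptNumber = attemptNumber;
            this.failure = Optional.ofNullable(failure);
        }
    }

    /** Simplified stand-in for AccessExecutionVertex. */
    static final class Vertex {
        final String taskNameWithSubtaskIndex;
        final List<Execution> currentExecutions;

        Vertex(String taskNameWithSubtaskIndex, Execution... currentExecutions) {
            this.taskNameWithSubtaskIndex = taskNameWithSubtaskIndex;
            this.currentExecutions = Arrays.asList(currentExecutions);
        }
    }

    /** Hypothetical exception entry that carries the attempt number next to the task name. */
    static final class ExceptionEntry {
        final String exception;
        final String task;
        final int attemptNumber;

        ExceptionEntry(String exception, String task, int attemptNumber) {
            this.exception = exception;
            this.task = task;
            this.attemptNumber = attemptNumber;
        }
    }

    /** Collects failures of all current attempts; returns true if the list was truncated. */
    static boolean collect(List<Vertex> vertices, int maxSize, List<ExceptionEntry> out) {
        boolean truncated = false;
        vertexLoop:
        for (Vertex vertex : vertices) {
            for (Execution execution : vertex.currentExecutions) {
                if (!execution.failure.isPresent()) {
                    continue;
                }
                if (out.size() >= maxSize) {
                    truncated = true;
                    // A plain `break` would only leave the attempt loop.
                    break vertexLoop;
                }
                out.add(
                        new ExceptionEntry(
                                execution.failure.get(),
                                vertex.taskNameWithSubtaskIndex,
                                execution.attemptNumber));
            }
        }
        return truncated;
    }

    public static void main(String[] args) {
        List<Vertex> vertices =
                Arrays.asList(
                        new Vertex("Map (1/2)", new Execution(0, "IOException"), new Execution(1, "TimeoutException")),
                        new Vertex("Map (2/2)", new Execution(0, null)));
        List<ExceptionEntry> entries = new ArrayList<>();
        collect(vertices, 10, entries);
        for (ExceptionEntry e : entries) {
            System.out.println(e.task + " attempt #" + e.attemptNumber + ": " + e.exception);
        }
    }
}
```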



##########
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexDetailsHandler.java:
##########
@@ -120,9 +120,24 @@ private static JobVertexDetailsInfo createJobVertexDetailsInfo(
         for (AccessExecutionVertex vertex : jobVertex.getTaskVertices()) {
             final AccessExecution execution = vertex.getCurrentExecutionAttempt();
             final JobVertexID jobVertexID = jobVertex.getJobVertexId();
+
+            final Collection<AccessExecution> attempts = vertex.getCurrentExecutions();
+            List<SubtaskExecutionAttemptDetailsInfo> otherConcurrentAttempts = null;
+
+            if (attempts.size() > 1) {
+                otherConcurrentAttempts = new ArrayList<>();
+                for (AccessExecution attempt : attempts) {
+                    if (attempt.getAttemptNumber() != execution.getAttemptNumber()) {
+                        otherConcurrentAttempts.add(
+                                SubtaskExecutionAttemptDetailsInfo.create(
+                                        attempt, metricFetcher, jobID, jobVertexID, null));
+                    }
+                }
+            }
+
             subtasks.add(
                     SubtaskExecutionAttemptDetailsInfo.create(
-                            execution, metricFetcher, jobID, jobVertexID));
+                            execution, metricFetcher, jobID, jobVertexID, otherConcurrentAttempts));

Review Comment:
   We might rename `execution` to `currentAttempt`.
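With the rename, the comparison in the loop reads as "every attempt that is not the current one". The sketch below uses a simplified `Attempt` stand-in for `AccessExecution`, and the `otherConcurrentAttempts` helper is hypothetical. It mirrors the diff by returning `null` when there is only one execution.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;

public class OtherConcurrentAttemptsSketch {

    /** Simplified stand-in for AccessExecution. */
    static final class Attempt {
        final int attemptNumber;

        Attempt(int attemptNumber) {
            this.attemptNumber = attemptNumber;
        }
    }

    /**
     * Returns every attempt except currentAttempt, or null when there is a single execution,
     * matching the null default used for otherConcurrentAttempts in the diff.
     */
    static List<Attempt> otherConcurrentAttempts(Attempt currentAttempt, Collection<Attempt> attempts) {
        if (attempts.size() <= 1) {
            return null;
        }
        List<Attempt> others = new ArrayList<>();
        for (Attempt attempt : attempts) {
            if (attempt.attemptNumber != currentAttempt.attemptNumber) {
                others.add(attempt);
            }
        }
        return others;
    }

    public static void main(String[] args) {
        Attempt currentAttempt = new Attempt(1);
        List<Attempt> attempts = Arrays.asList(new Attempt(0), currentAttempt, new Attempt(2));
        for (Attempt other : otherConcurrentAttempts(currentAttempt, attempts)) {
            System.out.println("other attempt #" + other.attemptNumber);
        }
    }
}
```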



##########
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java:
##########
@@ -214,15 +260,33 @@ public void add(MetricDump metric) {
                     task = job.tasks.computeIfAbsent(taskInfo.vertexID, k -> new TaskMetricStore());
                     subtask =
                             task.subtasks.computeIfAbsent(
-                                    taskInfo.subtaskIndex, k -> new ComponentMetricStore());
+                                    taskInfo.subtaskIndex, k -> new SubtaskMetricStore());
+
+                    if (taskInfo.attemptNumber >= 0) {
+                        // Consider as the current attempt if current attempt id is not set, which
+                        // means there should be only one execution
+                        isCurrentAttempt =
+                                Optional.of(currentExecutionAttempts)
+                                                .map(m -> m.get(taskInfo.jobID))
+                                                .map(m -> m.get(taskInfo.vertexID))
+                                                .map(m -> m.get(taskInfo.subtaskIndex))
+                                                .orElse(taskInfo.attemptNumber)
+                                        == taskInfo.attemptNumber;
+                        attempt =
+                                subtask.attempts.computeIfAbsent(

Review Comment:
   It seems a bit odd to break encapsulation here by directly modifying the attempts map, but we might consider that separately.
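A sketch of what keeping the writes inside the store could look like, assuming simplified stand-ins. `getOrCreateAttemptStore` and `getAttempts` are hypothetical names, not existing Flink methods. Callers go through a get-or-create mutator, and readers receive an unmodifiable view.

```java
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class EncapsulatedAttemptsSketch {

    /** Simplified stand-in for ComponentMetricStore. */
    static class ComponentStore {
        final Map<String, String> metrics = new ConcurrentHashMap<>();
    }

    /** Simplified stand-in for SubtaskMetricStore that owns all writes to its attempts map. */
    static final class SubtaskStore extends ComponentStore {
        private final Map<Integer, ComponentStore> attempts = new ConcurrentHashMap<>();

        /** Hypothetical mutator used by MetricStore#add instead of touching the map directly. */
        ComponentStore getOrCreateAttemptStore(int attemptNumber) {
            return attempts.computeIfAbsent(attemptNumber, k -> new ComponentStore());
        }

        /** Hypothetical read-only accessor for REST handlers. */
        Map<Integer, ComponentStore> getAttempts() {
            return Collections.unmodifiableMap(attempts);
        }
    }

    public static void main(String[] args) {
        SubtaskStore subtask = new SubtaskStore();
        subtask.getOrCreateAttemptStore(1).metrics.put("numRecordsIn", "5");
        System.out.println(subtask.getAttempts().get(1).metrics);
    }
}
```

Java lets an enclosing class read a nested class's `private` members, so when the store is nested inside `MetricStore` the compiler will not enforce this boundary. The accessor only helps as a convention that the surrounding code follows.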



-- 
This is an automated message from the Apache Git Service.