This is an automated email from the ASF dual-hosted git repository.

wanglijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new f3ab9626bf1 [FLINK-32199][rest] Remove redundant metrics in TaskMetricStore after rescale down.
f3ab9626bf1 is described below

commit f3ab9626bf18cad993f7cecba23a7bce6e14407b
Author: xincheng.ljr <[email protected]>
AuthorDate: Tue May 30 20:21:29 2023 +0800

    [FLINK-32199][rest] Remove redundant metrics in TaskMetricStore after rescale down.
    
    This closes #22681
---
 .../rest/handler/legacy/metrics/MetricStore.java   | 13 +++++++++
 .../handler/legacy/metrics/MetricStoreTest.java    | 34 +++++++++++++++++-----
 2 files changed, 39 insertions(+), 8 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java
index 7f86ff32366..fde960dd09b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java
@@ -521,6 +521,19 @@ public class MetricStore {
         }
 
         void retainSubtasks(Set<Integer> activeSubtasks) {
+            // Retain metrics of pattern subtaskIndex.metricName which are 
directly stored in
+            // TaskMetricStore
+            metrics.keySet()
+                    .removeIf(
+                            key -> {
+                                // To prevent errors in metric parsing, here 
we only
+                                // clean up metrics with a pattern of
+                                // "subtaskIndex.metricName"
+                                String index = key.substring(0, 
Math.max(key.indexOf('.'), 0));
+                                return index.matches("\\d+")
+                                        && 
!activeSubtasks.contains(Integer.parseInt(index));
+                            });
+
             subtasks.keySet().retainAll(activeSubtasks);
         }
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStoreTest.java
index 8c3b7cb3265..0c7d0b650ba 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStoreTest.java
@@ -27,12 +27,16 @@ import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
 
 import org.junit.jupiter.api.Test;
 
+import javax.annotation.Nonnull;
+
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
@@ -135,10 +139,11 @@ class MetricStoreTest {
     @Test
     void testTaskMetricStoreCleanup() {
         MetricStore store = setupStore(new MetricStore());
-        assertThat(
-                        store.getTaskMetricStore(JOB_ID.toString(), "taskid")
-                                .getAllSubtaskMetricStores()
-                                .keySet())
+        MetricStore.TaskMetricStore taskMetricStore =
+                store.getTaskMetricStore(JOB_ID.toString(), "taskid");
+        assertThat(taskMetricStore.getAllSubtaskMetricStores().keySet())
+                .containsExactlyInAnyOrderElementsOf(Arrays.asList(1, 8));
+        assertThat(getTaskMetricStoreIndexes(taskMetricStore))
                 .containsExactlyInAnyOrderElementsOf(Arrays.asList(1, 8));
 
         Map<String, Map<Integer, CurrentAttempts>> currentExecutionAttempts =
@@ -159,13 +164,26 @@ class MetricStoreTest {
                         currentExecutionAttempts);
         store.updateCurrentExecutionAttempts(Collections.singleton(jobDetail));
 
-        assertThat(
-                        store.getTaskMetricStore(JOB_ID.toString(), "taskid")
-                                .getAllSubtaskMetricStores()
-                                .keySet())
+        assertThat(taskMetricStore.getAllSubtaskMetricStores().keySet())
+                
.containsExactlyInAnyOrderElementsOf(Collections.singletonList(1));
+
+        assertThat(getTaskMetricStoreIndexes(taskMetricStore))
                 
.containsExactlyInAnyOrderElementsOf(Collections.singletonList(1));
     }
 
+    @Nonnull
+    private static Set<Integer> getTaskMetricStoreIndexes(
+            MetricStore.TaskMetricStore taskMetricStore) {
+        return taskMetricStore.metrics.keySet().stream()
+                .map(
+                        key -> {
+                            String index = key.substring(0, 
Math.max(key.indexOf('.'), 0));
+                            return index.matches("\\d+") ? 
Integer.parseInt(index) : null;
+                        })
+                .filter(Objects::nonNull)
+                .collect(Collectors.toSet());
+    }
+
     @Test
     void testSubtaskMetricStoreCleanup() {
         MetricStore store = setupStore(new MetricStore());

Reply via email to