This is an automated email from the ASF dual-hosted git repository.
wanglijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new f3ab9626bf1 [FLINK-32199][rest] Remove redundant metrics in
TaskMetricStore after rescale down.
f3ab9626bf1 is described below
commit f3ab9626bf18cad993f7cecba23a7bce6e14407b
Author: xincheng.ljr <[email protected]>
AuthorDate: Tue May 30 20:21:29 2023 +0800
[FLINK-32199][rest] Remove redundant metrics in TaskMetricStore after
rescale down.
This closes #22681
---
.../rest/handler/legacy/metrics/MetricStore.java | 13 +++++++++
.../handler/legacy/metrics/MetricStoreTest.java | 34 +++++++++++++++++-----
2 files changed, 39 insertions(+), 8 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java
index 7f86ff32366..fde960dd09b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java
@@ -521,6 +521,19 @@ public class MetricStore {
}
void retainSubtasks(Set<Integer> activeSubtasks) {
+ // Retain metrics of pattern subtaskIndex.metricName which are directly stored in
+ // TaskMetricStore
+ metrics.keySet()
+ .removeIf(
+ key -> {
+ // To prevent errors in metric parsing, here we only
+ // clean up metrics with a pattern of
+ // "subtaskIndex.metricName"
+ String index = key.substring(0, Math.max(key.indexOf('.'), 0));
+ return index.matches("\\d+")
+ && !activeSubtasks.contains(Integer.parseInt(index));
+ });
+
subtasks.keySet().retainAll(activeSubtasks);
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStoreTest.java
index 8c3b7cb3265..0c7d0b650ba 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStoreTest.java
@@ -27,12 +27,16 @@ import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
import org.junit.jupiter.api.Test;
+import javax.annotation.Nonnull;
+
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
+import java.util.Objects;
import java.util.Set;
+import java.util.stream.Collectors;
import static org.assertj.core.api.Assertions.assertThat;
@@ -135,10 +139,11 @@ class MetricStoreTest {
@Test
void testTaskMetricStoreCleanup() {
MetricStore store = setupStore(new MetricStore());
- assertThat(
- store.getTaskMetricStore(JOB_ID.toString(), "taskid")
- .getAllSubtaskMetricStores()
- .keySet())
+ MetricStore.TaskMetricStore taskMetricStore =
+ store.getTaskMetricStore(JOB_ID.toString(), "taskid");
+ assertThat(taskMetricStore.getAllSubtaskMetricStores().keySet())
+ .containsExactlyInAnyOrderElementsOf(Arrays.asList(1, 8));
+ assertThat(getTaskMetricStoreIndexes(taskMetricStore))
.containsExactlyInAnyOrderElementsOf(Arrays.asList(1, 8));
Map<String, Map<Integer, CurrentAttempts>> currentExecutionAttempts =
@@ -159,13 +164,26 @@ class MetricStoreTest {
currentExecutionAttempts);
store.updateCurrentExecutionAttempts(Collections.singleton(jobDetail));
- assertThat(
- store.getTaskMetricStore(JOB_ID.toString(), "taskid")
- .getAllSubtaskMetricStores()
- .keySet())
+ assertThat(taskMetricStore.getAllSubtaskMetricStores().keySet())
+ .containsExactlyInAnyOrderElementsOf(Collections.singletonList(1));
+
+ assertThat(getTaskMetricStoreIndexes(taskMetricStore))
.containsExactlyInAnyOrderElementsOf(Collections.singletonList(1));
}
+ @Nonnull
+ private static Set<Integer> getTaskMetricStoreIndexes(
+ MetricStore.TaskMetricStore taskMetricStore) {
+ return taskMetricStore.metrics.keySet().stream()
+ .map(
+ key -> {
+ String index = key.substring(0, Math.max(key.indexOf('.'), 0));
+ return index.matches("\\d+") ? Integer.parseInt(index) : null;
+ })
+ .filter(Objects::nonNull)
+ .collect(Collectors.toSet());
+ }
+
@Test
void testSubtaskMetricStoreCleanup() {
MetricStore store = setupStore(new MetricStore());