This is an automated email from the ASF dual-hosted git repository.
wanglijie pushed a commit to branch release-1.17
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.17 by this push:
new 024e970e277 [FLINK-31650][metrics][rest] Remove transient metrics for
subtasks in terminal state (#23988)
024e970e277 is described below
commit 024e970e27781a9fd1925d6c5938efbb364c6462
Author: Zhanghao Chen <[email protected]>
AuthorDate: Tue Dec 26 09:53:02 2023 +0800
[FLINK-31650][metrics][rest] Remove transient metrics for subtasks in
terminal state (#23988)
This closes #23447
(cherry picked from commit dd028282e8ab19c0d1cd05fad02b63bbda6c1358)
---
.../runtime/messages/webmonitor/JobDetails.java | 16 +++++--
.../rest/handler/legacy/metrics/MetricStore.java | 50 ++++++++++++++++++++++
.../handler/legacy/metrics/MetricStoreTest.java | 45 +++++++++++++++++--
3 files changed, 105 insertions(+), 6 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobDetails.java b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobDetails.java
index ed53e2014fd..be6657f4bee 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobDetails.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobDetails.java
@@ -194,7 +194,8 @@ public class JobDetails implements Serializable {
taskVertex.getCurrentExecutionAttempt().getAttemptNumber(),
taskVertex.getCurrentExecutions().stream()
.map(AccessExecution::getAttemptNumber)
- .collect(Collectors.toSet())));
+ .collect(Collectors.toSet()),
+ state.isTerminal()));
}
if (!vertexAttempts.isEmpty()) {
@@ -355,16 +356,21 @@ public class JobDetails implements Serializable {
/**
* The CurrentAttempts holds the attempt number of the current
representative execution attempt,
- * and the attempt numbers of all the running attempts.
+ * the attempt numbers of all the running attempts, and whether the
current execution has
+ * reached terminal state.
*/
public static final class CurrentAttempts implements Serializable {
private final int representativeAttempt;
private final Set<Integer> currentAttempts;
- public CurrentAttempts(int representativeAttempt, Set<Integer>
currentAttempts) {
+ private final boolean isTerminalState;
+
+ public CurrentAttempts(
+ int representativeAttempt, Set<Integer> currentAttempts,
boolean isTerminalState) {
this.representativeAttempt = representativeAttempt;
this.currentAttempts =
Collections.unmodifiableSet(currentAttempts);
+ this.isTerminalState = isTerminalState;
}
public int getRepresentativeAttempt() {
@@ -374,5 +380,9 @@ public class JobDetails implements Serializable {
public Set<Integer> getCurrentAttempts() {
return currentAttempts;
}
+
+ public boolean isTerminalState() {
+ return isTerminalState;
+ }
}
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java
index 965cff49fe9..8e9d771e11e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.rest.handler.legacy.metrics;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.messages.webmonitor.JobDetails;
import org.apache.flink.runtime.messages.webmonitor.JobDetails.CurrentAttempts;
+import org.apache.flink.runtime.metrics.MetricNames;
import org.apache.flink.runtime.metrics.dump.MetricDump;
import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
@@ -29,8 +30,10 @@ import org.slf4j.LoggerFactory;
import javax.annotation.concurrent.ThreadSafe;
+import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -55,6 +58,18 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
public class MetricStore {
private static final Logger LOG =
LoggerFactory.getLogger(MetricStore.class);
+ /**
+ * The set holds the names of the transient metrics which are no longer
useful after a subtask
+ * reaches terminal state and shall be removed to avoid misleading users.
Note that there may be
+ * other transient metrics, we currently only support cleaning these three.
+ */
+ private static final Set<String> TRANSIENT_METRIC_NAMES =
+ new HashSet<>(
+ Arrays.asList(
+ MetricNames.TASK_IDLE_TIME,
+ MetricNames.TASK_BACK_PRESSURED_TIME,
+ MetricNames.TASK_BUSY_TIME));
+
private final ComponentMetricStore jobManager = new ComponentMetricStore();
private final Map<String, TaskManagerMetricStore> taskManagers = new
ConcurrentHashMap<>();
private final Map<String, JobMetricStore> jobs = new ConcurrentHashMap<>();
@@ -124,6 +139,13 @@ public class MetricStore {
subtaskMetricStore ->
subtaskMetricStore.retainAttempts(
attempts.getCurrentAttempts()));
+ // Remove transient metrics for terminal
subtasks
+ if (attempts.isTerminalState()) {
+ taskMetricStoreOptional.ifPresent(
+ taskMetricStore ->
+
taskMetricStore.removeTransientMetrics(
+ subtaskIndex));
+ }
});
});
}
@@ -417,6 +439,11 @@ public class MetricStore {
}
}
+ private static boolean isTransientMetric(String fullMetricName) {
+ String metricName =
fullMetricName.substring(fullMetricName.lastIndexOf('.') + 1);
+ return TRANSIENT_METRIC_NAMES.contains(metricName);
+ }
+
//
-----------------------------------------------------------------------------------------------------------------
// sub MetricStore classes
//
-----------------------------------------------------------------------------------------------------------------
@@ -529,6 +556,19 @@ public class MetricStore {
subtasks.keySet().retainAll(activeSubtasks);
}
+ void removeTransientMetrics(int subtaskIndex) {
+ if (subtasks.containsKey(subtaskIndex)) {
+ // Remove in both places as task metrics are duplicated in
task metric store and
+ // subtask metric store.
+ metrics.keySet()
+ .removeIf(
+ key ->
+ key.startsWith(subtaskIndex + ".")
+ && isTransientMetric(key));
+ subtasks.get(subtaskIndex).removeTransientMetrics();
+ }
+ }
+
private static TaskMetricStore unmodifiable(TaskMetricStore source) {
if (source == null) {
return null;
@@ -569,6 +609,16 @@ public class MetricStore {
attempt < latestAttempt &&
!currentAttempts.contains(attempt));
}
+ void removeTransientMetrics() {
+ attempts.values()
+ .forEach(
+ attempt ->
+ attempt.metrics
+ .keySet()
+
.removeIf(MetricStore::isTransientMetric));
+ metrics.keySet().removeIf(MetricStore::isTransientMetric);
+ }
+
private static SubtaskMetricStore unmodifiable(SubtaskMetricStore
source) {
if (source == null) {
return null;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStoreTest.java
index 95ac5167ef6..6bcf3e7bc64 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStoreTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.runtime.messages.webmonitor.JobDetails;
import org.apache.flink.runtime.messages.webmonitor.JobDetails.CurrentAttempts;
+import org.apache.flink.runtime.metrics.MetricNames;
import org.apache.flink.runtime.metrics.dump.MetricDump;
import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
@@ -144,7 +145,7 @@ class MetricStoreTest {
Collections.singletonMap(
"taskid",
Collections.singletonMap(
- 1, new CurrentAttempts(1, new
HashSet<>()))));
+ 1, new CurrentAttempts(1, new
HashSet<>(), false))));
assertThatCode(
() ->
metricStore.updateCurrentExecutionAttempts(
@@ -165,7 +166,8 @@ class MetricStoreTest {
Map<String, Map<Integer, CurrentAttempts>> currentExecutionAttempts =
Collections.singletonMap(
"taskid",
- Collections.singletonMap(1, new CurrentAttempts(1, new
HashSet<>())));
+ Collections.singletonMap(
+ 1, new CurrentAttempts(1, new HashSet<>(),
false)));
JobDetails jobDetail =
new JobDetails(
JOB_ID,
@@ -187,6 +189,38 @@ class MetricStoreTest {
.containsExactlyInAnyOrderElementsOf(Collections.singletonList(1));
}
+ @Test
+ void testRemoveTransientMetricsForTerminalSubtasks() {
+ MetricStore store = setupStore(new MetricStore());
+ MetricStore.TaskMetricStore taskMetricStore =
+ store.getTaskMetricStore(JOB_ID.toString(), "taskid");
+
+ Map<String, Map<Integer, CurrentAttempts>> currentExecutionAttempts =
+ Collections.singletonMap(
+ "taskid",
+ Collections.singletonMap(8, new CurrentAttempts(2, new
HashSet<>(), true)));
+ JobDetails jobDetail =
+ new JobDetails(
+ JOB_ID,
+ "jobname",
+ 0,
+ 0,
+ 0,
+ JobStatus.RUNNING,
+ 0,
+ new int[10],
+ 8,
+ currentExecutionAttempts);
+
+ MetricStore.SubtaskMetricStore subtaskMetricStore =
+ taskMetricStore.getSubtaskMetricStore(8);
+ assertThat(taskMetricStore.getMetric("8.abc." +
MetricNames.TASK_BUSY_TIME)).isNotNull();
+ assertThat(subtaskMetricStore.getMetric("abc." +
MetricNames.TASK_BUSY_TIME)).isNotNull();
+ store.updateCurrentExecutionAttempts(Collections.singleton(jobDetail));
+ assertThat(taskMetricStore.getMetric("8.abc." +
MetricNames.TASK_BUSY_TIME)).isNull();
+ assertThat(subtaskMetricStore.getMetric("abc." +
MetricNames.TASK_BUSY_TIME)).isNull();
+ }
+
@Nonnull
private static Set<Integer> getTaskMetricStoreIndexes(
MetricStore.TaskMetricStore taskMetricStore) {
@@ -214,7 +248,8 @@ class MetricStoreTest {
Map<String, Map<Integer, CurrentAttempts>> currentExecutionAttempts =
Collections.singletonMap(
"taskid",
- Collections.singletonMap(8, new CurrentAttempts(1,
currentAttempts)));
+ Collections.singletonMap(
+ 8, new CurrentAttempts(1, currentAttempts,
false)));
JobDetails jobDetail =
new JobDetails(
JOB_ID,
@@ -274,6 +309,8 @@ class MetricStoreTest {
QueryScopeInfo.TaskQueryScopeInfo speculativeTask =
new QueryScopeInfo.TaskQueryScopeInfo(JOB_ID.toString(),
"taskid", 8, 2, "abc");
MetricDump.CounterDump cd52 = new
MetricDump.CounterDump(speculativeTask, "metric5", 14);
+ MetricDump.CounterDump cd52BusyTime =
+ new MetricDump.CounterDump(speculativeTask,
MetricNames.TASK_BUSY_TIME, 400);
QueryScopeInfo.OperatorQueryScopeInfo operator =
new QueryScopeInfo.OperatorQueryScopeInfo(
@@ -325,6 +362,8 @@ class MetricStoreTest {
store.add(cd64);
store.add(cd74);
+ store.add(cd52BusyTime);
+
return store;
}
}