This is an automated email from the ASF dual-hosted git repository.
gaoyunhaii pushed a commit to branch release-1.16
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.16 by this push:
new 4dad2e29a09 [FLINK-29132][rest] Cleanup subtask attempt metrics according to the JobDetails to avoid memory leak.
4dad2e29a09 is described below
commit 4dad2e29a090c731a0474a80e320264040c348ce
Author: Gen Luo <[email protected]>
AuthorDate: Thu Sep 1 17:08:53 2022 +0800
[FLINK-29132][rest] Cleanup subtask attempt metrics according to the JobDetails to avoid memory leak.
This closes #20733.
---
.../runtime/messages/webmonitor/JobDetails.java | 96 ++++++++---------
.../handler/job/JobVertexBackPressureHandler.java | 34 +++---
.../rest/handler/legacy/metrics/MetricStore.java | 49 +++++++--
.../messages/webmonitor/JobDetailsTest.java | 31 ------
.../job/JobVertexBackPressureHandlerTest.java | 12 +--
.../handler/legacy/metrics/MetricStoreTest.java | 114 ++++++++++++++++-----
6 files changed, 198 insertions(+), 138 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobDetails.java b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobDetails.java
index 74b2964228c..04fce9483c0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobDetails.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobDetails.java
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.AccessExecution;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
@@ -40,8 +41,11 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.S
import java.io.IOException;
import java.io.Serializable;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -60,8 +64,6 @@ public class JobDetails implements Serializable {
private static final String FIELD_NAME_STATUS = "state";
private static final String FIELD_NAME_LAST_MODIFICATION = "last-modification";
private static final String FIELD_NAME_TOTAL_NUMBER_TASKS = "total";
- private static final String FIELD_NAME_CURRENT_EXECUTION_ATTEMPTS =
- "current-execution-attempts";
private final JobID jobId;
@@ -84,10 +86,12 @@ public class JobDetails implements Serializable {
/**
* The map holds the attempt number of the current execution attempt in the Execution, which is
* considered as the representing execution for the subtask of the vertex. The keys and values
- * are JobVertexID -> SubtaskIndex -> CurrentExecutionAttemptNumber. It is used to accumulate
- * the metrics of a subtask in MetricFetcher.
+ * are JobVertexID -> SubtaskIndex -> CurrentAttempts info.
+ *
+ * <p>The field is excluded from the json. Any usage from the web UI and the history server is
+ * not allowed.
*/
- private final Map<String, Map<Integer, Integer>> currentExecutionAttempts;
+ private final Map<String, Map<Integer, CurrentAttempts>> currentExecutionAttempts;
@VisibleForTesting
public JobDetails(
@@ -123,7 +127,7 @@ public class JobDetails implements Serializable {
long lastUpdateTime,
int[] tasksPerState,
int numTasks,
- Map<String, Map<Integer, Integer>> currentExecutionAttempts) {
+ Map<String, Map<Integer, CurrentAttempts>> currentExecutionAttempts) {
this.jobId = checkNotNull(jobId);
this.jobName = checkNotNull(jobName);
this.startTime = startTime;
@@ -150,22 +154,25 @@ public class JobDetails implements Serializable {
int[] countsPerStatus = new int[ExecutionState.values().length];
long lastChanged = 0;
int numTotalTasks = 0;
- Map<String, Map<Integer, Integer>> currentExecutionAttempts = new HashMap<>();
+ Map<String, Map<Integer, CurrentAttempts>> currentExecutionAttempts = new HashMap<>();
for (AccessExecutionJobVertex ejv : job.getVerticesTopologically()) {
AccessExecutionVertex[] taskVertices = ejv.getTaskVertices();
numTotalTasks += taskVertices.length;
- Map<Integer, Integer> vertexAttempts = new HashMap<>();
+ Map<Integer, CurrentAttempts> vertexAttempts = new HashMap<>();
for (AccessExecutionVertex taskVertex : taskVertices) {
- if (taskVertex.getCurrentExecutions().size() > 1) {
- vertexAttempts.put(
- taskVertex.getParallelSubtaskIndex(),
- taskVertex.getCurrentExecutionAttempt().getAttemptNumber());
- }
ExecutionState state = taskVertex.getExecutionState();
countsPerStatus[state.ordinal()]++;
lastChanged = Math.max(lastChanged, taskVertex.getStateTimestamp(state));
+
+ vertexAttempts.put(
+ taskVertex.getParallelSubtaskIndex(),
+ new CurrentAttempts(
+ taskVertex.getCurrentExecutionAttempt().getAttemptNumber(),
+ taskVertex.getCurrentExecutions().stream()
+ .map(AccessExecution::getAttemptNumber)
+ .collect(Collectors.toSet())));
}
if (!vertexAttempts.isEmpty()) {
@@ -226,7 +233,7 @@ public class JobDetails implements Serializable {
return tasksPerState;
}
- public Map<String, Map<Integer, Integer>> getCurrentExecutionAttempts() {
+ public Map<String, Map<Integer, CurrentAttempts>> getCurrentExecutionAttempts() {
return currentExecutionAttempts;
}
// ------------------------------------------------------------------------
@@ -326,20 +333,6 @@ public class JobDetails implements Serializable {
jsonGenerator.writeEndObject();
- if (!jobDetails.currentExecutionAttempts.isEmpty()) {
- jsonGenerator.writeObjectFieldStart(FIELD_NAME_CURRENT_EXECUTION_ATTEMPTS);
- for (Map.Entry<String, Map<Integer, Integer>> vertex :
- jobDetails.currentExecutionAttempts.entrySet()) {
- jsonGenerator.writeObjectFieldStart(vertex.getKey());
- for (Map.Entry<Integer, Integer> attempt : vertex.getValue().entrySet()) {
- jsonGenerator.writeNumberField(
- String.valueOf(attempt.getKey()), attempt.getValue());
- }
- jsonGenerator.writeEndObject();
- }
- jsonGenerator.writeEndObject();
- }
-
jsonGenerator.writeEndObject();
}
}
@@ -379,28 +372,6 @@ public class JobDetails implements Serializable {
jsonNode == null ? 0 : jsonNode.intValue();
}
- Map<String, Map<Integer, Integer>> attempts = new HashMap<>();
- JsonNode attemptsNode = rootNode.get(FIELD_NAME_CURRENT_EXECUTION_ATTEMPTS);
- if (attemptsNode != null) {
- attemptsNode
- .fields()
- .forEachRemaining(
- vertex -> {
- String vertexId = vertex.getKey();
- Map<Integer, Integer> vertexAttempts =
- attempts.computeIfAbsent(
- vertexId, k -> new HashMap<>());
- vertex.getValue()
- .fields()
- .forEachRemaining(
- attempt ->
- vertexAttempts.put(
- Integer.parseInt(
- attempt.getKey()),
- attempt.getValue().intValue()));
- });
- }
-
return new JobDetails(
jobId,
jobName,
@@ -411,7 +382,30 @@ public class JobDetails implements Serializable {
lastUpdateTime,
numVerticesPerExecutionState,
numTasks,
- attempts);
+ new HashMap<>());
+ }
+ }
+
+ /**
+ * The CurrentAttempts holds the attempt number of the current representative execution attempt,
+ * and the attempt numbers of all the running attempts.
+ */
+ public static final class CurrentAttempts implements Serializable {
+ private final int representativeAttempt;
+
+ private final Set<Integer> currentAttempts;
+
+ public CurrentAttempts(int representativeAttempt, Set<Integer> currentAttempts) {
+ this.representativeAttempt = representativeAttempt;
+ this.currentAttempts = Collections.unmodifiableSet(currentAttempts);
+ }
+
+ public int getRepresentativeAttempt() {
+ return representativeAttempt;
+ }
+
+ public Set<Integer> getCurrentAttempts() {
+ return currentAttempts;
}
}
}
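The JobDetails change above replaces the single attempt number per subtask with a CurrentAttempts value that carries both the representative attempt and the set of all currently running attempts, and keeps it out of the JSON form. A minimal sketch of how such an entry looks, assuming only the CurrentAttempts class added in this commit (the surrounding map and the vertex/subtask values are illustrative, not taken from Flink's execution graph):

import org.apache.flink.runtime.messages.webmonitor.JobDetails.CurrentAttempts;

import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;

public class CurrentAttemptsSketch {
    public static void main(String[] args) {
        // JobVertexID -> subtask index -> attempts info, held in memory only (never serialized).
        Map<String, Map<Integer, CurrentAttempts>> currentExecutionAttempts = new HashMap<>();

        // Subtask 0 of vertex "v1": attempt 1 is the representative execution,
        // while attempts 0 and 1 are both currently running (speculative execution).
        currentExecutionAttempts
                .computeIfAbsent("v1", k -> new HashMap<>())
                .put(0, new CurrentAttempts(1, new HashSet<>(Arrays.asList(0, 1))));

        CurrentAttempts info = currentExecutionAttempts.get("v1").get(0);
        System.out.println(info.getRepresentativeAttempt()); // 1
        System.out.println(info.getCurrentAttempts());       // [0, 1]
    }
}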
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexBackPressureHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexBackPressureHandler.java
index 898b74ba0d0..57d3e4207cf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexBackPressureHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexBackPressureHandler.java
@@ -82,23 +82,23 @@ public class JobVertexBackPressureHandler
metricFetcher
.getMetricStore()
.getTaskMetricStore(jobId.toString(), jobVertexId.toString());
- Map<String, Map<Integer, Integer>> jobCurrentExecutions =
- metricFetcher.getMetricStore().getCurrentExecutionAttempts().get(jobId.toString());
- Map<Integer, Integer> currentExecutionAttempts =
- jobCurrentExecutions != null
- ? jobCurrentExecutions.get(jobVertexId.toString())
+ Map<String, Map<Integer, Integer>> jobRepresentativeExecutions =
+ metricFetcher.getMetricStore().getRepresentativeAttempts().get(jobId.toString());
+ Map<Integer, Integer> representativeAttempts =
+ jobRepresentativeExecutions != null
+ ? jobRepresentativeExecutions.get(jobVertexId.toString())
: null;
return CompletableFuture.completedFuture(
taskMetricStore != null
- ? createJobVertexBackPressureInfo(taskMetricStore, currentExecutionAttempts)
+ ? createJobVertexBackPressureInfo(taskMetricStore, representativeAttempts)
: JobVertexBackPressureInfo.deprecated());
}
private JobVertexBackPressureInfo createJobVertexBackPressureInfo(
- TaskMetricStore taskMetricStore, Map<Integer, Integer> currentExecutionAttempts) {
+ TaskMetricStore taskMetricStore, Map<Integer, Integer> representativeAttempts) {
List<SubtaskBackPressureInfo> subtaskBackPressureInfos =
- createSubtaskBackPressureInfo(taskMetricStore, currentExecutionAttempts);
+ createSubtaskBackPressureInfo(taskMetricStore, representativeAttempts);
return new JobVertexBackPressureInfo(
JobVertexBackPressureInfo.VertexBackPressureStatus.OK,
getBackPressureLevel(getMaxBackPressureRatio(subtaskBackPressureInfos)),
@@ -107,7 +107,7 @@ public class JobVertexBackPressureHandler
}
private List<SubtaskBackPressureInfo> createSubtaskBackPressureInfo(
- TaskMetricStore taskMetricStore, Map<Integer, Integer> currentExecutionAttempts) {
+ TaskMetricStore taskMetricStore, Map<Integer, Integer> representativeAttempts) {
Map<Integer, SubtaskMetricStore> subtaskMetricStores =
taskMetricStore.getAllSubtaskMetricStores();
List<SubtaskBackPressureInfo> result = new ArrayList<>(subtaskMetricStores.size());
@@ -121,19 +121,19 @@ public class JobVertexBackPressureHandler
createSubtaskAttemptBackpressureInfo(
subtaskIndex, null, subtaskMetricStore, null));
} else {
- int currentAttempt =
- currentExecutionAttempts == null
+ int representativeAttempt =
+ representativeAttempts == null
? -1
- : currentExecutionAttempts.getOrDefault(subtaskIndex, -1);
- if (!allAttemptsMetricStores.containsKey(currentAttempt)) {
+ : representativeAttempts.getOrDefault(subtaskIndex, -1);
+ if (!allAttemptsMetricStores.containsKey(representativeAttempt)) {
// allAttemptsMetricStores is not empty here
- currentAttempt = allAttemptsMetricStores.keySet().iterator().next();
+ representativeAttempt = allAttemptsMetricStores.keySet().iterator().next();
}
List<SubtaskBackPressureInfo> otherConcurrentAttempts =
new ArrayList<>(allAttemptsMetricStores.size() - 1);
for (Map.Entry<Integer, ComponentMetricStore> attemptStore :
allAttemptsMetricStores.entrySet()) {
- if (attemptStore.getKey() == currentAttempt) {
+ if (attemptStore.getKey() == representativeAttempt) {
continue;
}
otherConcurrentAttempts.add(
@@ -146,8 +146,8 @@ public class JobVertexBackPressureHandler
result.add(
createSubtaskAttemptBackpressureInfo(
subtaskIndex,
- currentAttempt,
- allAttemptsMetricStores.get(currentAttempt),
+ representativeAttempt,
+ allAttemptsMetricStores.get(representativeAttempt),
otherConcurrentAttempts));
}
}
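The handler change above is mostly a rename from currentExecutionAttempts to representativeAttempts; the selection logic itself is unchanged: use the representative attempt reported through JobDetails if its metrics are present, otherwise fall back to any attempt that does have metrics. A standalone restatement of that rule, with plain maps standing in for the handler's metric store types (names here are illustrative only):

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class RepresentativeAttemptSelection {

    // Mirrors the selection in createSubtaskBackPressureInfo: prefer the representative
    // attempt, otherwise fall back to the first attempt that has a metric store.
    static int selectAttempt(
            Map<Integer, Integer> representativeAttempts, // subtask index -> representative attempt
            Map<Integer, String> attemptMetricStores,     // attempt number -> stand-in for metrics
            int subtaskIndex) {
        int representativeAttempt =
                representativeAttempts == null
                        ? -1
                        : representativeAttempts.getOrDefault(subtaskIndex, -1);
        if (!attemptMetricStores.containsKey(representativeAttempt)) {
            // attemptMetricStores is assumed non-empty here, as in the handler
            representativeAttempt = attemptMetricStores.keySet().iterator().next();
        }
        return representativeAttempt;
    }

    public static void main(String[] args) {
        Map<Integer, String> attempts = new HashMap<>();
        attempts.put(0, "metrics of attempt 0");
        attempts.put(1, "metrics of attempt 1");

        // Representative attempt 1 is known for subtask 3 and has metrics -> chosen.
        System.out.println(selectAttempt(Collections.singletonMap(3, 1), attempts, 3)); // 1
        // Nothing known for subtask 5 -> falls back to whichever attempt has metrics.
        System.out.println(selectAttempt(null, attempts, 5));
    }
}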
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java
index c86ac763f9c..1908afa7427 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.rest.handler.legacy.metrics;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.messages.webmonitor.JobDetails.CurrentAttempts;
import org.apache.flink.runtime.metrics.dump.MetricDump;
import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
@@ -29,6 +30,7 @@ import org.slf4j.LoggerFactory;
import javax.annotation.concurrent.ThreadSafe;
import java.util.Collection;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -63,7 +65,7 @@ public class MetricStore {
* CurrentExecutionAttemptNumber. When a metric of an execution attempt is added, the metric can
* also be added to the SubtaskMetricStore when it is of the representing execution.
*/
- private final Map<String, Map<String, Map<Integer, Integer>>> currentExecutionAttempts =
+ private final Map<String, Map<String, Map<Integer, Integer>>> representativeAttempts =
new ConcurrentHashMap<>();
/**
@@ -82,18 +84,39 @@ public class MetricStore {
*/
synchronized void retainJobs(List<String> activeJobs) {
jobs.keySet().retainAll(activeJobs);
- currentExecutionAttempts.keySet().retainAll(activeJobs);
+ representativeAttempts.keySet().retainAll(activeJobs);
}
public synchronized void updateCurrentExecutionAttempts(Collection<JobDetails> jobs) {
- jobs.forEach(
- job ->
- currentExecutionAttempts.put(
- job.getJobId().toString(), job.getCurrentExecutionAttempts()));
+ for (JobDetails job : jobs) {
+ String jobId = job.getJobId().toString();
+ Map<String, Map<Integer, CurrentAttempts>> currentAttempts =
+ job.getCurrentExecutionAttempts();
+ Map<String, Map<Integer, Integer>> jobRepresentativeAttempts =
+ representativeAttempts.compute(
+ jobId, (k, overwritten) -> new HashMap<>(currentAttempts.size()));
+ currentAttempts.forEach(
+ (vertexId, subtaskAttempts) -> {
+ Map<Integer, Integer> vertexAttempts =
+ jobRepresentativeAttempts.compute(
+ vertexId, (k, overwritten) -> new HashMap<>());
+ TaskMetricStore taskMetricStore = getTaskMetricStore(jobId, vertexId);
+ subtaskAttempts.forEach(
+ (subtaskIndex, attempts) -> {
+ // Updates representative attempts
+ vertexAttempts.put(
+ subtaskIndex, attempts.getRepresentativeAttempt());
+ // Retains current attempt metrics to avoid memory leak
+ taskMetricStore
+ .getSubtaskMetricStore(subtaskIndex)
+ .retainAttempts(attempts.getCurrentAttempts());
+ });
+ });
+ }
}
- public Map<String, Map<String, Map<Integer, Integer>>> getCurrentExecutionAttempts() {
- return currentExecutionAttempts;
+ public Map<String, Map<String, Map<Integer, Integer>>> getRepresentativeAttempts() {
+ return representativeAttempts;
}
/**
@@ -341,7 +364,7 @@ public class MetricStore {
// which means there should be only one execution
private boolean isRepresentativeAttempt(
String jobID, String vertexID, int subtaskIndex, int attemptNumber) {
- return Optional.of(currentExecutionAttempts)
+ return Optional.of(representativeAttempts)
.map(m -> m.get(jobID))
.map(m -> m.get(vertexID))
.map(m -> m.get(subtaskIndex))
@@ -507,6 +530,14 @@ public class MetricStore {
return unmodifiableMap(attempts);
}
+ void retainAttempts(Set<Integer> currentAttempts) {
+ int latestAttempt = currentAttempts.stream().mapToInt(i -> i).max().orElse(0);
+ attempts.keySet()
+ .removeIf(
+ attempt ->
+ attempt < latestAttempt && !currentAttempts.contains(attempt));
+ }
+
private static SubtaskMetricStore unmodifiable(SubtaskMetricStore source) {
if (source == null) {
return null;
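The new retainAttempts call above is where the leaked per-attempt metric stores are actually dropped: an attempt is removed once it is older than the newest current attempt and no longer in the current set, so finished speculative attempts stop accumulating while attempts newer than the last JobDetails update survive. A self-contained restatement of that predicate, using the same numbers as the new MetricStoreTest case (plain sets instead of SubtaskMetricStore):

import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

public class RetainAttemptsSketch {

    // Same predicate as the retainAttempts method added above, applied to a plain set.
    static Set<Integer> retain(Set<Integer> storedAttempts, Set<Integer> currentAttempts) {
        int latestAttempt = currentAttempts.stream().mapToInt(i -> i).max().orElse(0);
        Set<Integer> kept = new HashSet<>(storedAttempts);
        kept.removeIf(attempt -> attempt < latestAttempt && !currentAttempts.contains(attempt));
        return kept;
    }

    public static void main(String[] args) {
        Set<Integer> stored = new HashSet<>(Arrays.asList(1, 2, 4, 5));
        Set<Integer> current = new HashSet<>(Arrays.asList(1, 4));
        // Attempt 2 is dropped (older than the newest current attempt 4 and not running);
        // attempt 5 is kept because it is newer than anything JobDetails has reported yet.
        System.out.println(retain(stored, current)); // [1, 4, 5]
    }
}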
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/messages/webmonitor/JobDetailsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/webmonitor/JobDetailsTest.java
index 609ef1f8e0d..790ca43ce7d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/messages/webmonitor/JobDetailsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/webmonitor/JobDetailsTest.java
@@ -29,8 +29,6 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMap
import org.junit.jupiter.api.Test;
import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
import static org.assertj.core.api.Assertions.assertThat;
@@ -104,33 +102,4 @@ class JobDetailsTest {
assertThat(unmarshalled).isEqualTo(expected);
}
-
- @Test
- void testJobDetailsWithExecutionAttemptsMarshalling() throws JsonProcessingException {
- Map<String, Map<Integer, Integer>> currentExecutionAttempts = new HashMap<>();
- currentExecutionAttempts.computeIfAbsent("a", k -> new HashMap<>()).put(1, 2);
- currentExecutionAttempts.computeIfAbsent("a", k -> new HashMap<>()).put(2, 4);
- currentExecutionAttempts.computeIfAbsent("b", k -> new HashMap<>()).put(3, 1);
-
- final JobDetails expected =
- new JobDetails(
- new JobID(),
- "foobar",
- 1L,
- 10L,
- 9L,
- JobStatus.RUNNING,
- 8L,
- new int[] {1, 3, 3, 4, 7, 4, 2, 7, 3, 3},
- 42,
- currentExecutionAttempts);
-
- final ObjectMapper objectMapper = RestMapperUtils.getStrictObjectMapper();
-
- final JsonNode marshalled = objectMapper.valueToTree(expected);
-
- final JobDetails unmarshalled = objectMapper.treeToValue(marshalled, JobDetails.class);
-
- assertThat(unmarshalled).isEqualTo(expected);
- }
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobVertexBackPressureHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobVertexBackPressureHandlerTest.java
index 93714132681..d218d9ffd55 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobVertexBackPressureHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobVertexBackPressureHandlerTest.java
@@ -285,16 +285,16 @@ class JobVertexBackPressureHandlerTest {
for (MetricDump metricDump : getMultipleAttemptsMetricDumps()) {
multipleAttemptsMetricStore.add(metricDump);
}
- // Update currentExecutionAttempts directly without JobDetails.
- Map<Integer, Integer> currentExecutionAttempts = new HashMap<>();
- currentExecutionAttempts.put(0, 1);
- currentExecutionAttempts.put(1, 0);
+ // Update representativeAttempts directly without JobDetails.
+ Map<Integer, Integer> representativeAttempts = new HashMap<>();
+ representativeAttempts.put(0, 1);
+ representativeAttempts.put(1, 0);
multipleAttemptsMetricStore
- .getCurrentExecutionAttempts()
+ .getRepresentativeAttempts()
.put(
TEST_JOB_ID_BACK_PRESSURE_STATS_AVAILABLE.toString(),
Collections.singletonMap(
- TEST_JOB_VERTEX_ID.toString(), currentExecutionAttempts));
+ TEST_JOB_VERTEX_ID.toString(), representativeAttempts));
JobVertexBackPressureHandler jobVertexBackPressureHandler =
new JobVertexBackPressureHandler(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStoreTest.java
index c54f50291fc..84e5cd6396e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStoreTest.java
@@ -18,21 +18,30 @@
package org.apache.flink.runtime.rest.handler.legacy.metrics;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.messages.webmonitor.JobDetails.CurrentAttempts;
import org.apache.flink.runtime.metrics.dump.MetricDump;
import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
import org.junit.jupiter.api.Test;
import java.io.IOException;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Map;
+import java.util.Set;
import static org.assertj.core.api.Assertions.assertThat;
/** Tests for the MetricStore. */
class MetricStoreTest {
+ private static final JobID JOB_ID = new JobID();
+
@Test
void testAdd() throws IOException {
MetricStore store = setupStore(new MetricStore());
@@ -40,49 +49,55 @@ class MetricStoreTest {
assertThat(store.getJobManagerMetricStore().getMetric("abc.metric1",
"-1")).isEqualTo("0");
assertThat(store.getTaskManagerMetricStore("tmid").getMetric("abc.metric2",
"-1"))
.isEqualTo("1");
- assertThat(store.getJobMetricStore("jobid").getMetric("abc.metric3",
"-1")).isEqualTo("2");
- assertThat(store.getJobMetricStore("jobid").getMetric("abc.metric4",
"-1")).isEqualTo("3");
+ assertThat(store.getJobMetricStore(JOB_ID.toString()).getMetric("abc.metric3", "-1"))
+ .isEqualTo("2");
+ assertThat(store.getJobMetricStore(JOB_ID.toString()).getMetric("abc.metric4", "-1"))
+ .isEqualTo("3");
- assertThat(store.getTaskMetricStore("jobid",
"taskid").getMetric("8.abc.metric5", "-1"))
+ assertThat(
+ store.getTaskMetricStore(JOB_ID.toString(), "taskid")
+ .getMetric("8.abc.metric5", "-1"))
.isEqualTo("14");
- assertThat(store.getSubtaskMetricStore("jobid", "taskid",
8).getMetric("abc.metric5", "-1"))
+ assertThat(
+ store.getSubtaskMetricStore(JOB_ID.toString(),
"taskid", 8)
+ .getMetric("abc.metric5", "-1"))
.isEqualTo("14");
assertThat(
- store.getSubtaskAttemptMetricStore("jobid", "taskid",
8, 1)
+ store.getSubtaskAttemptMetricStore(JOB_ID.toString(),
"taskid", 8, 1)
.getMetric("abc.metric5", "-1"))
.isEqualTo("4");
assertThat(
- store.getSubtaskAttemptMetricStore("jobid", "taskid",
8, 2)
+ store.getSubtaskAttemptMetricStore(JOB_ID.toString(),
"taskid", 8, 2)
.getMetric("abc.metric5", "-1"))
.isEqualTo("14");
assertThat(
- store.getTaskMetricStore("jobid", "taskid")
+ store.getTaskMetricStore(JOB_ID.toString(), "taskid")
.getMetric("8.opname.abc.metric6", "-1"))
.isEqualTo("5");
assertThat(
- store.getTaskMetricStore("jobid", "taskid")
+ store.getTaskMetricStore(JOB_ID.toString(), "taskid")
.getMetric("8.opname.abc.metric7", "-1"))
.isEqualTo("6");
assertThat(
- store.getTaskMetricStore("jobid", "taskid")
+ store.getTaskMetricStore(JOB_ID.toString(), "taskid")
.getMetric("1.opname.abc.metric7", "-1"))
.isEqualTo("6");
assertThat(
- store.getSubtaskMetricStore("jobid", "taskid", 1)
+ store.getSubtaskMetricStore(JOB_ID.toString(), "taskid", 1)
.getMetric("opname.abc.metric7", "-1"))
.isEqualTo("6");
- assertThat(store.getSubtaskAttemptMetricStore("jobid", "taskid", 1,
2)).isNull();
+ assertThat(store.getSubtaskAttemptMetricStore(JOB_ID.toString(),
"taskid", 1, 2)).isNull();
assertThat(
- store.getSubtaskAttemptMetricStore("jobid", "taskid",
1, 3)
+ store.getSubtaskAttemptMetricStore(JOB_ID.toString(),
"taskid", 1, 3)
.getMetric("opname.abc.metric7", "-1"))
.isEqualTo("6");
assertThat(
- store.getSubtaskAttemptMetricStore("jobid", "taskid",
8, 2)
+ store.getSubtaskAttemptMetricStore(JOB_ID.toString(),
"taskid", 8, 2)
.getMetric("opname.abc.metric7", "-1"))
.isEqualTo("6");
assertThat(
- store.getSubtaskAttemptMetricStore("jobid", "taskid",
8, 4)
+ store.getSubtaskAttemptMetricStore(JOB_ID.toString(),
"taskid", 8, 4)
.getMetric("opname.abc.metric7", "-1"))
.isEqualTo("16");
}
@@ -106,11 +121,48 @@ class MetricStoreTest {
assertThat(store.getJobs()).isEmpty();
}
+ @Test
+ void testSubtaskMetricStoreCleanup() {
+ MetricStore store = setupStore(new MetricStore());
+ assertThat(
+ store.getTaskMetricStore(JOB_ID.toString(), "taskid")
+ .getSubtaskMetricStore(8)
+ .getAllAttemptsMetricStores()
+ .keySet())
+ .containsExactlyInAnyOrderElementsOf(Arrays.asList(1, 2, 4, 5));
+
+ Set<Integer> currentAttempts = new HashSet<>(Arrays.asList(1, 4));
+ Map<String, Map<Integer, CurrentAttempts>> currentExecutionAttempts =
+ Collections.singletonMap(
+ "taskid",
+ Collections.singletonMap(8, new CurrentAttempts(1, currentAttempts)));
+ JobDetails jobDetail =
+ new JobDetails(
+ JOB_ID,
+ "jobname",
+ 0,
+ 0,
+ 0,
+ JobStatus.RUNNING,
+ 0,
+ new int[10],
+ 1,
+ currentExecutionAttempts);
+ store.updateCurrentExecutionAttempts(Collections.singleton(jobDetail));
+
+ assertThat(
+ store.getTaskMetricStore(JOB_ID.toString(), "taskid")
+ .getSubtaskMetricStore(8)
+ .getAllAttemptsMetricStores()
+ .keySet())
+ .containsExactlyInAnyOrderElementsOf(Arrays.asList(1, 4, 5));
+ }
+
static MetricStore setupStore(MetricStore store) {
- Map<Integer, Integer> currentExecutionAttempts = new HashMap<>();
- currentExecutionAttempts.put(8, 2);
- store.getCurrentExecutionAttempts()
- .put("jobid", Collections.singletonMap("taskid",
currentExecutionAttempts));
+ Map<Integer, Integer> representativeAttempts = new HashMap<>();
+ representativeAttempts.put(8, 2);
+ store.getRepresentativeAttempts()
+ .put(JOB_ID.toString(), Collections.singletonMap("taskid",
representativeAttempts));
QueryScopeInfo.JobManagerQueryScopeInfo jm =
new QueryScopeInfo.JobManagerQueryScopeInfo("abc");
@@ -126,7 +178,8 @@ class MetricStoreTest {
MetricDump.CounterDump cd22 = new MetricDump.CounterDump(tm2, "metric2", 10);
MetricDump.CounterDump cd22a = new MetricDump.CounterDump(tm2, "metric2b", 10);
- QueryScopeInfo.JobQueryScopeInfo job = new QueryScopeInfo.JobQueryScopeInfo("jobid", "abc");
+ QueryScopeInfo.JobQueryScopeInfo job =
+ new QueryScopeInfo.JobQueryScopeInfo(JOB_ID.toString(), "abc");
MetricDump.CounterDump cd3 = new MetricDump.CounterDump(job, "metric3", 2);
MetricDump.CounterDump cd4 = new MetricDump.CounterDump(job, "metric4", 3);
@@ -136,30 +189,41 @@ class MetricStoreTest {
MetricDump.CounterDump cd42 = new MetricDump.CounterDump(job2, "metric4", 3);
QueryScopeInfo.TaskQueryScopeInfo task =
- new QueryScopeInfo.TaskQueryScopeInfo("jobid", "taskid", 8, 1,
"abc");
+ new QueryScopeInfo.TaskQueryScopeInfo(JOB_ID.toString(),
"taskid", 8, 1, "abc");
MetricDump.CounterDump cd5 = new MetricDump.CounterDump(task, "metric5", 4);
QueryScopeInfo.TaskQueryScopeInfo speculativeTask =
- new QueryScopeInfo.TaskQueryScopeInfo("jobid", "taskid", 8, 2,
"abc");
+ new QueryScopeInfo.TaskQueryScopeInfo(JOB_ID.toString(),
"taskid", 8, 2, "abc");
MetricDump.CounterDump cd52 = new MetricDump.CounterDump(speculativeTask, "metric5", 14);
QueryScopeInfo.OperatorQueryScopeInfo operator =
- new QueryScopeInfo.OperatorQueryScopeInfo("jobid", "taskid",
8, 2, "opname", "abc");
+ new QueryScopeInfo.OperatorQueryScopeInfo(
+ JOB_ID.toString(), "taskid", 8, 2, "opname", "abc");
MetricDump.CounterDump cd6 = new MetricDump.CounterDump(operator, "metric6", 5);
MetricDump.CounterDump cd7 = new MetricDump.CounterDump(operator, "metric7", 6);
QueryScopeInfo.OperatorQueryScopeInfo operator2 =
- new QueryScopeInfo.OperatorQueryScopeInfo("jobid", "taskid",
1, 3, "opname", "abc");
+ new QueryScopeInfo.OperatorQueryScopeInfo(
+ JOB_ID.toString(), "taskid", 1, 3, "opname", "abc");
MetricDump.CounterDump cd62 = new MetricDump.CounterDump(operator2, "metric6", 5);
MetricDump.CounterDump cd72 = new MetricDump.CounterDump(operator2, "metric7", 6);
QueryScopeInfo.OperatorQueryScopeInfo speculativeOperator2 =
- new QueryScopeInfo.OperatorQueryScopeInfo("jobid", "taskid",
8, 4, "opname", "abc");
+ new QueryScopeInfo.OperatorQueryScopeInfo(
+ JOB_ID.toString(), "taskid", 8, 4, "opname", "abc");
MetricDump.CounterDump cd63 =
new MetricDump.CounterDump(speculativeOperator2, "metric6",
15);
MetricDump.CounterDump cd73 =
new MetricDump.CounterDump(speculativeOperator2, "metric7",
16);
+ QueryScopeInfo.OperatorQueryScopeInfo speculativeOperator3 =
+ new QueryScopeInfo.OperatorQueryScopeInfo(
+ JOB_ID.toString(), "taskid", 8, 5, "opname", "abc");
+ MetricDump.CounterDump cd64 =
+ new MetricDump.CounterDump(speculativeOperator3, "metric6",
17);
+ MetricDump.CounterDump cd74 =
+ new MetricDump.CounterDump(speculativeOperator3, "metric7",
18);
+
store.add(cd1);
store.add(cd2);
store.add(cd2a);
@@ -179,6 +243,8 @@ class MetricStoreTest {
store.add(cd52);
store.add(cd63);
store.add(cd73);
+ store.add(cd64);
+ store.add(cd74);
return store;
}