This is an automated email from the ASF dual-hosted git repository. mapohl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit a6ef34392638bf724f020071de3d4f118ee753cf Author: Matthias Pohl <[email protected]> AuthorDate: Wed Dec 15 12:42:22 2021 +0100 [FLINK-25432][runtime] Makes JobManagerMetricGroup implement LocallyCleanableResource and GloballyCleanableResource --- .../org/apache/flink/runtime/dispatcher/Dispatcher.java | 10 +++++++++- .../runtime/metrics/groups/JobManagerMetricGroup.java | 14 +++++++++++--- .../flink/runtime/metrics/groups/JobManagerGroupTest.java | 5 +++-- 3 files changed, 23 insertions(+), 6 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java index 92fe36b..6599115 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java @@ -902,7 +902,15 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher } private void cleanUpRemainingJobData(JobID jobId, boolean jobGraphRemoved) { - jobManagerMetricGroup.removeJob(jobId); + try { + jobManagerMetricGroup.globalCleanup(jobId); + } catch (Exception e) { + log.warn( + "Could not properly clean data for job {} stored in JobManager metric group", + jobId, + e); + } + if (jobGraphRemoved) { try { highAvailabilityServices.globalCleanup(jobId); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobManagerMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobManagerMetricGroup.java index 431dab4..8d630c8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobManagerMetricGroup.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobManagerMetricGroup.java @@ -20,12 +20,16 @@ package org.apache.flink.runtime.metrics.groups; import org.apache.flink.api.common.JobID; import org.apache.flink.metrics.CharacterFilter; +import org.apache.flink.runtime.dispatcher.cleanup.LocallyCleanableResource; import org.apache.flink.runtime.metrics.MetricRegistry; import org.apache.flink.runtime.metrics.dump.QueryScopeInfo; import org.apache.flink.runtime.metrics.scope.ScopeFormat; +import org.apache.flink.util.concurrent.FutureUtils; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; /** * Special {@link org.apache.flink.metrics.MetricGroup} representing a JobManager. @@ -33,7 +37,8 @@ import java.util.Map; * <p>Contains extra logic for adding jobs with tasks, and removing jobs when they do not contain * tasks any more */ -public class JobManagerMetricGroup extends ComponentMetricGroup<JobManagerMetricGroup> { +public class JobManagerMetricGroup extends ComponentMetricGroup<JobManagerMetricGroup> + implements LocallyCleanableResource { private final Map<JobID, JobManagerJobMetricGroup> jobs = new HashMap<>(); @@ -84,9 +89,10 @@ public class JobManagerMetricGroup extends ComponentMetricGroup<JobManagerMetric } } - public void removeJob(JobID jobId) { + @Override + public CompletableFuture<Void> localCleanupAsync(JobID jobId, Executor ignoredExecutor) { if (jobId == null) { - return; + return FutureUtils.completedVoidFuture(); } synchronized (this) { @@ -95,6 +101,8 @@ public class JobManagerMetricGroup extends ComponentMetricGroup<JobManagerMetric containedGroup.close(); } } + + return FutureUtils.completedVoidFuture(); } public int numRegisteredJobMetricGroups() { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerGroupTest.java index 958c01a..4a06fa6 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerGroupTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerGroupTest.java @@ -26,6 +26,7 @@ import org.apache.flink.runtime.metrics.MetricRegistryTestUtils; import org.apache.flink.runtime.metrics.dump.QueryScopeInfo; import org.apache.flink.runtime.metrics.util.DummyCharacterFilter; import org.apache.flink.util.TestLogger; +import org.apache.flink.util.concurrent.Executors; import org.junit.Test; @@ -62,12 +63,12 @@ public class JobManagerGroupTest extends TestLogger { assertEquals(2, group.numRegisteredJobMetricGroups()); - group.removeJob(jid1); + group.localCleanupAsync(jid1, Executors.directExecutor()).join(); assertTrue(jmJobGroup11.isClosed()); assertEquals(1, group.numRegisteredJobMetricGroups()); - group.removeJob(jid2); + group.localCleanupAsync(jid2, Executors.directExecutor()).join(); assertTrue(jmJobGroup21.isClosed()); assertEquals(0, group.numRegisteredJobMetricGroups());
