This is an automated email from the ASF dual-hosted git repository. mxm pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
commit 8b8b24b2f86d8b79cb14ca0d703313dcc3ce7e8b Author: Maximilian Michels <[email protected]> AuthorDate: Wed May 10 13:33:55 2023 +0200 [FLINK-32061] Clean up resource metric group on resource removal --- .../kubernetes/operator/autoscaler/JobAutoScalerImpl.java | 4 +--- .../operator/controller/FlinkDeploymentController.java | 1 + .../operator/service/FlinkResourceContextFactory.java | 13 ++++++++++++- .../operator/TestingFlinkResourceContextFactory.java | 9 +++++++++ .../operator/controller/FlinkDeploymentControllerTest.java | 7 +++++++ .../controller/TestingFlinkDeploymentController.java | 8 ++++++-- 6 files changed, 36 insertions(+), 6 deletions(-) diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImpl.java b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImpl.java index 186de397..d340b249 100644 --- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImpl.java +++ b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImpl.java @@ -70,9 +70,7 @@ public class JobAutoScalerImpl implements JobAutoScaler { metricsCollector.cleanup(cr); var resourceId = ResourceID.fromResource(cr); lastEvaluatedMetrics.remove(resourceId); - // We are not removing the metrics registered with Flink (flinkMetrics) - // because Flink metrics can only be registered once! When the deployment - // comes back we would not be able to register and report metrics. + flinkMetrics.remove(resourceId); } @Override diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java index 16f1137e..4d40faa9 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java @@ -113,6 +113,7 @@ public class FlinkDeploymentController // ignore during cleanup } statusRecorder.removeCachedStatus(flinkApp); + ctxFactory.cleanup(flinkApp); return reconcilerFactory.getOrCreate(flinkApp).cleanup(ctx); } diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkResourceContextFactory.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkResourceContextFactory.java index ba3cb238..a20e46cf 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkResourceContextFactory.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkResourceContextFactory.java @@ -50,7 +50,7 @@ public class FlinkResourceContextFactory { private final KubernetesOperatorMetricGroup operatorMetricGroup; private final Map<KubernetesDeploymentMode, FlinkService> serviceMap; - private final Map<Tuple2<Class<?>, ResourceID>, KubernetesResourceMetricGroup> + protected final Map<Tuple2<Class<?>, ResourceID>, KubernetesResourceMetricGroup> resourceMetricGroups = new ConcurrentHashMap<>(); public FlinkResourceContextFactory( @@ -118,4 +118,15 @@ public class FlinkResourceContextFactory { private KubernetesDeploymentMode getDeploymentMode(FlinkDeployment deployment) { return KubernetesDeploymentMode.getDeploymentMode(deployment); } + + public <CR extends AbstractFlinkResource<?, ?>> void cleanup(CR flinkApp) { + var resourceMetricGroup = + resourceMetricGroups.remove( + Tuple2.of(flinkApp.getClass(), ResourceID.fromResource(flinkApp))); + if (resourceMetricGroup != null) { + resourceMetricGroup.close(); + } else { + LOG.warn("Unknown resource metric group for {}", flinkApp); + } + } } diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkResourceContextFactory.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkResourceContextFactory.java index 05bb290f..db082113 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkResourceContextFactory.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkResourceContextFactory.java @@ -17,13 +17,18 @@ package org.apache.flink.kubernetes.operator; +import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.kubernetes.operator.api.FlinkDeployment; import org.apache.flink.kubernetes.operator.config.FlinkConfigManager; import org.apache.flink.kubernetes.operator.metrics.KubernetesOperatorMetricGroup; +import org.apache.flink.kubernetes.operator.metrics.KubernetesResourceMetricGroup; import org.apache.flink.kubernetes.operator.service.FlinkResourceContextFactory; import org.apache.flink.kubernetes.operator.service.FlinkService; import io.fabric8.kubernetes.client.KubernetesClient; +import io.javaoperatorsdk.operator.processing.event.ResourceID; + +import java.util.Map; /** Flink service factory mock for tests. */ public class TestingFlinkResourceContextFactory extends FlinkResourceContextFactory { @@ -42,4 +47,8 @@ public class TestingFlinkResourceContextFactory extends FlinkResourceContextFact protected FlinkService getOrCreateFlinkService(FlinkDeployment deployment) { return flinkService; } + + public Map<Tuple2<Class<?>, ResourceID>, KubernetesResourceMetricGroup> getMetricGroups() { + return resourceMetricGroups; + } } diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java index 541da7aa..6ddc7ee7 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java @@ -991,8 +991,15 @@ public class FlinkDeploymentControllerTest { @Test public void cleanUpNewDeployment() { FlinkDeployment flinkDeployment = TestUtils.buildApplicationCluster(); + var resourceMetricGroup = + testController + .getContextFactory() + .getResourceContext(flinkDeployment, context) + .getResourceMetricGroup(); var deleteControl = testController.cleanup(flinkDeployment, context); assertNotNull(deleteControl); + assertTrue(resourceMetricGroup.isClosed()); + assertTrue(testController.getContextFactory().getMetricGroups().isEmpty()); } @Test diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java index 606ed23e..bdfa051b 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java @@ -67,6 +67,9 @@ public class TestingFlinkDeploymentController private EventCollector eventCollector = new EventCollector(); private EventRecorder eventRecorder; + + @Getter private TestingFlinkResourceContextFactory contextFactory; + private StatusRecorder<FlinkDeployment, FlinkDeploymentStatus> statusRecorder; @Getter private CanaryResourceManager<FlinkDeployment> canaryResourceManager; @@ -74,7 +77,8 @@ public class TestingFlinkDeploymentController FlinkConfigManager configManager, KubernetesClient kubernetesClient, TestingFlinkService flinkService) { - var ctxFactory = + + contextFactory = new TestingFlinkResourceContextFactory( kubernetesClient, configManager, @@ -96,7 +100,7 @@ public class TestingFlinkDeploymentController new FlinkDeploymentController( configManager, ValidatorUtils.discoverValidators(configManager), - ctxFactory, + contextFactory, reconcilerFactory, new FlinkDeploymentObserverFactory(configManager, eventRecorder), statusRecorder,
