This is an automated email from the ASF dual-hosted git repository.

mxm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git

commit 8b8b24b2f86d8b79cb14ca0d703313dcc3ce7e8b
Author: Maximilian Michels <[email protected]>
AuthorDate: Wed May 10 13:33:55 2023 +0200

    [FLINK-32061] Clean up resource metric group on resource removal
---
 .../kubernetes/operator/autoscaler/JobAutoScalerImpl.java   |  4 +---
 .../operator/controller/FlinkDeploymentController.java      |  1 +
 .../operator/service/FlinkResourceContextFactory.java       | 13 ++++++++++++-
 .../operator/TestingFlinkResourceContextFactory.java        |  9 +++++++++
 .../operator/controller/FlinkDeploymentControllerTest.java  |  7 +++++++
 .../controller/TestingFlinkDeploymentController.java        |  8 ++++++--
 6 files changed, 36 insertions(+), 6 deletions(-)

diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImpl.java b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImpl.java
index 186de397..d340b249 100644
--- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImpl.java
+++ b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImpl.java
@@ -70,9 +70,7 @@ public class JobAutoScalerImpl implements JobAutoScaler {
         metricsCollector.cleanup(cr);
         var resourceId = ResourceID.fromResource(cr);
         lastEvaluatedMetrics.remove(resourceId);
-        // We are not removing the metrics registered with Flink (flinkMetrics)
-        // because Flink metrics can only be registered once! When the deployment
-        // comes back we would not be able to register and report metrics.
+        flinkMetrics.remove(resourceId);
     }
 
     @Override
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
index 16f1137e..4d40faa9 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
@@ -113,6 +113,7 @@ public class FlinkDeploymentController
             // ignore during cleanup
         }
         statusRecorder.removeCachedStatus(flinkApp);
+        ctxFactory.cleanup(flinkApp);
         return reconcilerFactory.getOrCreate(flinkApp).cleanup(ctx);
     }
 
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkResourceContextFactory.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkResourceContextFactory.java
index ba3cb238..a20e46cf 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkResourceContextFactory.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkResourceContextFactory.java
@@ -50,7 +50,7 @@ public class FlinkResourceContextFactory {
     private final KubernetesOperatorMetricGroup operatorMetricGroup;
     private final Map<KubernetesDeploymentMode, FlinkService> serviceMap;
 
-    private final Map<Tuple2<Class<?>, ResourceID>, KubernetesResourceMetricGroup>
+    protected final Map<Tuple2<Class<?>, ResourceID>, KubernetesResourceMetricGroup>
             resourceMetricGroups = new ConcurrentHashMap<>();
 
     public FlinkResourceContextFactory(
@@ -118,4 +118,15 @@ public class FlinkResourceContextFactory {
     private KubernetesDeploymentMode getDeploymentMode(FlinkDeployment deployment) {
         return KubernetesDeploymentMode.getDeploymentMode(deployment);
     }
+
+    public <CR extends AbstractFlinkResource<?, ?>> void cleanup(CR flinkApp) {
+        var resourceMetricGroup =
+                resourceMetricGroups.remove(
+                        Tuple2.of(flinkApp.getClass(), ResourceID.fromResource(flinkApp)));
+        if (resourceMetricGroup != null) {
+            resourceMetricGroup.close();
+        } else {
+            LOG.warn("Unknown resource metric group for {}", flinkApp);
+        }
+    }
 }
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkResourceContextFactory.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkResourceContextFactory.java
index 05bb290f..db082113 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkResourceContextFactory.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkResourceContextFactory.java
@@ -17,13 +17,18 @@
 
 package org.apache.flink.kubernetes.operator;
 
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
 import org.apache.flink.kubernetes.operator.metrics.KubernetesOperatorMetricGroup;
+import org.apache.flink.kubernetes.operator.metrics.KubernetesResourceMetricGroup;
 import org.apache.flink.kubernetes.operator.service.FlinkResourceContextFactory;
 import org.apache.flink.kubernetes.operator.service.FlinkService;
 
 import io.fabric8.kubernetes.client.KubernetesClient;
+import io.javaoperatorsdk.operator.processing.event.ResourceID;
+
+import java.util.Map;
 
 /** Flink service factory mock for tests. */
 public class TestingFlinkResourceContextFactory extends FlinkResourceContextFactory {
@@ -42,4 +47,8 @@ public class TestingFlinkResourceContextFactory extends FlinkResourceContextFact
     protected FlinkService getOrCreateFlinkService(FlinkDeployment deployment) {
         return flinkService;
     }
+
+    public Map<Tuple2<Class<?>, ResourceID>, KubernetesResourceMetricGroup> getMetricGroups() {
+        return resourceMetricGroups;
+    }
 }
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
index 541da7aa..6ddc7ee7 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
@@ -991,8 +991,15 @@ public class FlinkDeploymentControllerTest {
     @Test
     public void cleanUpNewDeployment() {
         FlinkDeployment flinkDeployment = TestUtils.buildApplicationCluster();
+        var resourceMetricGroup =
+                testController
+                        .getContextFactory()
+                        .getResourceContext(flinkDeployment, context)
+                        .getResourceMetricGroup();
         var deleteControl = testController.cleanup(flinkDeployment, context);
         assertNotNull(deleteControl);
+        assertTrue(resourceMetricGroup.isClosed());
+        assertTrue(testController.getContextFactory().getMetricGroups().isEmpty());
     }
 
     @Test
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java
index 606ed23e..bdfa051b 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java
@@ -67,6 +67,9 @@ public class TestingFlinkDeploymentController
     private EventCollector eventCollector = new EventCollector();
 
     private EventRecorder eventRecorder;
+
+    @Getter private TestingFlinkResourceContextFactory contextFactory;
+
     private StatusRecorder<FlinkDeployment, FlinkDeploymentStatus> statusRecorder;
     @Getter private CanaryResourceManager<FlinkDeployment> canaryResourceManager;
 
@@ -74,7 +77,8 @@ public class TestingFlinkDeploymentController
             FlinkConfigManager configManager,
             KubernetesClient kubernetesClient,
             TestingFlinkService flinkService) {
-        var ctxFactory =
+
+        contextFactory =
                 new TestingFlinkResourceContextFactory(
                         kubernetesClient,
                         configManager,
@@ -96,7 +100,7 @@ public class TestingFlinkDeploymentController
                 new FlinkDeploymentController(
                         configManager,
                         ValidatorUtils.discoverValidators(configManager),
-                        ctxFactory,
+                        contextFactory,
                         reconcilerFactory,
                         new FlinkDeploymentObserverFactory(configManager, eventRecorder),
                         statusRecorder,

Reply via email to