This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new 4d3b08f0 [FLINK-28592] Custom resource counter metrics improvements
4d3b08f0 is described below

commit 4d3b08f0a890c84e2a5131e08efd8925bc41ddad
Author: Matyas Orhidi <[email protected]>
AuthorDate: Sat Jul 23 18:20:57 2022 +0200

    [FLINK-28592] Custom resource counter metrics improvements
---
 .../kubernetes_operator_metric_configuration.html  |   6 +
 .../flink/kubernetes/operator/FlinkOperator.java   |  14 +-
 .../controller/FlinkDeploymentController.java      |   4 +-
 .../controller/FlinkSessionJobController.java      |   4 +-
 .../operator/metrics/CustomResourceMetrics.java    |   2 +-
 .../operator/metrics/FlinkDeploymentMetrics.java   |  82 ++++++---
 .../operator/metrics/FlinkSessionJobMetrics.java   |  41 ++++-
 .../metrics/KubernetesOperatorMetricOptions.java   |   7 +
 .../kubernetes/operator/metrics/MetricManager.java | 100 ++++++-----
 .../metrics/lifecycle/LifecycleMetrics.java        |  12 +-
 .../operator/observer/SavepointObserver.java       |   7 +-
 .../observer/deployment/ApplicationObserver.java   |   4 +-
 .../observer/deployment/ObserverFactory.java       |   4 +-
 .../observer/sessionjob/SessionJobObserver.java    |   4 +-
 .../operator/reconciler/ReconciliationUtils.java   |   2 +-
 .../AbstractFlinkResourceReconciler.java           |   4 +-
 .../deployment/AbstractJobReconciler.java          |   2 +-
 .../deployment/ApplicationReconciler.java          |   2 +-
 .../reconciler/deployment/ReconcilerFactory.java   |   4 +-
 .../reconciler/deployment/SessionReconciler.java   |   2 +-
 .../sessionjob/SessionJobReconciler.java           |   2 +-
 .../kubernetes/operator/utils/StatusRecorder.java  |  34 ++--
 .../flink/kubernetes/operator/TestUtils.java       |  66 ++++---
 .../kubernetes/operator/TestingStatusRecorder.java |  10 +-
 .../TestingFlinkDeploymentController.java          |   7 +-
 .../listener/FlinkResourceListenerTest.java        |   9 +-
 .../metrics/FlinkDeploymentMetricsTest.java        | 193 ++++++++++++++-------
 .../metrics/FlinkSessionJobMetricsTest.java        | 111 ++++++++++--
 .../operator/metrics/TestingMetricListener.java    |  91 ++++++++++
 .../lifecycle/ResourceLifecycleMetricsTest.java    |  43 +++--
 .../sessionjob/SessionJobObserverTest.java         |   3 +-
 .../deployment/ApplicationReconcilerTest.java      |   2 +-
 .../sessionjob/SessionJobReconcilerTest.java       |   2 +-
 .../operator/utils/StatusRecorderTest.java         |   9 +-
 34 files changed, 617 insertions(+), 272 deletions(-)

diff --git 
a/docs/layouts/shortcodes/generated/kubernetes_operator_metric_configuration.html
 
b/docs/layouts/shortcodes/generated/kubernetes_operator_metric_configuration.html
index d5d23294..74a02191 100644
--- 
a/docs/layouts/shortcodes/generated/kubernetes_operator_metric_configuration.html
+++ 
b/docs/layouts/shortcodes/generated/kubernetes_operator_metric_configuration.html
@@ -44,6 +44,12 @@
             <td>Boolean</td>
             <td>In addition to the system level histograms, enable per 
namespace tracking of state and transition times.</td>
         </tr>
+        <tr>
+            <td><h5>kubernetes.operator.resource.metrics.enabled</h5></td>
+            <td style="word-wrap: break-word;">true</td>
+            <td>Boolean</td>
+            <td>Enables metrics for FlinkDeployment and FlinkSessionJob custom 
resources.</td>
+        </tr>
         <tr>
             <td><h5>metrics.scope.k8soperator.resource</h5></td>
             <td style="word-wrap: 
break-word;">"&lt;host&gt;.k8soperator.&lt;namespace&gt;.&lt;name&gt;.resource.&lt;resourcens&gt;.&lt;resourcename&gt;"</td>
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
index 4d2f4674..c0e8841d 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
@@ -25,8 +25,6 @@ import org.apache.flink.core.plugin.PluginUtils;
 import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
 import 
org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController;
 import 
org.apache.flink.kubernetes.operator.controller.FlinkSessionJobController;
-import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
-import org.apache.flink.kubernetes.operator.crd.status.FlinkSessionJobStatus;
 import org.apache.flink.kubernetes.operator.listener.FlinkResourceListener;
 import org.apache.flink.kubernetes.operator.listener.ListenerUtils;
 import 
org.apache.flink.kubernetes.operator.metrics.KubernetesOperatorMetricGroup;
@@ -120,9 +118,9 @@ public class FlinkOperator {
 
     @VisibleForTesting
     void registerDeploymentController() {
-        var statusRecorder =
-                StatusRecorder.<FlinkDeploymentStatus>create(
-                        client, new MetricManager<>(metricGroup, 
configManager), listeners);
+        var metricManager =
+                
MetricManager.createFlinkDeploymentMetricManager(configManager, metricGroup);
+        var statusRecorder = StatusRecorder.create(client, metricManager, 
listeners);
         var eventRecorder = EventRecorder.create(client, listeners);
         var reconcilerFactory =
                 new ReconcilerFactory(
@@ -145,9 +143,9 @@ public class FlinkOperator {
     @VisibleForTesting
     void registerSessionJobController() {
         var eventRecorder = EventRecorder.create(client, listeners);
-        var statusRecorder =
-                StatusRecorder.<FlinkSessionJobStatus>create(
-                        client, new MetricManager<>(metricGroup, 
configManager), listeners);
+        var metricManager =
+                
MetricManager.createFlinkSessionJobMetricManager(configManager, metricGroup);
+        var statusRecorder = StatusRecorder.create(client, metricManager, 
listeners);
         var reconciler =
                 new SessionJobReconciler(
                         client, flinkServiceFactory, configManager, 
eventRecorder, statusRecorder);
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
index 1bd59468..dae1f64f 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
@@ -64,7 +64,7 @@ public class FlinkDeploymentController
     private final Set<FlinkResourceValidator> validators;
     private final ReconcilerFactory reconcilerFactory;
     private final ObserverFactory observerFactory;
-    private final StatusRecorder<FlinkDeploymentStatus> statusRecorder;
+    private final StatusRecorder<FlinkDeployment, FlinkDeploymentStatus> 
statusRecorder;
     private final EventRecorder eventRecorder;
 
     public FlinkDeploymentController(
@@ -72,7 +72,7 @@ public class FlinkDeploymentController
             Set<FlinkResourceValidator> validators,
             ReconcilerFactory reconcilerFactory,
             ObserverFactory observerFactory,
-            StatusRecorder<FlinkDeploymentStatus> statusRecorder,
+            StatusRecorder<FlinkDeployment, FlinkDeploymentStatus> 
statusRecorder,
             EventRecorder eventRecorder) {
         this.configManager = configManager;
         this.validators = validators;
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobController.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobController.java
index 3727af40..aa4c1450 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobController.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobController.java
@@ -60,14 +60,14 @@ public class FlinkSessionJobController
     private final Set<FlinkResourceValidator> validators;
     private final Reconciler<FlinkSessionJob> reconciler;
     private final Observer<FlinkSessionJob> observer;
-    private final StatusRecorder<FlinkSessionJobStatus> statusRecorder;
+    private final StatusRecorder<FlinkSessionJob, FlinkSessionJobStatus> 
statusRecorder;
 
     public FlinkSessionJobController(
             FlinkConfigManager configManager,
             Set<FlinkResourceValidator> validators,
             Reconciler<FlinkSessionJob> reconciler,
             Observer<FlinkSessionJob> observer,
-            StatusRecorder<FlinkSessionJobStatus> statusRecorder) {
+            StatusRecorder<FlinkSessionJob, FlinkSessionJobStatus> 
statusRecorder) {
         this.configManager = configManager;
         this.validators = validators;
         this.reconciler = reconciler;
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/CustomResourceMetrics.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/CustomResourceMetrics.java
index 512db41e..08a7704e 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/CustomResourceMetrics.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/CustomResourceMetrics.java
@@ -20,7 +20,7 @@ package org.apache.flink.kubernetes.operator.metrics;
 import io.fabric8.kubernetes.client.CustomResource;
 
 /** Custom resource metric type. */
-public interface CustomResourceMetrics<CR extends CustomResource> {
+public interface CustomResourceMetrics<CR extends CustomResource<?, ?>> {
     void onUpdate(CR customResource);
 
     void onRemove(CR customResource);
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/FlinkDeploymentMetrics.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/FlinkDeploymentMetrics.java
index 8e40175b..0c8f3826 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/FlinkDeploymentMetrics.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/FlinkDeploymentMetrics.java
@@ -17,12 +17,10 @@
 
 package org.apache.flink.kubernetes.operator.metrics;
 
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
 import 
org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus;
-import org.apache.flink.metrics.MetricGroup;
 
-import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
@@ -30,40 +28,68 @@ import java.util.concurrent.ConcurrentHashMap;
 /** FlinkDeployment metrics. */
 public class FlinkDeploymentMetrics implements 
CustomResourceMetrics<FlinkDeployment> {
 
-    private final Map<JobManagerDeploymentStatus, Set<String>> statuses = new 
HashMap<>();
-    public static final String FLINK_DEPLOYMENT_GROUP_NAME = "FlinkDeployment";
-    public static final String JM_DEPLOYMENT_STATUS_GROUP_NAME = 
"JmDeploymentStatus";
+    private final KubernetesOperatorMetricGroup parentMetricGroup;
+    private final Configuration configuration;
+    private final Map<String, Map<JobManagerDeploymentStatus, Set<String>>> 
deployments =
+            new ConcurrentHashMap<>();
+    public static final String DEPLOYMENT_GROUP_NAME = 
FlinkDeployment.class.getSimpleName();
+    public static final String STATUS_GROUP_NAME = "JmDeploymentStatus";
     public static final String COUNTER_NAME = "Count";
 
-    public FlinkDeploymentMetrics(MetricGroup parentMetricGroup) {
-        MetricGroup flinkDeploymentMetrics =
-                parentMetricGroup.addGroup(FLINK_DEPLOYMENT_GROUP_NAME);
-        for (JobManagerDeploymentStatus status : 
JobManagerDeploymentStatus.values()) {
-            statuses.put(status, ConcurrentHashMap.newKeySet());
-        }
-        for (JobManagerDeploymentStatus status : 
JobManagerDeploymentStatus.values()) {
-            statuses.put(status, new HashSet<>());
-            MetricGroup metricGroup =
-                    flinkDeploymentMetrics
-                            .addGroup(JM_DEPLOYMENT_STATUS_GROUP_NAME)
-                            .addGroup(status.toString());
-            metricGroup.gauge(COUNTER_NAME, () -> statuses.get(status).size());
-        }
-        flinkDeploymentMetrics.gauge(
-                COUNTER_NAME, () -> 
statuses.values().stream().mapToInt(Set::size).sum());
+    public FlinkDeploymentMetrics(
+            KubernetesOperatorMetricGroup parentMetricGroup, Configuration 
configuration) {
+        this.parentMetricGroup = parentMetricGroup;
+        this.configuration = configuration;
     }
 
     public void onUpdate(FlinkDeployment flinkApp) {
         onRemove(flinkApp);
-        statuses.get(flinkApp.getStatus().getJobManagerDeploymentStatus())
+        deployments
+                .computeIfAbsent(
+                        flinkApp.getMetadata().getNamespace(),
+                        ns -> {
+                            initNamespaceDeploymentCounts(ns);
+                            initNamespaceStatusCounts(ns);
+                            return createDeploymentStatusMap();
+                        })
+                .get(flinkApp.getStatus().getJobManagerDeploymentStatus())
                 .add(flinkApp.getMetadata().getName());
     }
 
     public void onRemove(FlinkDeployment flinkApp) {
-        statuses.values()
-                .forEach(
-                        deployments -> {
-                            
deployments.remove(flinkApp.getMetadata().getName());
-                        });
+        if (!deployments.containsKey(flinkApp.getMetadata().getNamespace())) {
+            return;
+        }
+        deployments
+                .get(flinkApp.getMetadata().getNamespace())
+                .values()
+                .forEach(names -> 
names.remove(flinkApp.getMetadata().getName()));
+    }
+
+    private void initNamespaceDeploymentCounts(String ns) {
+        parentMetricGroup
+                .createResourceNamespaceGroup(configuration, ns)
+                .addGroup(DEPLOYMENT_GROUP_NAME)
+                .gauge(
+                        COUNTER_NAME,
+                        () -> 
deployments.get(ns).values().stream().mapToInt(Set::size).sum());
+    }
+
+    private void initNamespaceStatusCounts(String ns) {
+        for (JobManagerDeploymentStatus status : 
JobManagerDeploymentStatus.values()) {
+            parentMetricGroup
+                    .createResourceNamespaceGroup(configuration, ns)
+                    .addGroup(STATUS_GROUP_NAME)
+                    .addGroup(status.toString())
+                    .gauge(COUNTER_NAME, () -> 
deployments.get(ns).get(status).size());
+        }
+    }
+
+    private Map<JobManagerDeploymentStatus, Set<String>> 
createDeploymentStatusMap() {
+        Map<JobManagerDeploymentStatus, Set<String>> statuses = new 
ConcurrentHashMap<>();
+        for (JobManagerDeploymentStatus status : 
JobManagerDeploymentStatus.values()) {
+            statuses.put(status, ConcurrentHashMap.newKeySet());
+        }
+        return statuses;
     }
 }
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/FlinkSessionJobMetrics.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/FlinkSessionJobMetrics.java
index 492ed0d2..1d099519 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/FlinkSessionJobMetrics.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/FlinkSessionJobMetrics.java
@@ -17,28 +17,53 @@
 
 package org.apache.flink.kubernetes.operator.metrics;
 
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob;
-import org.apache.flink.metrics.MetricGroup;
 
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
 /** FlinkSessionJob metrics. */
 public class FlinkSessionJobMetrics implements 
CustomResourceMetrics<FlinkSessionJob> {
 
-    private final Set<String> sessionJobs = ConcurrentHashMap.newKeySet();
-    public static final String METRIC_GROUP_NAME = "FlinkSessionJob";
+    private final KubernetesOperatorMetricGroup parentMetricGroup;
+    private final Configuration configuration;
+    private final Map<String, Set<String>> sessionJobs = new 
ConcurrentHashMap<>();
+    public static final String METRIC_GROUP_NAME = 
FlinkSessionJob.class.getSimpleName();
+    public static final String COUNTER_NAME = "Count";
 
-    public FlinkSessionJobMetrics(MetricGroup parentMetricGroup) {
-        MetricGroup flinkSessionJobMetrics = 
parentMetricGroup.addGroup(METRIC_GROUP_NAME);
-        flinkSessionJobMetrics.gauge("Count", () -> sessionJobs.size());
+    public FlinkSessionJobMetrics(
+            KubernetesOperatorMetricGroup parentMetricGroup, Configuration 
configuration) {
+        this.parentMetricGroup = parentMetricGroup;
+        this.configuration = configuration;
     }
 
     public void onUpdate(FlinkSessionJob sessionJob) {
-        sessionJobs.add(sessionJob.getMetadata().getName());
+        onRemove(sessionJob);
+        sessionJobs
+                .computeIfAbsent(
+                        sessionJob.getMetadata().getNamespace(),
+                        ns -> {
+                            initNamespaceSessionJobCounts(ns);
+                            return ConcurrentHashMap.newKeySet();
+                        })
+                .add(sessionJob.getMetadata().getName());
     }
 
     public void onRemove(FlinkSessionJob sessionJob) {
-        sessionJobs.remove(sessionJob.getMetadata().getName());
+        if (!sessionJobs.containsKey(sessionJob.getMetadata().getNamespace())) 
{
+            return;
+        }
+        sessionJobs
+                .get(sessionJob.getMetadata().getNamespace())
+                .remove(sessionJob.getMetadata().getName());
+    }
+
+    private void initNamespaceSessionJobCounts(String ns) {
+        parentMetricGroup
+                .createResourceNamespaceGroup(configuration, ns)
+                .addGroup(METRIC_GROUP_NAME)
+                .gauge(COUNTER_NAME, () -> sessionJobs.get(ns).size());
     }
 }
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesOperatorMetricOptions.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesOperatorMetricOptions.java
index faba69d0..ae014b9f 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesOperatorMetricOptions.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesOperatorMetricOptions.java
@@ -43,6 +43,13 @@ public class KubernetesOperatorMetricOptions {
                     .withDescription(
                             "Enable KubernetesClient metrics for measuring the 
HTTP traffic to the Kubernetes API Server.");
 
+    public static final ConfigOption<Boolean> 
OPERATOR_RESOURCE_METRICS_ENABLED =
+            ConfigOptions.key("kubernetes.operator.resource.metrics.enabled")
+                    .booleanType()
+                    .defaultValue(true)
+                    .withDescription(
+                            "Enables metrics for FlinkDeployment and 
FlinkSessionJob custom resources.");
+
     public static final ConfigOption<Boolean> 
OPERATOR_LIFECYCLE_METRICS_ENABLED =
             
ConfigOptions.key("kubernetes.operator.resource.lifecycle.metrics.enabled")
                     .booleanType()
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/MetricManager.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/MetricManager.java
index 307019a7..490dd03e 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/MetricManager.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/MetricManager.java
@@ -24,67 +24,81 @@ import 
org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob;
 import org.apache.flink.kubernetes.operator.metrics.lifecycle.LifecycleMetrics;
 
-import java.time.Clock;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.ArrayList;
+import java.util.List;
 
 /** Metric manager for Operator managed custom resources. */
 public class MetricManager<CR extends AbstractFlinkResource<?, ?>> {
-    private final KubernetesOperatorMetricGroup opMetricGroup;
-    private final FlinkConfigManager configManager;
-    private final Map<String, CustomResourceMetrics> metrics = new 
ConcurrentHashMap<>();
+    private final List<CustomResourceMetrics<CR>> registeredMetrics = new 
ArrayList<>();
 
-    private final LifecycleMetrics<CR> lifeCycleMetrics;
+    public void onUpdate(CR cr) {
+        registeredMetrics.forEach(m -> m.onUpdate(cr));
+    }
 
-    public MetricManager(
-            KubernetesOperatorMetricGroup opMetricGroup, FlinkConfigManager 
configManager) {
-        this.opMetricGroup = opMetricGroup;
-        this.configManager = configManager;
+    public void onRemove(CR cr) {
+        registeredMetrics.forEach(m -> m.onRemove(cr));
+    }
 
-        if (configManager
-                .getDefaultConfig()
-                
.get(KubernetesOperatorMetricOptions.OPERATOR_LIFECYCLE_METRICS_ENABLED)) {
-            this.lifeCycleMetrics =
-                    new LifecycleMetrics<>(configManager, 
Clock.systemDefaultZone(), opMetricGroup);
-        } else {
-            this.lifeCycleMetrics = null;
-        }
+    public void register(CustomResourceMetrics<CR> metrics) {
+        registeredMetrics.add(metrics);
     }
 
-    public void onUpdate(CR cr) {
-        getCustomResourceMetrics(cr).onUpdate(cr);
-        if (lifeCycleMetrics != null) {
-            lifeCycleMetrics.onUpdate(cr);
-        }
+    public static MetricManager<FlinkDeployment> 
createFlinkDeploymentMetricManager(
+            FlinkConfigManager configManager, KubernetesOperatorMetricGroup 
metricGroup) {
+        MetricManager<FlinkDeployment> metricManager = new MetricManager<>();
+        registerFlinkDeploymentMetrics(configManager, metricGroup, 
metricManager);
+        registerLifecycleMetrics(configManager, metricGroup, metricManager);
+        return metricManager;
     }
 
-    public void onRemove(CR cr) {
-        getCustomResourceMetrics(cr).onRemove(cr);
-        if (lifeCycleMetrics != null) {
-            lifeCycleMetrics.onRemove(cr);
+    public static MetricManager<FlinkSessionJob> 
createFlinkSessionJobMetricManager(
+            FlinkConfigManager configManager, KubernetesOperatorMetricGroup 
metricGroup) {
+        MetricManager<FlinkSessionJob> metricManager = new MetricManager<>();
+        registerFlinkSessionJobMetrics(configManager, metricGroup, 
metricManager);
+        registerLifecycleMetrics(configManager, metricGroup, metricManager);
+        return metricManager;
+    }
+
+    private static void registerFlinkDeploymentMetrics(
+            FlinkConfigManager configManager,
+            KubernetesOperatorMetricGroup metricGroup,
+            MetricManager<FlinkDeployment> metricManager) {
+        if (configManager
+                .getDefaultConfig()
+                
.get(KubernetesOperatorMetricOptions.OPERATOR_RESOURCE_METRICS_ENABLED)) {
+            metricManager.register(
+                    new FlinkDeploymentMetrics(metricGroup, 
configManager.getDefaultConfig()));
         }
     }
 
-    private CustomResourceMetrics getCustomResourceMetrics(CR cr) {
-        return metrics.computeIfAbsent(
-                cr.getMetadata().getNamespace(), k -> 
getCustomResourceMetricsImpl(cr));
+    private static void registerFlinkSessionJobMetrics(
+            FlinkConfigManager configManager,
+            KubernetesOperatorMetricGroup metricGroup,
+            MetricManager<FlinkSessionJob> metricManager) {
+        if (configManager
+                .getDefaultConfig()
+                
.get(KubernetesOperatorMetricOptions.OPERATOR_RESOURCE_METRICS_ENABLED)) {
+            metricManager.register(
+                    new FlinkSessionJobMetrics(metricGroup, 
configManager.getDefaultConfig()));
+        }
     }
 
-    private CustomResourceMetrics getCustomResourceMetricsImpl(CR cr) {
-        var namespaceMg =
-                opMetricGroup.createResourceNamespaceGroup(
-                        configManager.getDefaultConfig(), 
cr.getMetadata().getNamespace());
-        if (cr instanceof FlinkDeployment) {
-            return new FlinkDeploymentMetrics(namespaceMg);
-        } else if (cr instanceof FlinkSessionJob) {
-            return new FlinkSessionJobMetrics(namespaceMg);
-        } else {
-            throw new IllegalArgumentException("Unknown CustomResource");
+    private static <CR extends AbstractFlinkResource<?, ?>> void 
registerLifecycleMetrics(
+            FlinkConfigManager configManager,
+            KubernetesOperatorMetricGroup metricGroup,
+            MetricManager<CR> metricManager) {
+        if (configManager
+                        .getDefaultConfig()
+                        
.get(KubernetesOperatorMetricOptions.OPERATOR_RESOURCE_METRICS_ENABLED)
+                && configManager
+                        .getDefaultConfig()
+                        
.get(KubernetesOperatorMetricOptions.OPERATOR_LIFECYCLE_METRICS_ENABLED)) {
+            metricManager.register(new LifecycleMetrics<>(configManager, 
metricGroup));
         }
     }
 
     @VisibleForTesting
-    public LifecycleMetrics<CR> getLifeCycleMetrics() {
-        return lifeCycleMetrics;
+    public List<CustomResourceMetrics<CR>> getRegisteredMetrics() {
+        return registeredMetrics;
     }
 }
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/LifecycleMetrics.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/LifecycleMetrics.java
index c2ef91eb..ad76b02a 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/LifecycleMetrics.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/LifecycleMetrics.java
@@ -21,6 +21,7 @@ import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
 import org.apache.flink.kubernetes.operator.crd.AbstractFlinkResource;
+import org.apache.flink.kubernetes.operator.metrics.CustomResourceMetrics;
 import 
org.apache.flink.kubernetes.operator.metrics.KubernetesOperatorMetricGroup;
 import 
org.apache.flink.kubernetes.operator.metrics.KubernetesOperatorMetricOptions;
 import org.apache.flink.kubernetes.operator.metrics.OperatorMetricUtils;
@@ -53,7 +54,8 @@ import static 
org.apache.flink.kubernetes.operator.metrics.lifecycle.ResourceLif
  *
  * @param <CR> Flink resource type.
  */
-public class LifecycleMetrics<CR extends AbstractFlinkResource<?, ?>> {
+public class LifecycleMetrics<CR extends AbstractFlinkResource<?, ?>>
+        implements CustomResourceMetrics<CR> {
 
     private static final String TRANSITION_RESUME = "Resume";
     private static final String TRANSITION_UPGRADE = "Upgrade";
@@ -79,11 +81,9 @@ public class LifecycleMetrics<CR extends 
AbstractFlinkResource<?, ?>> {
     private Function<MetricGroup, MetricGroup> metricGroupFunction;
 
     public LifecycleMetrics(
-            FlinkConfigManager configManager,
-            Clock clock,
-            KubernetesOperatorMetricGroup operatorMetricGroup) {
+            FlinkConfigManager configManager, KubernetesOperatorMetricGroup 
operatorMetricGroup) {
         this.configManager = configManager;
-        this.clock = clock;
+        this.clock = Clock.systemDefaultZone();
         this.operatorMetricGroup = operatorMetricGroup;
         this.namespaceHistosEnabled =
                 configManager
@@ -93,10 +93,12 @@ public class LifecycleMetrics<CR extends 
AbstractFlinkResource<?, ?>> {
                                         
.OPERATOR_LIFECYCLE_NAMESPACE_HISTOGRAMS_ENABLED);
     }
 
+    @Override
     public void onUpdate(CR cr) {
         
getLifecycleMetricTracker(cr).onUpdate(cr.getStatus().getLifecycleState(), 
clock.instant());
     }
 
+    @Override
     public void onRemove(CR cr) {
         lifecycleTrackers.remove(
                 Tuple2.of(cr.getMetadata().getNamespace(), 
cr.getMetadata().getName()));
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SavepointObserver.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SavepointObserver.java
index aa3590a4..52c322ee 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SavepointObserver.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SavepointObserver.java
@@ -42,19 +42,20 @@ import java.util.Iterator;
 import java.util.List;
 
 /** An observer of savepoint progress. */
-public class SavepointObserver<STATUS extends CommonStatus<?>> {
+public class SavepointObserver<
+        CR extends AbstractFlinkResource<?, STATUS>, STATUS extends 
CommonStatus<?>> {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(SavepointObserver.class);
 
     private final FlinkService flinkService;
     private final FlinkConfigManager configManager;
-    private final StatusRecorder<STATUS> statusRecorder;
+    private final StatusRecorder<CR, STATUS> statusRecorder;
     private final EventRecorder eventRecorder;
 
     public SavepointObserver(
             FlinkService flinkService,
             FlinkConfigManager configManager,
-            StatusRecorder<STATUS> statusRecorder,
+            StatusRecorder<CR, STATUS> statusRecorder,
             EventRecorder eventRecorder) {
         this.flinkService = flinkService;
         this.configManager = configManager;
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserver.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserver.java
index 01641699..038e0342 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserver.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserver.java
@@ -38,13 +38,13 @@ import java.util.Optional;
 /** The observer of {@link 
org.apache.flink.kubernetes.operator.config.Mode#APPLICATION} cluster. */
 public class ApplicationObserver extends AbstractDeploymentObserver {
 
-    private final SavepointObserver<FlinkDeploymentStatus> savepointObserver;
+    private final SavepointObserver<FlinkDeployment, FlinkDeploymentStatus> 
savepointObserver;
     private final JobStatusObserver<ApplicationObserverContext> 
jobStatusObserver;
 
     public ApplicationObserver(
             FlinkService flinkService,
             FlinkConfigManager configManager,
-            StatusRecorder<FlinkDeploymentStatus> statusRecorder,
+            StatusRecorder<FlinkDeployment, FlinkDeploymentStatus> 
statusRecorder,
             EventRecorder eventRecorder) {
         super(flinkService, configManager, eventRecorder);
         this.savepointObserver =
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/ObserverFactory.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/ObserverFactory.java
index b41d34b8..1f1617b2 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/ObserverFactory.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/ObserverFactory.java
@@ -36,7 +36,7 @@ public class ObserverFactory {
 
     private final FlinkServiceFactory flinkServiceFactory;
     private final FlinkConfigManager configManager;
-    private final StatusRecorder<FlinkDeploymentStatus> statusRecorder;
+    private final StatusRecorder<FlinkDeployment, FlinkDeploymentStatus> 
statusRecorder;
     private final EventRecorder eventRecorder;
     private final Map<Tuple2<Mode, KubernetesDeploymentMode>, 
Observer<FlinkDeployment>>
             observerMap;
@@ -44,7 +44,7 @@ public class ObserverFactory {
     public ObserverFactory(
             FlinkServiceFactory flinkServiceFactory,
             FlinkConfigManager configManager,
-            StatusRecorder<FlinkDeploymentStatus> statusRecorder,
+            StatusRecorder<FlinkDeployment, FlinkDeploymentStatus> 
statusRecorder,
             EventRecorder eventRecorder) {
         this.flinkServiceFactory = flinkServiceFactory;
         this.configManager = configManager;
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserver.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserver.java
index c3aeec46..14f76492 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserver.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserver.java
@@ -56,12 +56,12 @@ public class SessionJobObserver implements 
Observer<FlinkSessionJob> {
     private final FlinkServiceFactory flinkServiceFactory;
     private final FlinkConfigManager configManager;
     private final EventRecorder eventRecorder;
-    private final StatusRecorder<FlinkSessionJobStatus> statusRecorder;
+    private final StatusRecorder<FlinkSessionJob, FlinkSessionJobStatus> 
statusRecorder;
 
     public SessionJobObserver(
             FlinkServiceFactory flinkServiceFactory,
             FlinkConfigManager configManager,
-            StatusRecorder<FlinkSessionJobStatus> statusRecorder,
+            StatusRecorder<FlinkSessionJob, FlinkSessionJobStatus> 
statusRecorder,
             EventRecorder eventRecorder) {
         this.flinkServiceFactory = flinkServiceFactory;
         this.configManager = configManager;
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
index 79ee2dad..1e596fe5 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
@@ -399,7 +399,7 @@ public class ReconciliationUtils {
                     R resource,
                     Optional<RetryInfo> retryInfo,
                     Exception e,
-                    StatusRecorder<STATUS> statusRecorder) {
+                    StatusRecorder<R, STATUS> statusRecorder) {
 
         retryInfo.ifPresent(
                 r -> {
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
index 3d458432..f2eb5efb 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
@@ -63,7 +63,7 @@ public abstract class AbstractFlinkResourceReconciler<
 
     protected final FlinkConfigManager configManager;
     protected final EventRecorder eventRecorder;
-    protected final StatusRecorder<STATUS> statusRecorder;
+    protected final StatusRecorder<CR, STATUS> statusRecorder;
     protected final KubernetesClient kubernetesClient;
 
     public static final String MSG_SUSPENDED = "Suspending existing 
deployment.";
@@ -75,7 +75,7 @@ public abstract class AbstractFlinkResourceReconciler<
             KubernetesClient kubernetesClient,
             FlinkConfigManager configManager,
             EventRecorder eventRecorder,
-            StatusRecorder<STATUS> statusRecorder) {
+            StatusRecorder<CR, STATUS> statusRecorder) {
         this.kubernetesClient = kubernetesClient;
         this.configManager = configManager;
         this.eventRecorder = eventRecorder;
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java
index e3a72a05..5d47ebbc 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java
@@ -55,7 +55,7 @@ public abstract class AbstractJobReconciler<
             KubernetesClient kubernetesClient,
             FlinkConfigManager configManager,
             EventRecorder eventRecorder,
-            StatusRecorder<STATUS> statusRecorder) {
+            StatusRecorder<CR, STATUS> statusRecorder) {
         super(kubernetesClient, configManager, eventRecorder, statusRecorder);
     }
 
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
index ec0389e2..e7c5143a 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
@@ -60,7 +60,7 @@ public class ApplicationReconciler
             FlinkService flinkService,
             FlinkConfigManager configManager,
             EventRecorder eventRecorder,
-            StatusRecorder<FlinkDeploymentStatus> statusRecorder) {
+            StatusRecorder<FlinkDeployment, FlinkDeploymentStatus> 
statusRecorder) {
         super(kubernetesClient, configManager, eventRecorder, statusRecorder);
         this.flinkService = flinkService;
     }
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ReconcilerFactory.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ReconcilerFactory.java
index 2d11f495..3192a954 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ReconcilerFactory.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ReconcilerFactory.java
@@ -40,7 +40,7 @@ public class ReconcilerFactory {
     private final FlinkServiceFactory flinkServiceFactory;
     private final FlinkConfigManager configManager;
     private final EventRecorder eventRecorder;
-    private final StatusRecorder<FlinkDeploymentStatus> 
deploymentStatusRecorder;
+    private final StatusRecorder<FlinkDeployment, FlinkDeploymentStatus> 
deploymentStatusRecorder;
     private final Map<Tuple2<Mode, KubernetesDeploymentMode>, 
Reconciler<FlinkDeployment>>
             reconcilerMap;
 
@@ -49,7 +49,7 @@ public class ReconcilerFactory {
             FlinkServiceFactory flinkServiceFactory,
             FlinkConfigManager configManager,
             EventRecorder eventRecorder,
-            StatusRecorder<FlinkDeploymentStatus> deploymentStatusRecorder) {
+            StatusRecorder<FlinkDeployment, FlinkDeploymentStatus> 
deploymentStatusRecorder) {
         this.kubernetesClient = kubernetesClient;
         this.flinkServiceFactory = flinkServiceFactory;
         this.configManager = configManager;
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java
index 527cb8b5..9b2f48e1 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java
@@ -60,7 +60,7 @@ public class SessionReconciler
             FlinkService flinkService,
             FlinkConfigManager configManager,
             EventRecorder eventRecorder,
-            StatusRecorder<FlinkDeploymentStatus> statusRecorder) {
+            StatusRecorder<FlinkDeployment, FlinkDeploymentStatus> 
statusRecorder) {
         super(kubernetesClient, configManager, eventRecorder, statusRecorder);
         this.flinkService = flinkService;
     }
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconciler.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconciler.java
index f0dd2520..1ff4f76c 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconciler.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconciler.java
@@ -53,7 +53,7 @@ public class SessionJobReconciler
             FlinkServiceFactory flinkServiceFactory,
             FlinkConfigManager configManager,
             EventRecorder eventRecorder,
-            StatusRecorder<FlinkSessionJobStatus> statusRecorder) {
+            StatusRecorder<FlinkSessionJob, FlinkSessionJobStatus> 
statusRecorder) {
         super(kubernetesClient, configManager, eventRecorder, statusRecorder);
         this.flinkServiceFactory = flinkServiceFactory;
     }
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/StatusRecorder.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/StatusRecorder.java
index 4b6fbdb7..47e34806 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/StatusRecorder.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/StatusRecorder.java
@@ -40,7 +40,8 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.function.BiConsumer;
 
 /** Helper class for status management and updates. */
-public class StatusRecorder<STATUS extends CommonStatus<?>> {
+public class StatusRecorder<
+        CR extends AbstractFlinkResource<?, STATUS>, STATUS extends 
CommonStatus<?>> {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(StatusRecorder.class);
 
@@ -50,13 +51,13 @@ public class StatusRecorder<STATUS extends CommonStatus<?>> 
{
             new ConcurrentHashMap<>();
 
     private final KubernetesClient client;
-    private final MetricManager<AbstractFlinkResource<?, STATUS>> 
metricManager;
-    private final BiConsumer<AbstractFlinkResource<?, STATUS>, STATUS> 
statusUpdateListener;
+    private final MetricManager<CR> metricManager;
+    private final BiConsumer<CR, STATUS> statusUpdateListener;
 
     public StatusRecorder(
             KubernetesClient client,
-            MetricManager<AbstractFlinkResource<?, STATUS>> metricManager,
-            BiConsumer<AbstractFlinkResource<?, STATUS>, STATUS> 
statusUpdateListener) {
+            MetricManager<CR> metricManager,
+            BiConsumer<CR, STATUS> statusUpdateListener) {
         this.client = client;
         this.statusUpdateListener = statusUpdateListener;
         this.metricManager = metricManager;
@@ -69,11 +70,10 @@ public class StatusRecorder<STATUS extends CommonStatus<?>> 
{
      * operator behavior.
      *
      * @param resource Resource for which status update should be performed
-     * @param <T> Resource type.
      */
     @SneakyThrows
-    public <T extends AbstractFlinkResource<?, STATUS>> void 
patchAndCacheStatus(T resource) {
-        Class<T> resourceClass = (Class<T>) resource.getClass();
+    public void patchAndCacheStatus(CR resource) {
+        Class<CR> resourceClass = (Class<CR>) resource.getClass();
         String namespace = resource.getMetadata().getNamespace();
         String name = resource.getMetadata().getName();
 
@@ -126,9 +126,8 @@ public class StatusRecorder<STATUS extends CommonStatus<?>> 
{
      * reconciles a resource for the first time after a restart.
      *
      * @param resource Resource for which the status should be updated from 
the cache
-     * @param <T> Custom resource type.
      */
-    public <T extends AbstractFlinkResource<?, STATUS>> void 
updateStatusFromCache(T resource) {
+    public void updateStatusFromCache(CR resource) {
         var key = getKey(resource);
         var cachedStatus = statusCache.get(key);
         if (cachedStatus != null) {
@@ -147,9 +146,8 @@ public class StatusRecorder<STATUS extends CommonStatus<?>> 
{
      * Remove cached status for Flink resource.
      *
      * @param resource Flink resource.
-     * @param <T> Resource type.
      */
-    public <T extends AbstractFlinkResource<?, STATUS>> void 
removeCachedStatus(T resource) {
+    public void removeCachedStatus(CR resource) {
         statusCache.remove(getKey(resource));
         metricManager.onRemove(resource);
     }
@@ -158,11 +156,12 @@ public class StatusRecorder<STATUS extends 
CommonStatus<?>> {
         return Tuple2.of(resource.getMetadata().getNamespace(), 
resource.getMetadata().getName());
     }
 
-    public static <S extends CommonStatus<?>> StatusRecorder<S> create(
-            KubernetesClient kubernetesClient,
-            MetricManager<AbstractFlinkResource<?, S>> metricManager,
-            Collection<FlinkResourceListener> listeners) {
-        BiConsumer<AbstractFlinkResource<?, S>, S> consumer =
+    public static <S extends CommonStatus<?>, CR extends 
AbstractFlinkResource<?, S>>
+            StatusRecorder<CR, S> create(
+                    KubernetesClient kubernetesClient,
+                    MetricManager<CR> metricManager,
+                    Collection<FlinkResourceListener> listeners) {
+        BiConsumer<CR, S> consumer =
                 (resource, previousStatus) -> {
                     var ctx =
                             new FlinkResourceListener.StatusUpdateContext() {
@@ -191,6 +190,7 @@ public class StatusRecorder<STATUS extends CommonStatus<?>> 
{
                                 }
                             });
                 };
+
         return new StatusRecorder<>(kubernetesClient, metricManager, consumer);
     }
 }
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
index ebe8abc3..9c9687b3 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
@@ -22,8 +22,6 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import 
org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory;
-import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
-import org.apache.flink.kubernetes.operator.crd.AbstractFlinkResource;
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob;
 import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
@@ -40,7 +38,6 @@ import 
org.apache.flink.kubernetes.operator.crd.status.FlinkSessionJobStatus;
 import 
org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus;
 import 
org.apache.flink.kubernetes.operator.exception.DeploymentFailedException;
 import 
org.apache.flink.kubernetes.operator.metrics.KubernetesOperatorMetricGroup;
-import org.apache.flink.kubernetes.operator.metrics.MetricManager;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.metrics.util.TestingMetricRegistry;
 
@@ -109,20 +106,38 @@ public class TestUtils {
     }
 
     public static FlinkDeployment buildSessionCluster(FlinkVersion version) {
+        return buildSessionCluster(TEST_DEPLOYMENT_NAME, TEST_NAMESPACE, 
version);
+    }
+
+    public static FlinkDeployment buildSessionCluster(
+            String name, String namespace, FlinkVersion version) {
         FlinkDeployment deployment = new FlinkDeployment();
         deployment.setStatus(new FlinkDeploymentStatus());
         deployment.setMetadata(
                 new ObjectMetaBuilder()
-                        .withName(TEST_DEPLOYMENT_NAME)
-                        .withNamespace(TEST_NAMESPACE)
+                        .withName(name)
+                        .withNamespace(namespace)
                         .withCreationTimestamp(Instant.now().toString())
                         .build());
         deployment.setSpec(getTestFlinkDeploymentSpec(version));
         return deployment;
     }
 
+    public static FlinkDeployment buildApplicationCluster() {
+        return buildApplicationCluster(FlinkVersion.v1_15);
+    }
+
+    public static FlinkDeployment buildApplicationCluster(String name, String 
namespace) {
+        return buildApplicationCluster(name, namespace, FlinkVersion.v1_15);
+    }
+
     public static FlinkDeployment buildApplicationCluster(FlinkVersion 
version) {
-        FlinkDeployment deployment = buildSessionCluster(version);
+        return buildApplicationCluster(TEST_DEPLOYMENT_NAME, TEST_NAMESPACE, 
version);
+    }
+
+    public static FlinkDeployment buildApplicationCluster(
+            String name, String namespace, FlinkVersion version) {
+        FlinkDeployment deployment = buildSessionCluster(name, namespace, 
version);
         deployment
                 .getSpec()
                 .setJob(
@@ -136,17 +151,13 @@ public class TestUtils {
         return deployment;
     }
 
-    public static FlinkDeployment buildApplicationCluster() {
-        return buildApplicationCluster(FlinkVersion.v1_15);
-    }
-
-    public static FlinkSessionJob buildSessionJob() {
+    public static FlinkSessionJob buildSessionJob(String name, String 
namespace) {
         FlinkSessionJob sessionJob = new FlinkSessionJob();
         sessionJob.setStatus(new FlinkSessionJobStatus());
         sessionJob.setMetadata(
                 new ObjectMetaBuilder()
-                        .withName(TEST_SESSION_JOB_NAME)
-                        .withNamespace(TEST_NAMESPACE)
+                        .withName(name)
+                        .withNamespace(namespace)
                         .withCreationTimestamp(Instant.now().toString())
                         .withUid(UUID.randomUUID().toString())
                         .withGeneration(1L)
@@ -165,6 +176,10 @@ public class TestUtils {
         return sessionJob;
     }
 
+    public static FlinkSessionJob buildSessionJob() {
+        return buildSessionJob(TEST_SESSION_JOB_NAME, TEST_NAMESPACE);
+    }
+
     public static FlinkDeploymentSpec getTestFlinkDeploymentSpec(FlinkVersion 
version) {
         Map<String, String> conf = new HashMap<>();
         conf.put(TaskManagerOptions.NUM_TASK_SLOTS.key(), "2");
@@ -431,21 +446,8 @@ public class TestUtils {
         }
     }
 
-    public static <T extends AbstractFlinkResource<?, ?>> MetricManager<T> 
createTestMetricManager(
-            Configuration conf) {
-        return createTestMetricManager(
-                TestingMetricRegistry.builder()
-                        .setDelimiter(".".charAt(0))
-                        .setRegisterConsumer((metric, name, group) -> {})
-                        .build(),
-                conf);
-    }
-
-    public static <T extends AbstractFlinkResource<?, ?>> MetricManager<T> 
createTestMetricManager(
-            MetricRegistry metricRegistry, Configuration conf) {
-
-        var confManager = new FlinkConfigManager(conf);
-        return new MetricManager<>(createTestMetricGroup(metricRegistry, 
conf), confManager);
+    public static KubernetesOperatorMetricGroup 
createTestMetricGroup(Configuration conf) {
+        return createTestMetricGroup(createTestMetricRegistry(), conf);
     }
 
     public static KubernetesOperatorMetricGroup createTestMetricGroup(
@@ -454,6 +456,14 @@ public class TestUtils {
                 metricRegistry, conf, TEST_NAMESPACE, "testopname", 
"testhost");
     }
 
+    public static TestingMetricRegistry createTestMetricRegistry() {
+
+        return TestingMetricRegistry.builder()
+                .setDelimiter(".".charAt(0))
+                .setRegisterConsumer((metric, name, group) -> {})
+                .build();
+    }
+
     /** Testing ResponseProvider. */
     public static class ValidatingResponseProvider<T> implements 
ResponseProvider<Object> {
 
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingStatusRecorder.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingStatusRecorder.java
index 5e9cfcca..be893b58 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingStatusRecorder.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingStatusRecorder.java
@@ -18,22 +18,24 @@
 
 package org.apache.flink.kubernetes.operator;
 
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.operator.crd.AbstractFlinkResource;
 import org.apache.flink.kubernetes.operator.crd.status.CommonStatus;
+import org.apache.flink.kubernetes.operator.metrics.MetricManager;
 import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
 
 import com.fasterxml.jackson.databind.node.ObjectNode;
 
 /** Testing statusRecorder. */
-public class TestingStatusRecorder<STATUS extends CommonStatus<?>> extends 
StatusRecorder<STATUS> {
+public class TestingStatusRecorder<
+                CR extends AbstractFlinkResource<?, STATUS>, STATUS extends 
CommonStatus<?>>
+        extends StatusRecorder<CR, STATUS> {
 
     public TestingStatusRecorder() {
-        super(null, TestUtils.createTestMetricManager(new Configuration()), 
(r, s) -> {});
+        super(null, new MetricManager<>(), (r, s) -> {});
     }
 
     @Override
-    public <T extends AbstractFlinkResource<?, STATUS>> void 
patchAndCacheStatus(T resource) {
+    public void patchAndCacheStatus(CR resource) {
         statusCache.put(
                 getKey(resource),
                 objectMapper.convertValue(resource.getStatus(), 
ObjectNode.class));
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java
index 088d46bf..b2991479 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java
@@ -17,13 +17,13 @@
 
 package org.apache.flink.kubernetes.operator.controller;
 
-import org.apache.flink.kubernetes.operator.TestUtils;
 import org.apache.flink.kubernetes.operator.TestingFlinkService;
 import org.apache.flink.kubernetes.operator.TestingFlinkServiceFactory;
 import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
 import org.apache.flink.kubernetes.operator.crd.AbstractFlinkResource;
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
+import org.apache.flink.kubernetes.operator.metrics.MetricManager;
 import 
org.apache.flink.kubernetes.operator.observer.deployment.ObserverFactory;
 import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
 import 
org.apache.flink.kubernetes.operator.reconciler.deployment.ReconcilerFactory;
@@ -73,10 +73,7 @@ public class TestingFlinkDeploymentController
 
         eventRecorder = new EventRecorder(kubernetesClient, eventCollector);
         statusRecorder =
-                new StatusRecorder<>(
-                        kubernetesClient,
-                        
TestUtils.createTestMetricManager(configManager.getDefaultConfig()),
-                        statusUpdateCounter);
+                new StatusRecorder<>(kubernetesClient, new MetricManager<>(), 
statusUpdateCounter);
         flinkDeploymentController =
                 new FlinkDeploymentController(
                         configManager,
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/listener/FlinkResourceListenerTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/listener/FlinkResourceListenerTest.java
index e68c910f..c3a42c8c 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/listener/FlinkResourceListenerTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/listener/FlinkResourceListenerTest.java
@@ -17,11 +17,11 @@
 
 package org.apache.flink.kubernetes.operator.listener;
 
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.operator.TestUtils;
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
 import 
org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus;
+import org.apache.flink.kubernetes.operator.metrics.MetricManager;
 import org.apache.flink.kubernetes.operator.utils.EventRecorder;
 import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
 
@@ -52,11 +52,8 @@ public class FlinkResourceListenerTest {
         var listener2 = new TestingListener();
         var listeners = List.<FlinkResourceListener>of(listener1, listener2);
 
-        var statusRecorder =
-                StatusRecorder.<FlinkDeploymentStatus>create(
-                        kubernetesClient,
-                        TestUtils.createTestMetricManager(new Configuration()),
-                        listeners);
+        StatusRecorder<FlinkDeployment, FlinkDeploymentStatus> statusRecorder =
+                StatusRecorder.create(kubernetesClient, new MetricManager<>(), 
listeners);
         var eventRecorder = EventRecorder.create(kubernetesClient, listeners);
 
         var deployment = TestUtils.buildApplicationCluster();
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/FlinkDeploymentMetricsTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/FlinkDeploymentMetricsTest.java
index 70a5523e..3c368fef 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/FlinkDeploymentMetricsTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/FlinkDeploymentMetricsTest.java
@@ -19,94 +19,161 @@ package org.apache.flink.kubernetes.operator.metrics;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.operator.TestUtils;
+import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
-import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
 import 
org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus;
-import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
-import org.apache.flink.metrics.Gauge;
-import org.apache.flink.metrics.Metric;
-import org.apache.flink.runtime.metrics.util.TestingMetricRegistry;
 
-import io.fabric8.kubernetes.client.KubernetesClient;
-import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
-import java.util.HashMap;
-
 import static 
org.apache.flink.kubernetes.operator.metrics.FlinkDeploymentMetrics.COUNTER_NAME;
-import static 
org.apache.flink.kubernetes.operator.metrics.FlinkDeploymentMetrics.FLINK_DEPLOYMENT_GROUP_NAME;
-import static 
org.apache.flink.kubernetes.operator.metrics.FlinkDeploymentMetrics.JM_DEPLOYMENT_STATUS_GROUP_NAME;
+import static 
org.apache.flink.kubernetes.operator.metrics.FlinkDeploymentMetrics.DEPLOYMENT_GROUP_NAME;
+import static 
org.apache.flink.kubernetes.operator.metrics.FlinkDeploymentMetrics.STATUS_GROUP_NAME;
+import static 
org.apache.flink.kubernetes.operator.metrics.KubernetesOperatorMetricOptions.OPERATOR_RESOURCE_METRICS_ENABLED;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /** @link FlinkDeploymentMetrics tests. */
-@EnableKubernetesMockClient(crud = true)
 public class FlinkDeploymentMetricsTest {
-    private KubernetesClient kubernetesClient;
+
+    private final Configuration configuration = new Configuration();
+    private TestingMetricListener listener;
+    private MetricManager<FlinkDeployment> metricManager;
+
+    @BeforeEach
+    public void init() {
+        listener = new TestingMetricListener(configuration);
+        metricManager =
+                MetricManager.createFlinkDeploymentMetricManager(
+                        new FlinkConfigManager(configuration), 
listener.getMetricGroup());
+    }
 
     @Test
-    public void testFlinkDeploymentMetrics() throws InterruptedException {
-        var metrics = new HashMap<String, Metric>();
-        TestingMetricRegistry registry =
-                TestingMetricRegistry.builder()
-                        .setDelimiter(".".charAt(0))
-                        .setRegisterConsumer(
-                                (metric, name, group) -> {
-                                    
metrics.put(group.getMetricIdentifier(name), metric);
-                                })
-                        .build();
+    public void testMetricsSameNamespace() {
+        var namespace = TestUtils.TEST_NAMESPACE;
+        var deployment1 = TestUtils.buildApplicationCluster("deployment1", 
namespace);
+        var deployment2 = TestUtils.buildApplicationCluster("deployment2", 
namespace);
+
+        var counterId =
+                listener.getNamespaceMetricId(namespace, 
DEPLOYMENT_GROUP_NAME, COUNTER_NAME);
+        assertTrue(listener.getGauge(counterId).isEmpty());
+        for (JobManagerDeploymentStatus status : 
JobManagerDeploymentStatus.values()) {
+            var statusId =
+                    listener.getNamespaceMetricId(
+                            namespace, STATUS_GROUP_NAME, status.name(), 
COUNTER_NAME);
+            assertTrue(listener.getGauge(statusId).isEmpty());
+        }
 
-        var metricManager =
-                (MetricManager) TestUtils.createTestMetricManager(registry, 
new Configuration());
-        var helper =
-                new StatusRecorder<FlinkDeploymentStatus>(
-                        kubernetesClient, metricManager, (e, s) -> {});
+        metricManager.onUpdate(deployment1);
+        metricManager.onUpdate(deployment2);
+        assertEquals(2, listener.getGauge(counterId).get().getValue());
+        for (JobManagerDeploymentStatus status : 
JobManagerDeploymentStatus.values()) {
+            deployment1.getStatus().setJobManagerDeploymentStatus(status);
+            deployment2.getStatus().setJobManagerDeploymentStatus(status);
+            metricManager.onUpdate(deployment1);
+            metricManager.onUpdate(deployment2);
+
+            var statusId =
+                    listener.getNamespaceMetricId(
+                            namespace, STATUS_GROUP_NAME, status.name(), 
COUNTER_NAME);
+            assertEquals(2, listener.getGauge(statusId).get().getValue());
+        }
 
-        var deployment = TestUtils.buildApplicationCluster();
-        kubernetesClient.resource(deployment).createOrReplace();
+        metricManager.onRemove(deployment1);
+        metricManager.onRemove(deployment2);
+        assertEquals(0, listener.getGauge(counterId).get().getValue());
+        for (JobManagerDeploymentStatus status : 
JobManagerDeploymentStatus.values()) {
+            var statusId =
+                    listener.getNamespaceMetricId(
+                            namespace, STATUS_GROUP_NAME, status.name(), 
COUNTER_NAME);
+            assertEquals(0, listener.getGauge(statusId).get().getValue());
+        }
+    }
 
-        helper.updateStatusFromCache(deployment);
-        assertEquals(1, ((Gauge) 
metrics.get(totalIdentifier(deployment))).getValue());
-        assertEquals(1, ((Gauge) 
metrics.get(perStatusIdentifier(deployment))).getValue());
+    @Test
+    public void testMetricsMultiNamespace() {
+        var namespace1 = "ns1";
+        var namespace2 = "ns2";
+        var deployment1 = TestUtils.buildApplicationCluster("deployment", 
namespace1);
+        var deployment2 = TestUtils.buildApplicationCluster("deployment", 
namespace2);
+
+        var counterId1 =
+                listener.getNamespaceMetricId(namespace1, 
DEPLOYMENT_GROUP_NAME, COUNTER_NAME);
+        var counterId2 =
+                listener.getNamespaceMetricId(namespace2, 
DEPLOYMENT_GROUP_NAME, COUNTER_NAME);
+        assertTrue(listener.getGauge(counterId1).isEmpty());
+        assertTrue(listener.getGauge(counterId2).isEmpty());
+        for (JobManagerDeploymentStatus status : 
JobManagerDeploymentStatus.values()) {
+            var statusId1 =
+                    listener.getNamespaceMetricId(
+                            namespace1, STATUS_GROUP_NAME, status.name(), 
COUNTER_NAME);
+            var statusId2 =
+                    listener.getNamespaceMetricId(
+                            namespace2, STATUS_GROUP_NAME, status.name(), 
COUNTER_NAME);
+            assertTrue(listener.getGauge(statusId1).isEmpty());
+            assertTrue(listener.getGauge(statusId2).isEmpty());
+        }
 
+        metricManager.onUpdate(deployment1);
+        metricManager.onUpdate(deployment2);
+        assertEquals(1, listener.getGauge(counterId1).get().getValue());
+        assertEquals(1, listener.getGauge(counterId2).get().getValue());
         for (JobManagerDeploymentStatus status : 
JobManagerDeploymentStatus.values()) {
-            deployment.getStatus().setJobManagerDeploymentStatus(status);
-            helper.patchAndCacheStatus(deployment);
-            assertEquals(1, ((Gauge) 
metrics.get(totalIdentifier(deployment))).getValue());
-            assertEquals(1, ((Gauge) 
metrics.get(perStatusIdentifier(deployment))).getValue());
+            deployment1.getStatus().setJobManagerDeploymentStatus(status);
+            deployment2.getStatus().setJobManagerDeploymentStatus(status);
+            metricManager.onUpdate(deployment1);
+            metricManager.onUpdate(deployment2);
+            var statusId1 =
+                    listener.getNamespaceMetricId(
+                            namespace1, STATUS_GROUP_NAME, status.name(), 
COUNTER_NAME);
+            var statusId2 =
+                    listener.getNamespaceMetricId(
+                            namespace2, STATUS_GROUP_NAME, status.name(), 
COUNTER_NAME);
+            assertEquals(1, listener.getGauge(statusId1).get().getValue());
+            assertEquals(1, listener.getGauge(statusId2).get().getValue());
         }
 
-        helper.removeCachedStatus(deployment);
-        assertEquals(0, ((Gauge) 
metrics.get(totalIdentifier(deployment))).getValue());
+        metricManager.onRemove(deployment1);
+        metricManager.onRemove(deployment2);
+
+        assertEquals(0, listener.getGauge(counterId1).get().getValue());
+        assertEquals(0, listener.getGauge(counterId2).get().getValue());
         for (JobManagerDeploymentStatus status : 
JobManagerDeploymentStatus.values()) {
-            assertEquals(0, ((Gauge) 
metrics.get(perStatusIdentifier(deployment))).getValue());
+            deployment1.getStatus().setJobManagerDeploymentStatus(status);
+            deployment2.getStatus().setJobManagerDeploymentStatus(status);
+            var statusId1 =
+                    listener.getNamespaceMetricId(
+                            namespace1, STATUS_GROUP_NAME, status.name(), 
COUNTER_NAME);
+            var statusId2 =
+                    listener.getNamespaceMetricId(
+                            namespace2, STATUS_GROUP_NAME, status.name(), 
COUNTER_NAME);
+            assertEquals(0, listener.getGauge(statusId1).get().getValue());
+            assertEquals(0, listener.getGauge(statusId2).get().getValue());
         }
     }
 
-    private String totalIdentifier(FlinkDeployment deployment) {
-        String baseScope = 
"testhost.k8soperator.flink-operator-test.testopname.";
-        String[] metricScope =
-                new String[] {
-                    "namespace",
-                    deployment.getMetadata().getNamespace(),
-                    FLINK_DEPLOYMENT_GROUP_NAME,
-                    COUNTER_NAME
-                };
-        return baseScope + String.join(".", metricScope);
-    }
+    @Test
+    public void testMetricsDisabled() {
 
-    private String perStatusIdentifier(FlinkDeployment deployment) {
+        var conf = new Configuration();
+        conf.set(OPERATOR_RESOURCE_METRICS_ENABLED, false);
+        var listener = new TestingMetricListener(conf);
+        var metricManager =
+                MetricManager.createFlinkDeploymentMetricManager(
+                        new FlinkConfigManager(conf), 
listener.getMetricGroup());
 
-        String baseScope = 
"testhost.k8soperator.flink-operator-test.testopname.";
-        String[] metricScope =
-                new String[] {
-                    "namespace",
-                    deployment.getMetadata().getNamespace(),
-                    FLINK_DEPLOYMENT_GROUP_NAME,
-                    JM_DEPLOYMENT_STATUS_GROUP_NAME,
-                    
deployment.getStatus().getJobManagerDeploymentStatus().name(),
-                    COUNTER_NAME
-                };
+        var namespace = TestUtils.TEST_NAMESPACE;
+        var deployment = TestUtils.buildApplicationCluster("deployment", 
namespace);
 
-        return baseScope + String.join(".", metricScope);
+        var counterId =
+                listener.getNamespaceMetricId(namespace, 
DEPLOYMENT_GROUP_NAME, COUNTER_NAME);
+        metricManager.onUpdate(deployment);
+        assertTrue(listener.getGauge(counterId).isEmpty());
+        for (JobManagerDeploymentStatus status : 
JobManagerDeploymentStatus.values()) {
+            var statusId =
+                    listener.getNamespaceMetricId(
+                            namespace, STATUS_GROUP_NAME, status.name(), 
COUNTER_NAME);
+            assertTrue(listener.getGauge(statusId).isEmpty());
+        }
     }
 }
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/FlinkSessionJobMetricsTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/FlinkSessionJobMetricsTest.java
index 21f9a6ba..284d6f42 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/FlinkSessionJobMetricsTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/FlinkSessionJobMetricsTest.java
@@ -17,30 +17,117 @@
 
 package org.apache.flink.kubernetes.operator.metrics;
 
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.operator.TestUtils;
+import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
 import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob;
-import org.apache.flink.metrics.testutils.MetricListener;
 
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
+import static 
org.apache.flink.kubernetes.operator.metrics.FlinkSessionJobMetrics.COUNTER_NAME;
 import static 
org.apache.flink.kubernetes.operator.metrics.FlinkSessionJobMetrics.METRIC_GROUP_NAME;
+import static 
org.apache.flink.kubernetes.operator.metrics.KubernetesOperatorMetricOptions.OPERATOR_RESOURCE_METRICS_ENABLED;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /** @link FlinkSessionJobMetrics tests. */
 public class FlinkSessionJobMetricsTest {
 
+    private final Configuration configuration = new Configuration();
+    private TestingMetricListener listener;
+    private MetricManager<FlinkSessionJob> metricManager;
+
+    @BeforeEach
+    public void init() {
+        listener = new TestingMetricListener(configuration);
+        metricManager =
+                MetricManager.createFlinkSessionJobMetricManager(
+                        new FlinkConfigManager(configuration), 
listener.getMetricGroup());
+    }
+
+    @Test
+    public void testMetricsSameNamespace() {
+        var namespace = "test-ns";
+        var job1 = TestUtils.buildSessionJob("job1", namespace);
+        var job2 = TestUtils.buildSessionJob("job2", namespace);
+        var metricId = listener.getNamespaceMetricId(namespace, 
METRIC_GROUP_NAME, COUNTER_NAME);
+        assertTrue(listener.getGauge(metricId).isEmpty());
+
+        metricManager.onUpdate(job1);
+        metricManager.onUpdate(job2);
+        assertEquals(2, listener.getGauge(metricId).get().getValue());
+
+        metricManager.onUpdate(job1);
+        metricManager.onUpdate(job2);
+        assertEquals(2, listener.getGauge(metricId).get().getValue());
+
+        metricManager.onRemove(job1);
+        metricManager.onRemove(job2);
+        assertEquals(0, listener.getGauge(metricId).get().getValue());
+
+        metricManager.onRemove(job1);
+        metricManager.onRemove(job2);
+        assertEquals(0, listener.getGauge(metricId).get().getValue());
+    }
+
+    @Test
+    public void testMetricsMultiNamespace() {
+        var namespace1 = "ns1";
+        var namespace2 = "ns2";
+        var job1 = TestUtils.buildSessionJob("job", namespace1);
+        var job2 = TestUtils.buildSessionJob("job", namespace2);
+
+        var metricId1 = listener.getNamespaceMetricId(namespace1, 
METRIC_GROUP_NAME, COUNTER_NAME);
+        var metricId2 = listener.getNamespaceMetricId(namespace2, 
METRIC_GROUP_NAME, COUNTER_NAME);
+
+        assertTrue(listener.getGauge(metricId1).isEmpty());
+        assertTrue(listener.getGauge(metricId2).isEmpty());
+
+        metricManager.onUpdate(job1);
+        metricManager.onUpdate(job2);
+        assertEquals(1, listener.getGauge(metricId1).get().getValue());
+        assertEquals(1, listener.getGauge(metricId2).get().getValue());
+
+        metricManager.onUpdate(job1);
+        metricManager.onUpdate(job2);
+        assertEquals(1, listener.getGauge(metricId1).get().getValue());
+        assertEquals(1, listener.getGauge(metricId2).get().getValue());
+
+        metricManager.onRemove(job1);
+        metricManager.onRemove(job2);
+        assertEquals(0, listener.getGauge(metricId1).get().getValue());
+        assertEquals(0, listener.getGauge(metricId2).get().getValue());
+
+        metricManager.onRemove(job1);
+        metricManager.onRemove(job2);
+        assertEquals(0, listener.getGauge(metricId1).get().getValue());
+        assertEquals(0, listener.getGauge(metricId2).get().getValue());
+    }
+
     @Test
-    public void testMetrics() {
-        MetricListener metricListener = new MetricListener();
-        FlinkSessionJobMetrics metrics =
-                new FlinkSessionJobMetrics(metricListener.getMetricGroup());
-        assertTrue(metricListener.getGauge(METRIC_GROUP_NAME, 
"Count").isPresent());
-        assertEquals(0, metricListener.getGauge(METRIC_GROUP_NAME, 
"Count").get().getValue());
-        FlinkSessionJob flinkSessionJob = TestUtils.buildSessionJob();
-        metrics.onUpdate(flinkSessionJob);
-        assertEquals(1, metricListener.getGauge(METRIC_GROUP_NAME, 
"Count").get().getValue());
-        metrics.onRemove(flinkSessionJob);
-        assertEquals(0, metricListener.getGauge(METRIC_GROUP_NAME, 
"Count").get().getValue());
+    public void testMetricsDisabled() {
+        var flinkSessionJob = TestUtils.buildSessionJob();
+
+        var conf = new Configuration();
+        conf.set(OPERATOR_RESOURCE_METRICS_ENABLED, false);
+        var listener = new TestingMetricListener(conf);
+        var metricManager =
+                MetricManager.createFlinkSessionJobMetricManager(
+                        new FlinkConfigManager(conf), 
listener.getMetricGroup());
+
+        var metricId =
+                listener.getNamespaceMetricId(
+                        flinkSessionJob.getMetadata().getNamespace(),
+                        METRIC_GROUP_NAME,
+                        COUNTER_NAME);
+
+        assertTrue(listener.getGauge(metricId).isEmpty());
+
+        metricManager.onUpdate(flinkSessionJob);
+        assertTrue(listener.getGauge(metricId).isEmpty());
+
+        metricManager.onRemove(flinkSessionJob);
+        assertTrue(listener.getGauge(metricId).isEmpty());
     }
 }
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/TestingMetricListener.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/TestingMetricListener.java
new file mode 100644
index 00000000..294822d2
--- /dev/null
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/TestingMetricListener.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.metrics;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.Meter;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.runtime.metrics.util.TestingMetricRegistry;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+/** Utility class for metrics testing. */
+public class TestingMetricListener {
+
+    public static final String DELIMITER = ".";
+    private final KubernetesOperatorMetricGroup metricGroup;
+    private final Map<String, Metric> metrics = new HashMap<>();
+    private static final String NAMESPACE = "test-op-ns";
+    private static final String NAME = "test-op-name";
+    private static final String HOST = "test-op-host";
+    private final Configuration configuration;
+
+    public TestingMetricListener(Configuration configuration) {
+        TestingMetricRegistry registry =
+                TestingMetricRegistry.builder()
+                        .setDelimiter(DELIMITER.charAt(0))
+                        .setRegisterConsumer(
+                                (metric, name, group) -> {
+                                    
this.metrics.put(group.getMetricIdentifier(name), metric);
+                                })
+                        .build();
+        this.metricGroup =
+                KubernetesOperatorMetricGroup.create(
+                        registry, configuration, NAMESPACE, NAME, HOST);
+        this.configuration = configuration;
+    }
+
+    public KubernetesOperatorMetricGroup getMetricGroup() {
+        return this.metricGroup;
+    }
+
+    public Optional<Counter> getCounter(String identifier) {
+        return this.getMetric(identifier);
+    }
+
+    public Optional<Histogram> getHistogram(String identifier) {
+        return this.getMetric(identifier);
+    }
+
+    public Optional<Meter> getMeter(String identifier) {
+        return this.getMetric(identifier);
+    }
+
+    public <T> Optional<Gauge<T>> getGauge(String identifier) {
+        return Optional.ofNullable((Gauge<T>) this.metrics.get(identifier));
+    }
+
+    private <T extends Metric> Optional<T> getMetric(String identifier) {
+        return Optional.ofNullable((T) this.metrics.get(identifier));
+    }
+
+    public String getMetricId(String... identifiers) {
+        return metricGroup.getMetricIdentifier(String.join(DELIMITER, 
identifiers));
+    }
+
+    public String getNamespaceMetricId(String resourceNs, String... 
identifiers) {
+        return metricGroup
+                .createResourceNamespaceGroup(configuration, resourceNs)
+                .getMetricIdentifier(String.join(DELIMITER, identifiers));
+    }
+}
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/ResourceLifecycleMetricsTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/ResourceLifecycleMetricsTest.java
index 2e76430e..3240dc6c 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/ResourceLifecycleMetricsTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/ResourceLifecycleMetricsTest.java
@@ -21,10 +21,14 @@ import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.operator.TestUtils;
+import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
 import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.crd.spec.JobState;
 import org.apache.flink.kubernetes.operator.crd.status.ReconciliationState;
+import org.apache.flink.kubernetes.operator.metrics.CustomResourceMetrics;
 import 
org.apache.flink.kubernetes.operator.metrics.KubernetesOperatorMetricOptions;
+import org.apache.flink.kubernetes.operator.metrics.MetricManager;
 import org.apache.flink.kubernetes.operator.metrics.OperatorMetricUtils;
 import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
 import org.apache.flink.metrics.Histogram;
@@ -151,8 +155,10 @@ public class ResourceLifecycleMetricsTest {
         dep3.getMetadata().setName("n3");
 
         var conf = new Configuration();
-        var metricManager = TestUtils.createTestMetricManager(conf);
-        var lifeCycleMetrics = metricManager.getLifeCycleMetrics();
+        var metricManager =
+                MetricManager.createFlinkDeploymentMetricManager(
+                        new FlinkConfigManager(conf), 
TestUtils.createTestMetricGroup(conf));
+        var lifeCycleMetrics = getLifeCycleMetrics(metricManager);
 
         metricManager.onUpdate(dep1);
         metricManager.onUpdate(dep2);
@@ -184,8 +190,10 @@ public class ResourceLifecycleMetricsTest {
         conf.set(
                 
KubernetesOperatorMetricOptions.OPERATOR_LIFECYCLE_NAMESPACE_HISTOGRAMS_ENABLED,
                 false);
-        metricManager = TestUtils.createTestMetricManager(conf);
-        lifeCycleMetrics = metricManager.getLifeCycleMetrics();
+        metricManager =
+                MetricManager.createFlinkDeploymentMetricManager(
+                        new FlinkConfigManager(conf), 
TestUtils.createTestMetricGroup(conf));
+        lifeCycleMetrics = getLifeCycleMetrics(metricManager);
 
         metricManager.onUpdate(dep1);
         metricManager.onUpdate(dep2);
@@ -200,27 +208,34 @@ public class ResourceLifecycleMetricsTest {
                 trackers.get(Tuple2.of("ns2", "n3")).getTransitionHistos());
         trackers.get(Tuple2.of("ns1", "n1"))
                 .getStateTimeHistos()
-                .forEach(
-                        (k, l) -> {
-                            assertEquals(1, l.size());
-                        });
+                .forEach((k, l) -> assertEquals(1, l.size()));
 
         trackers.get(Tuple2.of("ns1", "n1"))
                 .getTransitionHistos()
-                .forEach(
-                        (k, l) -> {
-                            assertEquals(1, l.size());
-                        });
+                .forEach((k, l) -> assertEquals(1, l.size()));
 
         
conf.set(KubernetesOperatorMetricOptions.OPERATOR_LIFECYCLE_METRICS_ENABLED, 
false);
-        metricManager = TestUtils.createTestMetricManager(conf);
-        assertNull(metricManager.getLifeCycleMetrics());
+        metricManager =
+                MetricManager.createFlinkDeploymentMetricManager(
+                        new FlinkConfigManager(conf), 
TestUtils.createTestMetricGroup(conf));
+        assertNull(getLifeCycleMetrics(metricManager));
 
         metricManager.onUpdate(dep1);
         metricManager.onUpdate(dep2);
         metricManager.onUpdate(dep3);
     }
 
+    public static LifecycleMetrics<FlinkDeployment> getLifeCycleMetrics(
+            MetricManager<FlinkDeployment> metricManager) {
+        for (CustomResourceMetrics<FlinkDeployment> metrics :
+                metricManager.getRegisteredMetrics()) {
+            if (metrics instanceof LifecycleMetrics) {
+                return (LifecycleMetrics<FlinkDeployment>) metrics;
+            }
+        }
+        return null;
+    }
+
     private void validateTransition(
             Map<String, List<Histogram>> histos, String name, int size, long 
mean) {
         histos.get(name)
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserverTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserverTest.java
index 5dddb7b9..4b00ed4f 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserverTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserverTest.java
@@ -27,6 +27,7 @@ import 
org.apache.flink.kubernetes.operator.TestingFlinkService;
 import org.apache.flink.kubernetes.operator.TestingFlinkServiceFactory;
 import org.apache.flink.kubernetes.operator.TestingStatusRecorder;
 import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
+import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob;
 import org.apache.flink.kubernetes.operator.crd.status.FlinkSessionJobStatus;
 import org.apache.flink.kubernetes.operator.crd.status.ReconciliationState;
 import org.apache.flink.kubernetes.operator.crd.status.SavepointTriggerType;
@@ -59,7 +60,7 @@ public class SessionJobObserverTest {
     public void before() {
         
kubernetesClient.resource(TestUtils.buildSessionJob()).createOrReplace();
         var eventRecorder = new EventRecorder(kubernetesClient, (r, e) -> {});
-        var statusRecorder = new 
TestingStatusRecorder<FlinkSessionJobStatus>();
+        var statusRecorder = new TestingStatusRecorder<FlinkSessionJob, 
FlinkSessionJobStatus>();
         flinkService = new TestingFlinkService();
         FlinkServiceFactory flinkServiceFactory = new 
TestingFlinkServiceFactory(flinkService);
         observer =
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
index 2bb1f089..89f794e6 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
@@ -77,7 +77,7 @@ public class ApplicationReconcilerTest {
     public void before() {
         
kubernetesClient.resource(TestUtils.buildApplicationCluster()).createOrReplace();
         var eventRecorder = new EventRecorder(kubernetesClient, (r, e) -> {});
-        var statusRecoder = new TestingStatusRecorder<FlinkDeploymentStatus>();
+        var statusRecoder = new TestingStatusRecorder<FlinkDeployment, 
FlinkDeploymentStatus>();
         reconciler =
                 new ApplicationReconciler(
                         kubernetesClient,
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconcilerTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconcilerTest.java
index d6f342c9..6fc4764e 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconcilerTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconcilerTest.java
@@ -67,7 +67,7 @@ public class SessionJobReconcilerTest {
     private TestingFlinkService flinkService = new TestingFlinkService();
     private EventRecorder eventRecorder;
     private SessionJobReconciler reconciler;
-    private StatusRecorder<FlinkSessionJobStatus> statusRecoder;
+    private StatusRecorder<FlinkSessionJob, FlinkSessionJobStatus> 
statusRecoder;
 
     @BeforeEach
     public void before() {
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/StatusRecorderTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/StatusRecorderTest.java
index e550f201..4fd4e4ac 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/StatusRecorderTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/StatusRecorderTest.java
@@ -17,10 +17,11 @@
 
 package org.apache.flink.kubernetes.operator.utils;
 
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.operator.TestUtils;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
 import org.apache.flink.kubernetes.operator.crd.status.ReconciliationState;
+import org.apache.flink.kubernetes.operator.metrics.MetricManager;
 
 import io.fabric8.kubernetes.client.KubernetesClient;
 import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
@@ -39,10 +40,8 @@ public class StatusRecorderTest {
     @Test
     public void testPatchOnlyWhenChanged() throws InterruptedException {
         var helper =
-                new StatusRecorder<FlinkDeploymentStatus>(
-                        kubernetesClient,
-                        TestUtils.createTestMetricManager(new Configuration()),
-                        (e, s) -> {});
+                new StatusRecorder<FlinkDeployment, FlinkDeploymentStatus>(
+                        kubernetesClient, new MetricManager<>(), (e, s) -> {});
         var deployment = TestUtils.buildApplicationCluster();
         kubernetesClient.resource(deployment).createOrReplace();
         var lastRequest = mockServer.getLastRequest();

Reply via email to