This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new a28cb33  [FLINK-26953] Introduce Operator Specific Metrics
a28cb33 is described below

commit a28cb33617c7027b30722cd4b0881e0558aa7202
Author: Matyas Orhidi <[email protected]>
AuthorDate: Tue May 3 12:59:46 2022 +0200

    [FLINK-26953] Introduce Operator Specific Metrics
---
 flink-kubernetes-operator/pom.xml                  |  7 ++
 .../flink/kubernetes/operator/FlinkOperator.java   | 20 +++++-
 .../controller/FlinkDeploymentController.java      | 11 ++-
 .../controller/FlinkSessionJobController.java      | 12 +++-
 .../operator/metrics/CustomResourceMetrics.java    | 27 +++++++
 .../operator/metrics/FlinkDeploymentMetrics.java   | 63 ++++++++++++++++
 .../operator/metrics/FlinkSessionJobMetrics.java   | 44 ++++++++++++
 .../kubernetes/operator/metrics/MetricManager.java | 63 ++++++++++++++++
 .../operator/metrics/OperatorMetricUtils.java      |  3 +-
 .../flink/kubernetes/operator/TestUtils.java       |  5 +-
 .../metrics/FlinkDeploymentMetricsTest.java        | 83 ++++++++++++++++++++++
 .../metrics/FlinkSessionJobMetricsTest.java        | 46 ++++++++++++
 12 files changed, 374 insertions(+), 10 deletions(-)

diff --git a/flink-kubernetes-operator/pom.xml 
b/flink-kubernetes-operator/pom.xml
index c7ccfba..8c886ba 100644
--- a/flink-kubernetes-operator/pom.xml
+++ b/flink-kubernetes-operator/pom.xml
@@ -135,6 +135,13 @@ under the License.
             <version>${fabric8.version}</version>
             <scope>test</scope>
         </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-test-utils_${scala.version}</artifactId>
+            <version>${flink.version}</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
index 6e3f535..03d5a03 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
@@ -29,6 +29,7 @@ import 
org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController
 import 
org.apache.flink.kubernetes.operator.controller.FlinkSessionJobController;
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob;
+import org.apache.flink.kubernetes.operator.metrics.MetricManager;
 import org.apache.flink.kubernetes.operator.metrics.OperatorMetricUtils;
 import org.apache.flink.kubernetes.operator.observer.Observer;
 import 
org.apache.flink.kubernetes.operator.observer.deployment.ObserverFactory;
@@ -40,6 +41,7 @@ import 
org.apache.flink.kubernetes.operator.service.FlinkService;
 import org.apache.flink.kubernetes.operator.utils.EnvUtils;
 import org.apache.flink.kubernetes.operator.utils.ValidatorUtils;
 import org.apache.flink.kubernetes.operator.validation.FlinkResourceValidator;
+import org.apache.flink.metrics.MetricGroup;
 
 import io.fabric8.kubernetes.client.DefaultKubernetesClient;
 import io.fabric8.kubernetes.client.KubernetesClient;
@@ -66,6 +68,7 @@ public class FlinkOperator {
     private final ConfigurationService configurationService;
     private final FlinkConfigManager configManager;
     private final Set<FlinkResourceValidator> validators;
+    private final MetricGroup metricGroup;
 
     public FlinkOperator(@Nullable Configuration conf) {
         this.client = new DefaultKubernetesClient();
@@ -75,7 +78,8 @@ public class FlinkOperator {
         this.operator = new Operator(client, configurationService);
         this.flinkService = new FlinkService(client, configManager);
         this.validators = ValidatorUtils.discoverValidators(configManager);
-        
OperatorMetricUtils.initOperatorMetrics(configManager.getDefaultConfig());
+        this.metricGroup =
+                
OperatorMetricUtils.initOperatorMetrics(configManager.getDefaultConfig());
         PluginManager pluginManager =
                 
PluginUtils.createPluginManagerFromRootFolder(configManager.getDefaultConfig());
         FileSystem.initialize(configManager.getDefaultConfig(), pluginManager);
@@ -88,7 +92,12 @@ public class FlinkOperator {
 
         FlinkDeploymentController controller =
                 new FlinkDeploymentController(
-                        configManager, client, validators, reconcilerFactory, 
observerFactory);
+                        configManager,
+                        client,
+                        validators,
+                        reconcilerFactory,
+                        observerFactory,
+                        new MetricManager<>(metricGroup));
 
         FlinkControllerConfig<FlinkDeployment> controllerConfig =
                 new FlinkControllerConfig<>(
@@ -105,7 +114,12 @@ public class FlinkOperator {
         Observer<FlinkSessionJob> observer = new 
SessionJobObserver(flinkService, configManager);
         FlinkSessionJobController controller =
                 new FlinkSessionJobController(
-                        configManager, client, validators, reconciler, 
observer);
+                        configManager,
+                        client,
+                        validators,
+                        reconciler,
+                        observer,
+                        new MetricManager<>(metricGroup));
 
         FlinkControllerConfig<FlinkSessionJob> controllerConfig =
                 new FlinkControllerConfig<>(
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
index 7152966..8138d97 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
@@ -22,6 +22,7 @@ import 
org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
 import 
org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus;
 import 
org.apache.flink.kubernetes.operator.exception.DeploymentFailedException;
 import org.apache.flink.kubernetes.operator.exception.ReconciliationException;
+import org.apache.flink.kubernetes.operator.metrics.MetricManager;
 import 
org.apache.flink.kubernetes.operator.observer.deployment.ObserverFactory;
 import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
 import 
org.apache.flink.kubernetes.operator.reconciler.deployment.ReconcilerFactory;
@@ -63,7 +64,7 @@ public class FlinkDeploymentController
     private final Set<FlinkResourceValidator> validators;
     private final ReconcilerFactory reconcilerFactory;
     private final ObserverFactory observerFactory;
-
+    private final MetricManager<FlinkDeployment> metricManager;
     private FlinkControllerConfig<FlinkDeployment> controllerConfig;
 
     public FlinkDeploymentController(
@@ -71,12 +72,14 @@ public class FlinkDeploymentController
             KubernetesClient kubernetesClient,
             Set<FlinkResourceValidator> validators,
             ReconcilerFactory reconcilerFactory,
-            ObserverFactory observerFactory) {
+            ObserverFactory observerFactory,
+            MetricManager<FlinkDeployment> metricManager) {
         this.configManager = configManager;
         this.kubernetesClient = kubernetesClient;
         this.validators = validators;
         this.reconcilerFactory = reconcilerFactory;
         this.observerFactory = observerFactory;
+        this.metricManager = metricManager;
     }
 
     @Override
@@ -87,6 +90,7 @@ public class FlinkDeploymentController
         } catch (DeploymentFailedException dfe) {
             // ignore during cleanup
         }
+        metricManager.onRemove(flinkApp);
         return reconcilerFactory.getOrCreate(flinkApp).cleanup(flinkApp, 
context);
     }
 
@@ -100,6 +104,7 @@ public class FlinkDeploymentController
             if (validationError.isPresent()) {
                 LOG.error("Validation failed: " + validationError.get());
                 ReconciliationUtils.updateForReconciliationError(flinkApp, 
validationError.get());
+                metricManager.onUpdate(flinkApp);
                 return ReconciliationUtils.toUpdateControl(
                         configManager.getOperatorConfiguration(), 
originalCopy, flinkApp, false);
             }
@@ -111,6 +116,7 @@ public class FlinkDeploymentController
         }
 
         LOG.info("End of reconciliation");
+        metricManager.onUpdate(flinkApp);
         return ReconciliationUtils.toUpdateControl(
                 configManager.getOperatorConfiguration(), originalCopy, 
flinkApp, true);
     }
@@ -153,6 +159,7 @@ public class FlinkDeploymentController
         ReconciliationUtils.updateForReconciliationError(
                 flinkApp,
                 (e instanceof ReconciliationException) ? 
e.getCause().toString() : e.toString());
+        metricManager.onUpdate(flinkApp);
         return Optional.of(flinkApp);
     }
 
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobController.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobController.java
index f7bc660..fd09fff 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobController.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobController.java
@@ -21,6 +21,7 @@ import 
org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob;
 import org.apache.flink.kubernetes.operator.exception.ReconciliationException;
+import org.apache.flink.kubernetes.operator.metrics.MetricManager;
 import org.apache.flink.kubernetes.operator.observer.Observer;
 import org.apache.flink.kubernetes.operator.reconciler.Reconciler;
 import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
@@ -73,6 +74,7 @@ public class FlinkSessionJobController
     private final Set<FlinkResourceValidator> validators;
     private final Reconciler<FlinkSessionJob> reconciler;
     private final Observer<FlinkSessionJob> observer;
+    private final MetricManager<FlinkSessionJob> metricManager;
     private Map<String, SharedIndexInformer<FlinkSessionJob>> informers;
     private FlinkControllerConfig<FlinkSessionJob> controllerConfig;
 
@@ -81,12 +83,14 @@ public class FlinkSessionJobController
             KubernetesClient kubernetesClient,
             Set<FlinkResourceValidator> validators,
             Reconciler<FlinkSessionJob> reconciler,
-            Observer<FlinkSessionJob> observer) {
+            Observer<FlinkSessionJob> observer,
+            MetricManager<FlinkSessionJob> metricManager) {
         this.configManager = configManager;
         this.kubernetesClient = kubernetesClient;
         this.validators = validators;
         this.reconciler = reconciler;
         this.observer = observer;
+        this.metricManager = metricManager;
     }
 
     public void init(FlinkControllerConfig<FlinkSessionJob> config) {
@@ -105,6 +109,7 @@ public class FlinkSessionJobController
             LOG.error("Validation failed: " + validationError.get());
             ReconciliationUtils.updateForReconciliationError(
                     flinkSessionJob, validationError.get());
+            metricManager.onUpdate(flinkSessionJob);
             return ReconciliationUtils.toUpdateControl(originalCopy, 
flinkSessionJob);
         }
 
@@ -114,7 +119,7 @@ public class FlinkSessionJobController
         } catch (Exception e) {
             throw new ReconciliationException(e);
         }
-
+        metricManager.onUpdate(flinkSessionJob);
         return ReconciliationUtils.toUpdateControl(originalCopy, 
flinkSessionJob)
                 .rescheduleAfter(
                         
configManager.getOperatorConfiguration().getReconcileInterval().toMillis());
@@ -123,7 +128,7 @@ public class FlinkSessionJobController
     @Override
     public DeleteControl cleanup(FlinkSessionJob sessionJob, Context context) {
         LOG.info("Deleting FlinkSessionJob");
-
+        metricManager.onRemove(sessionJob);
         return reconciler.cleanup(sessionJob, context);
     }
 
@@ -138,6 +143,7 @@ public class FlinkSessionJobController
         ReconciliationUtils.updateForReconciliationError(
                 flinkSessionJob,
                 (e instanceof ReconciliationException) ? 
e.getCause().toString() : e.toString());
+        metricManager.onUpdate(flinkSessionJob);
         return Optional.of(flinkSessionJob);
     }
 
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/CustomResourceMetrics.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/CustomResourceMetrics.java
new file mode 100644
index 0000000..512db41
--- /dev/null
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/CustomResourceMetrics.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.metrics;
+
+import io.fabric8.kubernetes.client.CustomResource;
+
+/** Custom resource metric type. */
+public interface CustomResourceMetrics<CR extends CustomResource> {
+    void onUpdate(CR customResource);
+
+    void onRemove(CR customResource);
+}
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/FlinkDeploymentMetrics.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/FlinkDeploymentMetrics.java
new file mode 100644
index 0000000..3ccc63c
--- /dev/null
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/FlinkDeploymentMetrics.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.metrics;
+
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import 
org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus;
+import org.apache.flink.metrics.MetricGroup;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+/** FlinkDeployment metrics. */
+public class FlinkDeploymentMetrics implements 
CustomResourceMetrics<FlinkDeployment> {
+
+    private final Map<JobManagerDeploymentStatus, Set<String>> statuses = new 
HashMap<>();
+    public static final String METRIC_GROUP_NAME = "FlinkDeployment";
+
+    public FlinkDeploymentMetrics(MetricGroup parentMetricGroup) {
+        MetricGroup flinkDeploymentMetrics = 
parentMetricGroup.addGroup(METRIC_GROUP_NAME);
+        for (JobManagerDeploymentStatus status : 
JobManagerDeploymentStatus.values()) {
+            statuses.put(status, ConcurrentHashMap.newKeySet());
+        }
+        for (JobManagerDeploymentStatus status : 
JobManagerDeploymentStatus.values()) {
+            statuses.put(status, new HashSet<>());
+            MetricGroup metricGroup = 
flinkDeploymentMetrics.addGroup(status.toString());
+            metricGroup.gauge("Count", () -> statuses.get(status).size());
+        }
+        flinkDeploymentMetrics.gauge(
+                "Count", () -> 
statuses.values().stream().mapToInt(Set::size).sum());
+    }
+
+    public void onUpdate(FlinkDeployment flinkApp) {
+        onRemove(flinkApp);
+        statuses.get(flinkApp.getStatus().getJobManagerDeploymentStatus())
+                .add(flinkApp.getMetadata().getName());
+    }
+
+    public void onRemove(FlinkDeployment flinkApp) {
+        statuses.values()
+                .forEach(
+                        deployments -> {
+                            
deployments.remove(flinkApp.getMetadata().getName());
+                        });
+    }
+}
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/FlinkSessionJobMetrics.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/FlinkSessionJobMetrics.java
new file mode 100644
index 0000000..492ed0d
--- /dev/null
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/FlinkSessionJobMetrics.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.metrics;
+
+import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob;
+import org.apache.flink.metrics.MetricGroup;
+
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+/** FlinkSessionJob metrics. */
+public class FlinkSessionJobMetrics implements 
CustomResourceMetrics<FlinkSessionJob> {
+
+    private final Set<String> sessionJobs = ConcurrentHashMap.newKeySet();
+    public static final String METRIC_GROUP_NAME = "FlinkSessionJob";
+
+    public FlinkSessionJobMetrics(MetricGroup parentMetricGroup) {
+        MetricGroup flinkSessionJobMetrics = 
parentMetricGroup.addGroup(METRIC_GROUP_NAME);
+        flinkSessionJobMetrics.gauge("Count", () -> sessionJobs.size());
+    }
+
+    public void onUpdate(FlinkSessionJob sessionJob) {
+        sessionJobs.add(sessionJob.getMetadata().getName());
+    }
+
+    public void onRemove(FlinkSessionJob sessionJob) {
+        sessionJobs.remove(sessionJob.getMetadata().getName());
+    }
+}
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/MetricManager.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/MetricManager.java
new file mode 100644
index 0000000..6dbfb3b
--- /dev/null
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/MetricManager.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.metrics;
+
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob;
+import org.apache.flink.metrics.MetricGroup;
+
+import io.fabric8.kubernetes.client.CustomResource;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/** Metric manager for Operator managed custom resources. */
+public class MetricManager<CR extends CustomResource<?, ?>> {
+    private static final String NS_SCOPE_KEY = "resourcens";
+    private final MetricGroup metricGroup;
+    private final Map<String, CustomResourceMetrics> metrics = new 
ConcurrentHashMap<>();
+
+    public MetricManager(MetricGroup metricGroup) {
+        this.metricGroup = metricGroup;
+    }
+
+    public void onUpdate(CR cr) {
+        getCustomResourceMetrics(cr).onUpdate(cr);
+    }
+
+    public void onRemove(CR cr) {
+        getCustomResourceMetrics(cr).onRemove(cr);
+    }
+
+    private CustomResourceMetrics getCustomResourceMetrics(CR cr) {
+        return metrics.computeIfAbsent(
+                cr.getMetadata().getNamespace(), k -> 
getCustomResourceMetricsImpl(cr));
+    }
+
+    private CustomResourceMetrics getCustomResourceMetricsImpl(CR cr) {
+        if (cr instanceof FlinkDeployment) {
+            return new FlinkDeploymentMetrics(
+                    metricGroup.addGroup(NS_SCOPE_KEY, 
cr.getMetadata().getNamespace()));
+        } else if (cr instanceof FlinkSessionJob) {
+            return new FlinkSessionJobMetrics(
+                    metricGroup.addGroup(NS_SCOPE_KEY, 
cr.getMetadata().getNamespace()));
+        } else {
+            throw new IllegalArgumentException("Unknown CustomResource");
+        }
+    }
+}
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/OperatorMetricUtils.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/OperatorMetricUtils.java
index 467776d..0e53caf 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/OperatorMetricUtils.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/OperatorMetricUtils.java
@@ -43,7 +43,7 @@ public class OperatorMetricUtils {
     private static final String OPERATOR_METRICS_PREFIX = 
"kubernetes.operator.metrics.";
     private static final String METRICS_PREFIX = "metrics.";
 
-    public static void initOperatorMetrics(Configuration defaultConfig) {
+    public static MetricGroup initOperatorMetrics(Configuration defaultConfig) 
{
         Configuration metricConfig = createMetricConfig(defaultConfig);
         LOG.info("Initializing operator metrics using conf: {}", metricConfig);
         PluginManager pluginManager = 
PluginUtils.createPluginManagerFromRootFolder(metricConfig);
@@ -58,6 +58,7 @@ public class OperatorMetricUtils {
                         EnvUtils.getOrDefault(EnvUtils.ENV_HOSTNAME, 
"localhost"));
         MetricGroup statusGroup = operatorMetricGroup.addGroup("Status");
         MetricUtils.instantiateStatusMetrics(statusGroup);
+        return operatorMetricGroup;
     }
 
     @VisibleForTesting
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
index 811e09f..991e3a7 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
@@ -38,9 +38,11 @@ import 
org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
 import org.apache.flink.kubernetes.operator.crd.status.FlinkSessionJobStatus;
 import 
org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus;
 import 
org.apache.flink.kubernetes.operator.exception.DeploymentFailedException;
+import org.apache.flink.kubernetes.operator.metrics.MetricManager;
 import 
org.apache.flink.kubernetes.operator.observer.deployment.ObserverFactory;
 import 
org.apache.flink.kubernetes.operator.reconciler.deployment.ReconcilerFactory;
 import org.apache.flink.kubernetes.operator.utils.ValidatorUtils;
+import org.apache.flink.metrics.testutils.MetricListener;
 
 import io.fabric8.kubernetes.api.model.Container;
 import io.fabric8.kubernetes.api.model.ContainerStatus;
@@ -350,7 +352,8 @@ public class TestUtils {
                         kubernetesClient,
                         ValidatorUtils.discoverValidators(configManager),
                         new ReconcilerFactory(kubernetesClient, flinkService, 
configManager),
-                        new ObserverFactory(flinkService, configManager));
+                        new ObserverFactory(flinkService, configManager),
+                        new MetricManager<>(new 
MetricListener().getMetricGroup()));
         controller.setControllerConfig(
                 new FlinkControllerConfig(controller, Collections.emptySet()));
         return controller;
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/FlinkDeploymentMetricsTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/FlinkDeploymentMetricsTest.java
new file mode 100644
index 0000000..db6e433
--- /dev/null
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/FlinkDeploymentMetricsTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.metrics;
+
+import org.apache.flink.kubernetes.operator.TestUtils;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import 
org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus;
+import org.apache.flink.metrics.testutils.MetricListener;
+
+import org.junit.jupiter.api.Test;
+
+import static 
org.apache.flink.kubernetes.operator.metrics.FlinkDeploymentMetrics.METRIC_GROUP_NAME;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/** @link FlinkDeploymentMetrics tests. */
+public class FlinkDeploymentMetricsTest {
+
+    @Test
+    public void testMetrics() {
+        MetricListener metricListener = new MetricListener();
+        FlinkDeploymentMetrics metrics =
+                new FlinkDeploymentMetrics(metricListener.getMetricGroup());
+
+        assertTrue(metricListener.getGauge(METRIC_GROUP_NAME, 
"Count").isPresent());
+        for (JobManagerDeploymentStatus status : 
JobManagerDeploymentStatus.values()) {
+            assertTrue(
+                    metricListener
+                            .getGauge(METRIC_GROUP_NAME, status.toString(), 
"Count")
+                            .isPresent());
+        }
+
+        assertEquals(0, metricListener.getGauge(METRIC_GROUP_NAME, 
"Count").get().getValue());
+        for (JobManagerDeploymentStatus status : 
JobManagerDeploymentStatus.values()) {
+            assertEquals(
+                    0,
+                    metricListener
+                            .getGauge(METRIC_GROUP_NAME, status.toString(), 
"Count")
+                            .get()
+                            .getValue());
+        }
+
+        FlinkDeployment flinkDeployment = TestUtils.buildApplicationCluster();
+
+        for (JobManagerDeploymentStatus status : 
JobManagerDeploymentStatus.values()) {
+            flinkDeployment.getStatus().setJobManagerDeploymentStatus(status);
+            metrics.onUpdate(flinkDeployment);
+            assertEquals(
+                    1,
+                    metricListener
+                            .getGauge(METRIC_GROUP_NAME, status.toString(), 
"Count")
+                            .get()
+                            .getValue());
+            assertEquals(1, metricListener.getGauge(METRIC_GROUP_NAME, 
"Count").get().getValue());
+        }
+
+        metrics.onRemove(flinkDeployment);
+        assertEquals(0, metricListener.getGauge(METRIC_GROUP_NAME, 
"Count").get().getValue());
+        for (JobManagerDeploymentStatus status : 
JobManagerDeploymentStatus.values()) {
+            assertEquals(
+                    0,
+                    metricListener
+                            .getGauge(METRIC_GROUP_NAME, status.toString(), 
"Count")
+                            .get()
+                            .getValue());
+        }
+    }
+}
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/FlinkSessionJobMetricsTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/FlinkSessionJobMetricsTest.java
new file mode 100644
index 0000000..21f9a6b
--- /dev/null
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/FlinkSessionJobMetricsTest.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.metrics;
+
+import org.apache.flink.kubernetes.operator.TestUtils;
+import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob;
+import org.apache.flink.metrics.testutils.MetricListener;
+
+import org.junit.jupiter.api.Test;
+
+import static 
org.apache.flink.kubernetes.operator.metrics.FlinkSessionJobMetrics.METRIC_GROUP_NAME;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/** @link FlinkSessionJobMetrics tests. */
+public class FlinkSessionJobMetricsTest {
+
+    @Test
+    public void testMetrics() {
+        MetricListener metricListener = new MetricListener();
+        FlinkSessionJobMetrics metrics =
+                new FlinkSessionJobMetrics(metricListener.getMetricGroup());
+        assertTrue(metricListener.getGauge(METRIC_GROUP_NAME, 
"Count").isPresent());
+        assertEquals(0, metricListener.getGauge(METRIC_GROUP_NAME, 
"Count").get().getValue());
+        FlinkSessionJob flinkSessionJob = TestUtils.buildSessionJob();
+        metrics.onUpdate(flinkSessionJob);
+        assertEquals(1, metricListener.getGauge(METRIC_GROUP_NAME, 
"Count").get().getValue());
+        metrics.onRemove(flinkSessionJob);
+        assertEquals(0, metricListener.getGauge(METRIC_GROUP_NAME, 
"Count").get().getValue());
+    }
+}

Reply via email to