This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new 4d19d8b  [FLINK-28472] Add JmStatus as a group for JobManagerDeploymentStatus count metrics
4d19d8b is described below

commit 4d19d8bfcd5ae92e0314cef63efce73d082db969
Author: Matyas Orhidi <[email protected]>
AuthorDate: Mon Jul 11 14:32:02 2022 +0200

    [FLINK-28472] Add JmStatus as a group for JobManagerDeploymentStatus count metrics
---
 docs/content/docs/operations/metrics-logging.md    |  10 +-
 .../operator/metrics/FlinkDeploymentMetrics.java   |  16 ++-
 .../metrics/FlinkDeploymentMetricsTest.java        | 111 +++++++++++++--------
 .../operator/utils/StatusRecorderTest.java         |  73 --------------
 4 files changed, 86 insertions(+), 124 deletions(-)

diff --git a/docs/content/docs/operations/metrics-logging.md b/docs/content/docs/operations/metrics-logging.md
index 13aa01d..f6a7a9a 100644
--- a/docs/content/docs/operations/metrics-logging.md
+++ b/docs/content/docs/operations/metrics-logging.md
@@ -31,11 +31,11 @@ The Flink Kubernetes Operator (Operator) extends the [Flink Metric System](https
 ## Deployment Metrics
 The Operator gathers aggregates metrics about managed resources.
 
-| Scope     | Metrics                        | Description                     
                                                                                
                                            | Type  |
-|-----------|--------------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------|-------|
-| Namespace | FlinkDeployment.Count          | Number of managed 
FlinkDeployment instances per namespace                                         
                                                          | Gauge |
-| Namespace | FlinkDeployment.<Status>.Count | Number of managed 
FlinkDeployment resources per <Status> per namespace. <Status> can take values 
from: READY, DEPLOYED_NOT_READY, DEPLOYING, MISSING, ERROR | Gauge |
-| Namespace | FlinkSessionJob.Count          | Number of managed 
FlinkSessionJob instances per namespace                                         
                                                          | Gauge |
+| Scope     | Metrics                                           | Description  
                                                                                
                                                               | Type  |
+|-----------|---------------------------------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------|-------|
+| Namespace | FlinkDeployment.Count                             | Number of 
managed FlinkDeployment instances per namespace                                 
                                                                  | Gauge |
+| Namespace | FlinkDeployment.JmDeploymentStatus.<Status>.Count | Number of 
managed FlinkDeployment resources per <Status> per namespace. <Status> can take 
values from: READY, DEPLOYED_NOT_READY, DEPLOYING, MISSING, ERROR | Gauge |
+| Namespace | FlinkSessionJob.Count                             | Number of 
managed FlinkSessionJob instances per namespace                                 
                                                                  | Gauge |
 
 ## System Metrics
 The Operator gathers metrics about the JVM process and exposes it similarly to core Flink [System metrics](https://nightlies.apache.org/flink/flink-docs-master/docs/ops/metrics/#system-metrics). The list of metrics are not repeated in this document.
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/FlinkDeploymentMetrics.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/FlinkDeploymentMetrics.java
index 3ccc63c..8e40175 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/FlinkDeploymentMetrics.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/FlinkDeploymentMetrics.java
@@ -31,20 +31,26 @@ import java.util.concurrent.ConcurrentHashMap;
 public class FlinkDeploymentMetrics implements CustomResourceMetrics<FlinkDeployment> {
 
     private final Map<JobManagerDeploymentStatus, Set<String>> statuses = new HashMap<>();
-    public static final String METRIC_GROUP_NAME = "FlinkDeployment";
+    public static final String FLINK_DEPLOYMENT_GROUP_NAME = "FlinkDeployment";
+    public static final String JM_DEPLOYMENT_STATUS_GROUP_NAME = "JmDeploymentStatus";
+    public static final String COUNTER_NAME = "Count";
 
     public FlinkDeploymentMetrics(MetricGroup parentMetricGroup) {
-        MetricGroup flinkDeploymentMetrics = parentMetricGroup.addGroup(METRIC_GROUP_NAME);
+        MetricGroup flinkDeploymentMetrics =
+                parentMetricGroup.addGroup(FLINK_DEPLOYMENT_GROUP_NAME);
         for (JobManagerDeploymentStatus status : JobManagerDeploymentStatus.values()) {
             statuses.put(status, ConcurrentHashMap.newKeySet());
         }
         for (JobManagerDeploymentStatus status : JobManagerDeploymentStatus.values()) {
             statuses.put(status, new HashSet<>());
-            MetricGroup metricGroup = flinkDeploymentMetrics.addGroup(status.toString());
-            metricGroup.gauge("Count", () -> statuses.get(status).size());
+            MetricGroup metricGroup =
+                    flinkDeploymentMetrics
+                            .addGroup(JM_DEPLOYMENT_STATUS_GROUP_NAME)
+                            .addGroup(status.toString());
+            metricGroup.gauge(COUNTER_NAME, () -> statuses.get(status).size());
         }
         flinkDeploymentMetrics.gauge(
-                "Count", () -> statuses.values().stream().mapToInt(Set::size).sum());
+                COUNTER_NAME, () -> statuses.values().stream().mapToInt(Set::size).sum());
     }
 
     public void onUpdate(FlinkDeployment flinkApp) {
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/FlinkDeploymentMetricsTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/FlinkDeploymentMetricsTest.java
index db6e433..70a5523 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/FlinkDeploymentMetricsTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/FlinkDeploymentMetricsTest.java
@@ -17,67 +17,96 @@
 
 package org.apache.flink.kubernetes.operator.metrics;
 
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.operator.TestUtils;
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
 import 
org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus;
-import org.apache.flink.metrics.testutils.MetricListener;
+import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.runtime.metrics.util.TestingMetricRegistry;
 
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
 import org.junit.jupiter.api.Test;
 
-import static 
org.apache.flink.kubernetes.operator.metrics.FlinkDeploymentMetrics.METRIC_GROUP_NAME;
+import java.util.HashMap;
+
+import static 
org.apache.flink.kubernetes.operator.metrics.FlinkDeploymentMetrics.COUNTER_NAME;
+import static 
org.apache.flink.kubernetes.operator.metrics.FlinkDeploymentMetrics.FLINK_DEPLOYMENT_GROUP_NAME;
+import static 
org.apache.flink.kubernetes.operator.metrics.FlinkDeploymentMetrics.JM_DEPLOYMENT_STATUS_GROUP_NAME;
 import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /** @link FlinkDeploymentMetrics tests. */
+@EnableKubernetesMockClient(crud = true)
 public class FlinkDeploymentMetricsTest {
+    private KubernetesClient kubernetesClient;
 
     @Test
-    public void testMetrics() {
-        MetricListener metricListener = new MetricListener();
-        FlinkDeploymentMetrics metrics =
-                new FlinkDeploymentMetrics(metricListener.getMetricGroup());
+    public void testFlinkDeploymentMetrics() throws InterruptedException {
+        var metrics = new HashMap<String, Metric>();
+        TestingMetricRegistry registry =
+                TestingMetricRegistry.builder()
+                        .setDelimiter(".".charAt(0))
+                        .setRegisterConsumer(
+                                (metric, name, group) -> {
+                                    
metrics.put(group.getMetricIdentifier(name), metric);
+                                })
+                        .build();
 
-        assertTrue(metricListener.getGauge(METRIC_GROUP_NAME, 
"Count").isPresent());
-        for (JobManagerDeploymentStatus status : 
JobManagerDeploymentStatus.values()) {
-            assertTrue(
-                    metricListener
-                            .getGauge(METRIC_GROUP_NAME, status.toString(), 
"Count")
-                            .isPresent());
-        }
+        var metricManager =
+                (MetricManager) TestUtils.createTestMetricManager(registry, 
new Configuration());
+        var helper =
+                new StatusRecorder<FlinkDeploymentStatus>(
+                        kubernetesClient, metricManager, (e, s) -> {});
 
-        assertEquals(0, metricListener.getGauge(METRIC_GROUP_NAME, 
"Count").get().getValue());
-        for (JobManagerDeploymentStatus status : 
JobManagerDeploymentStatus.values()) {
-            assertEquals(
-                    0,
-                    metricListener
-                            .getGauge(METRIC_GROUP_NAME, status.toString(), 
"Count")
-                            .get()
-                            .getValue());
-        }
+        var deployment = TestUtils.buildApplicationCluster();
+        kubernetesClient.resource(deployment).createOrReplace();
 
-        FlinkDeployment flinkDeployment = TestUtils.buildApplicationCluster();
+        helper.updateStatusFromCache(deployment);
+        assertEquals(1, ((Gauge) 
metrics.get(totalIdentifier(deployment))).getValue());
+        assertEquals(1, ((Gauge) 
metrics.get(perStatusIdentifier(deployment))).getValue());
 
         for (JobManagerDeploymentStatus status : 
JobManagerDeploymentStatus.values()) {
-            flinkDeployment.getStatus().setJobManagerDeploymentStatus(status);
-            metrics.onUpdate(flinkDeployment);
-            assertEquals(
-                    1,
-                    metricListener
-                            .getGauge(METRIC_GROUP_NAME, status.toString(), 
"Count")
-                            .get()
-                            .getValue());
-            assertEquals(1, metricListener.getGauge(METRIC_GROUP_NAME, 
"Count").get().getValue());
+            deployment.getStatus().setJobManagerDeploymentStatus(status);
+            helper.patchAndCacheStatus(deployment);
+            assertEquals(1, ((Gauge) 
metrics.get(totalIdentifier(deployment))).getValue());
+            assertEquals(1, ((Gauge) 
metrics.get(perStatusIdentifier(deployment))).getValue());
         }
 
-        metrics.onRemove(flinkDeployment);
-        assertEquals(0, metricListener.getGauge(METRIC_GROUP_NAME, 
"Count").get().getValue());
+        helper.removeCachedStatus(deployment);
+        assertEquals(0, ((Gauge) 
metrics.get(totalIdentifier(deployment))).getValue());
         for (JobManagerDeploymentStatus status : 
JobManagerDeploymentStatus.values()) {
-            assertEquals(
-                    0,
-                    metricListener
-                            .getGauge(METRIC_GROUP_NAME, status.toString(), 
"Count")
-                            .get()
-                            .getValue());
+            assertEquals(0, ((Gauge) 
metrics.get(perStatusIdentifier(deployment))).getValue());
         }
     }
+
+    private String totalIdentifier(FlinkDeployment deployment) {
+        String baseScope = 
"testhost.k8soperator.flink-operator-test.testopname.";
+        String[] metricScope =
+                new String[] {
+                    "namespace",
+                    deployment.getMetadata().getNamespace(),
+                    FLINK_DEPLOYMENT_GROUP_NAME,
+                    COUNTER_NAME
+                };
+        return baseScope + String.join(".", metricScope);
+    }
+
+    private String perStatusIdentifier(FlinkDeployment deployment) {
+
+        String baseScope = 
"testhost.k8soperator.flink-operator-test.testopname.";
+        String[] metricScope =
+                new String[] {
+                    "namespace",
+                    deployment.getMetadata().getNamespace(),
+                    FLINK_DEPLOYMENT_GROUP_NAME,
+                    JM_DEPLOYMENT_STATUS_GROUP_NAME,
+                    
deployment.getStatus().getJobManagerDeploymentStatus().name(),
+                    COUNTER_NAME
+                };
+
+        return baseScope + String.join(".", metricScope);
+    }
 }
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/StatusRecorderTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/StatusRecorderTest.java
index 52a2f45..e550f20 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/StatusRecorderTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/StatusRecorderTest.java
@@ -19,24 +19,14 @@ package org.apache.flink.kubernetes.operator.utils;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.operator.TestUtils;
-import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
-import 
org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus;
 import org.apache.flink.kubernetes.operator.crd.status.ReconciliationState;
-import org.apache.flink.kubernetes.operator.metrics.MetricManager;
-import org.apache.flink.metrics.Gauge;
-import org.apache.flink.metrics.Metric;
-import org.apache.flink.runtime.metrics.util.TestingMetricRegistry;
 
 import io.fabric8.kubernetes.client.KubernetesClient;
 import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
 import io.fabric8.kubernetes.client.server.mock.KubernetesMockServer;
 import org.junit.jupiter.api.Test;
 
-import java.util.HashMap;
-
-import static 
org.apache.flink.kubernetes.operator.metrics.FlinkDeploymentMetrics.METRIC_GROUP_NAME;
-import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /** Test for {@link StatusRecorder}. */
@@ -71,67 +61,4 @@ public class StatusRecorderTest {
         helper.patchAndCacheStatus(deployment);
         assertTrue(mockServer.getLastRequest() == lastRequest);
     }
-
-    @Test
-    public void testFlinkDeploymentMetrics() throws InterruptedException {
-        var metrics = new HashMap<String, Metric>();
-        TestingMetricRegistry registry =
-                TestingMetricRegistry.builder()
-                        .setDelimiter(".".charAt(0))
-                        .setRegisterConsumer(
-                                (metric, name, group) -> {
-                                    
metrics.put(group.getMetricIdentifier(name), metric);
-                                })
-                        .build();
-
-        var metricManager =
-                (MetricManager) TestUtils.createTestMetricManager(registry, 
new Configuration());
-        var helper =
-                new StatusRecorder<FlinkDeploymentStatus>(
-                        kubernetesClient, metricManager, (e, s) -> {});
-
-        var deployment = TestUtils.buildApplicationCluster();
-        kubernetesClient.resource(deployment).createOrReplace();
-
-        helper.updateStatusFromCache(deployment);
-        assertEquals(1, ((Gauge) 
metrics.get(totalIdentifier(deployment))).getValue());
-        assertEquals(1, ((Gauge) 
metrics.get(perStatusIdentifier(deployment))).getValue());
-
-        for (JobManagerDeploymentStatus status : 
JobManagerDeploymentStatus.values()) {
-            deployment.getStatus().setJobManagerDeploymentStatus(status);
-            helper.patchAndCacheStatus(deployment);
-            assertEquals(1, ((Gauge) 
metrics.get(totalIdentifier(deployment))).getValue());
-            assertEquals(1, ((Gauge) 
metrics.get(perStatusIdentifier(deployment))).getValue());
-        }
-
-        helper.removeCachedStatus(deployment);
-        assertEquals(0, ((Gauge) 
metrics.get(totalIdentifier(deployment))).getValue());
-        for (JobManagerDeploymentStatus status : 
JobManagerDeploymentStatus.values()) {
-            assertEquals(0, ((Gauge) 
metrics.get(perStatusIdentifier(deployment))).getValue());
-        }
-    }
-
-    private String totalIdentifier(FlinkDeployment deployment) {
-        String baseScope = 
"testhost.k8soperator.flink-operator-test.testopname.";
-        String[] metricScope =
-                new String[] {
-                    "namespace", deployment.getMetadata().getNamespace(), 
METRIC_GROUP_NAME, "Count"
-                };
-        return baseScope + String.join(".", metricScope);
-    }
-
-    private String perStatusIdentifier(FlinkDeployment deployment) {
-
-        String baseScope = 
"testhost.k8soperator.flink-operator-test.testopname.";
-        String[] metricScope =
-                new String[] {
-                    "namespace",
-                    deployment.getMetadata().getNamespace(),
-                    METRIC_GROUP_NAME,
-                    
deployment.getStatus().getJobManagerDeploymentStatus().name(),
-                    "Count"
-                };
-
-        return baseScope + String.join(".", metricScope);
-    }
 }

Reply via email to