This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new 4d19d8b [FLINK-28472] Add JmStatus as a group for
JobManagerDeploymentStatus count metrics
4d19d8b is described below
commit 4d19d8bfcd5ae92e0314cef63efce73d082db969
Author: Matyas Orhidi <[email protected]>
AuthorDate: Mon Jul 11 14:32:02 2022 +0200
[FLINK-28472] Add JmStatus as a group for JobManagerDeploymentStatus count
metrics
---
docs/content/docs/operations/metrics-logging.md | 10 +-
.../operator/metrics/FlinkDeploymentMetrics.java | 16 ++-
.../metrics/FlinkDeploymentMetricsTest.java | 111 +++++++++++++--------
.../operator/utils/StatusRecorderTest.java | 73 --------------
4 files changed, 86 insertions(+), 124 deletions(-)
diff --git a/docs/content/docs/operations/metrics-logging.md
b/docs/content/docs/operations/metrics-logging.md
index 13aa01d..f6a7a9a 100644
--- a/docs/content/docs/operations/metrics-logging.md
+++ b/docs/content/docs/operations/metrics-logging.md
@@ -31,11 +31,11 @@ The Flink Kubernetes Operator (Operator) extends the [Flink
Metric System](https
## Deployment Metrics
The Operator gathers aggregate metrics about managed resources.
-| Scope | Metrics | Description
| Type |
-|-----------|--------------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------|-------|
-| Namespace | FlinkDeployment.Count | Number of managed
FlinkDeployment instances per namespace
| Gauge |
-| Namespace | FlinkDeployment.<Status>.Count | Number of managed
FlinkDeployment resources per <Status> per namespace. <Status> can take values
from: READY, DEPLOYED_NOT_READY, DEPLOYING, MISSING, ERROR | Gauge |
-| Namespace | FlinkSessionJob.Count | Number of managed
FlinkSessionJob instances per namespace
| Gauge |
+| Scope | Metrics | Description
| Type |
+|-----------|---------------------------------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------|-------|
+| Namespace | FlinkDeployment.Count | Number of
managed FlinkDeployment instances per namespace
| Gauge |
+| Namespace | FlinkDeployment.JmDeploymentStatus.<Status>.Count | Number of
managed FlinkDeployment resources per <Status> per namespace. <Status> can take
values from: READY, DEPLOYED_NOT_READY, DEPLOYING, MISSING, ERROR | Gauge |
+| Namespace | FlinkSessionJob.Count | Number of
managed FlinkSessionJob instances per namespace
| Gauge |
## System Metrics
The Operator gathers metrics about the JVM process and exposes them similarly to
core Flink [System
metrics](https://nightlies.apache.org/flink/flink-docs-master/docs/ops/metrics/#system-metrics).
The list of metrics is not repeated in this document.
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/FlinkDeploymentMetrics.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/FlinkDeploymentMetrics.java
index 3ccc63c..8e40175 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/FlinkDeploymentMetrics.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/FlinkDeploymentMetrics.java
@@ -31,20 +31,26 @@ import java.util.concurrent.ConcurrentHashMap;
public class FlinkDeploymentMetrics implements
CustomResourceMetrics<FlinkDeployment> {
private final Map<JobManagerDeploymentStatus, Set<String>> statuses = new
HashMap<>();
- public static final String METRIC_GROUP_NAME = "FlinkDeployment";
+ public static final String FLINK_DEPLOYMENT_GROUP_NAME = "FlinkDeployment";
+ public static final String JM_DEPLOYMENT_STATUS_GROUP_NAME =
"JmDeploymentStatus";
+ public static final String COUNTER_NAME = "Count";
public FlinkDeploymentMetrics(MetricGroup parentMetricGroup) {
- MetricGroup flinkDeploymentMetrics =
parentMetricGroup.addGroup(METRIC_GROUP_NAME);
+ MetricGroup flinkDeploymentMetrics =
+ parentMetricGroup.addGroup(FLINK_DEPLOYMENT_GROUP_NAME);
for (JobManagerDeploymentStatus status :
JobManagerDeploymentStatus.values()) {
statuses.put(status, ConcurrentHashMap.newKeySet());
}
for (JobManagerDeploymentStatus status :
JobManagerDeploymentStatus.values()) {
statuses.put(status, new HashSet<>());
- MetricGroup metricGroup =
flinkDeploymentMetrics.addGroup(status.toString());
- metricGroup.gauge("Count", () -> statuses.get(status).size());
+ MetricGroup metricGroup =
+ flinkDeploymentMetrics
+ .addGroup(JM_DEPLOYMENT_STATUS_GROUP_NAME)
+ .addGroup(status.toString());
+ metricGroup.gauge(COUNTER_NAME, () -> statuses.get(status).size());
}
flinkDeploymentMetrics.gauge(
- "Count", () ->
statuses.values().stream().mapToInt(Set::size).sum());
+ COUNTER_NAME, () ->
statuses.values().stream().mapToInt(Set::size).sum());
}
public void onUpdate(FlinkDeployment flinkApp) {
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/FlinkDeploymentMetricsTest.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/FlinkDeploymentMetricsTest.java
index db6e433..70a5523 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/FlinkDeploymentMetricsTest.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/FlinkDeploymentMetricsTest.java
@@ -17,67 +17,96 @@
package org.apache.flink.kubernetes.operator.metrics;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.TestUtils;
import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
import
org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus;
-import org.apache.flink.metrics.testutils.MetricListener;
+import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.runtime.metrics.util.TestingMetricRegistry;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
import org.junit.jupiter.api.Test;
-import static
org.apache.flink.kubernetes.operator.metrics.FlinkDeploymentMetrics.METRIC_GROUP_NAME;
+import java.util.HashMap;
+
+import static
org.apache.flink.kubernetes.operator.metrics.FlinkDeploymentMetrics.COUNTER_NAME;
+import static
org.apache.flink.kubernetes.operator.metrics.FlinkDeploymentMetrics.FLINK_DEPLOYMENT_GROUP_NAME;
+import static
org.apache.flink.kubernetes.operator.metrics.FlinkDeploymentMetrics.JM_DEPLOYMENT_STATUS_GROUP_NAME;
import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertTrue;
/** {@link FlinkDeploymentMetrics} tests. */
+@EnableKubernetesMockClient(crud = true)
public class FlinkDeploymentMetricsTest {
+ private KubernetesClient kubernetesClient;
@Test
- public void testMetrics() {
- MetricListener metricListener = new MetricListener();
- FlinkDeploymentMetrics metrics =
- new FlinkDeploymentMetrics(metricListener.getMetricGroup());
+ public void testFlinkDeploymentMetrics() throws InterruptedException {
+ var metrics = new HashMap<String, Metric>();
+ TestingMetricRegistry registry =
+ TestingMetricRegistry.builder()
+ .setDelimiter(".".charAt(0))
+ .setRegisterConsumer(
+ (metric, name, group) -> {
+
metrics.put(group.getMetricIdentifier(name), metric);
+ })
+ .build();
- assertTrue(metricListener.getGauge(METRIC_GROUP_NAME,
"Count").isPresent());
- for (JobManagerDeploymentStatus status :
JobManagerDeploymentStatus.values()) {
- assertTrue(
- metricListener
- .getGauge(METRIC_GROUP_NAME, status.toString(),
"Count")
- .isPresent());
- }
+ var metricManager =
+ (MetricManager) TestUtils.createTestMetricManager(registry,
new Configuration());
+ var helper =
+ new StatusRecorder<FlinkDeploymentStatus>(
+ kubernetesClient, metricManager, (e, s) -> {});
- assertEquals(0, metricListener.getGauge(METRIC_GROUP_NAME,
"Count").get().getValue());
- for (JobManagerDeploymentStatus status :
JobManagerDeploymentStatus.values()) {
- assertEquals(
- 0,
- metricListener
- .getGauge(METRIC_GROUP_NAME, status.toString(),
"Count")
- .get()
- .getValue());
- }
+ var deployment = TestUtils.buildApplicationCluster();
+ kubernetesClient.resource(deployment).createOrReplace();
- FlinkDeployment flinkDeployment = TestUtils.buildApplicationCluster();
+ helper.updateStatusFromCache(deployment);
+ assertEquals(1, ((Gauge)
metrics.get(totalIdentifier(deployment))).getValue());
+ assertEquals(1, ((Gauge)
metrics.get(perStatusIdentifier(deployment))).getValue());
for (JobManagerDeploymentStatus status :
JobManagerDeploymentStatus.values()) {
- flinkDeployment.getStatus().setJobManagerDeploymentStatus(status);
- metrics.onUpdate(flinkDeployment);
- assertEquals(
- 1,
- metricListener
- .getGauge(METRIC_GROUP_NAME, status.toString(),
"Count")
- .get()
- .getValue());
- assertEquals(1, metricListener.getGauge(METRIC_GROUP_NAME,
"Count").get().getValue());
+ deployment.getStatus().setJobManagerDeploymentStatus(status);
+ helper.patchAndCacheStatus(deployment);
+ assertEquals(1, ((Gauge)
metrics.get(totalIdentifier(deployment))).getValue());
+ assertEquals(1, ((Gauge)
metrics.get(perStatusIdentifier(deployment))).getValue());
}
- metrics.onRemove(flinkDeployment);
- assertEquals(0, metricListener.getGauge(METRIC_GROUP_NAME,
"Count").get().getValue());
+ helper.removeCachedStatus(deployment);
+ assertEquals(0, ((Gauge)
metrics.get(totalIdentifier(deployment))).getValue());
for (JobManagerDeploymentStatus status :
JobManagerDeploymentStatus.values()) {
- assertEquals(
- 0,
- metricListener
- .getGauge(METRIC_GROUP_NAME, status.toString(),
"Count")
- .get()
- .getValue());
+ assertEquals(0, ((Gauge)
metrics.get(perStatusIdentifier(deployment))).getValue());
}
}
+
+ private String totalIdentifier(FlinkDeployment deployment) {
+ String baseScope =
"testhost.k8soperator.flink-operator-test.testopname.";
+ String[] metricScope =
+ new String[] {
+ "namespace",
+ deployment.getMetadata().getNamespace(),
+ FLINK_DEPLOYMENT_GROUP_NAME,
+ COUNTER_NAME
+ };
+ return baseScope + String.join(".", metricScope);
+ }
+
+ private String perStatusIdentifier(FlinkDeployment deployment) {
+
+ String baseScope =
"testhost.k8soperator.flink-operator-test.testopname.";
+ String[] metricScope =
+ new String[] {
+ "namespace",
+ deployment.getMetadata().getNamespace(),
+ FLINK_DEPLOYMENT_GROUP_NAME,
+ JM_DEPLOYMENT_STATUS_GROUP_NAME,
+
deployment.getStatus().getJobManagerDeploymentStatus().name(),
+ COUNTER_NAME
+ };
+
+ return baseScope + String.join(".", metricScope);
+ }
}
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/StatusRecorderTest.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/StatusRecorderTest.java
index 52a2f45..e550f20 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/StatusRecorderTest.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/StatusRecorderTest.java
@@ -19,24 +19,14 @@ package org.apache.flink.kubernetes.operator.utils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.TestUtils;
-import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
-import
org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus;
import org.apache.flink.kubernetes.operator.crd.status.ReconciliationState;
-import org.apache.flink.kubernetes.operator.metrics.MetricManager;
-import org.apache.flink.metrics.Gauge;
-import org.apache.flink.metrics.Metric;
-import org.apache.flink.runtime.metrics.util.TestingMetricRegistry;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
import io.fabric8.kubernetes.client.server.mock.KubernetesMockServer;
import org.junit.jupiter.api.Test;
-import java.util.HashMap;
-
-import static
org.apache.flink.kubernetes.operator.metrics.FlinkDeploymentMetrics.METRIC_GROUP_NAME;
-import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
/** Test for {@link StatusRecorder}. */
@@ -71,67 +61,4 @@ public class StatusRecorderTest {
helper.patchAndCacheStatus(deployment);
assertTrue(mockServer.getLastRequest() == lastRequest);
}
-
- @Test
- public void testFlinkDeploymentMetrics() throws InterruptedException {
- var metrics = new HashMap<String, Metric>();
- TestingMetricRegistry registry =
- TestingMetricRegistry.builder()
- .setDelimiter(".".charAt(0))
- .setRegisterConsumer(
- (metric, name, group) -> {
-
metrics.put(group.getMetricIdentifier(name), metric);
- })
- .build();
-
- var metricManager =
- (MetricManager) TestUtils.createTestMetricManager(registry,
new Configuration());
- var helper =
- new StatusRecorder<FlinkDeploymentStatus>(
- kubernetesClient, metricManager, (e, s) -> {});
-
- var deployment = TestUtils.buildApplicationCluster();
- kubernetesClient.resource(deployment).createOrReplace();
-
- helper.updateStatusFromCache(deployment);
- assertEquals(1, ((Gauge)
metrics.get(totalIdentifier(deployment))).getValue());
- assertEquals(1, ((Gauge)
metrics.get(perStatusIdentifier(deployment))).getValue());
-
- for (JobManagerDeploymentStatus status :
JobManagerDeploymentStatus.values()) {
- deployment.getStatus().setJobManagerDeploymentStatus(status);
- helper.patchAndCacheStatus(deployment);
- assertEquals(1, ((Gauge)
metrics.get(totalIdentifier(deployment))).getValue());
- assertEquals(1, ((Gauge)
metrics.get(perStatusIdentifier(deployment))).getValue());
- }
-
- helper.removeCachedStatus(deployment);
- assertEquals(0, ((Gauge)
metrics.get(totalIdentifier(deployment))).getValue());
- for (JobManagerDeploymentStatus status :
JobManagerDeploymentStatus.values()) {
- assertEquals(0, ((Gauge)
metrics.get(perStatusIdentifier(deployment))).getValue());
- }
- }
-
- private String totalIdentifier(FlinkDeployment deployment) {
- String baseScope =
"testhost.k8soperator.flink-operator-test.testopname.";
- String[] metricScope =
- new String[] {
- "namespace", deployment.getMetadata().getNamespace(),
METRIC_GROUP_NAME, "Count"
- };
- return baseScope + String.join(".", metricScope);
- }
-
- private String perStatusIdentifier(FlinkDeployment deployment) {
-
- String baseScope =
"testhost.k8soperator.flink-operator-test.testopname.";
- String[] metricScope =
- new String[] {
- "namespace",
- deployment.getMetadata().getNamespace(),
- METRIC_GROUP_NAME,
-
deployment.getStatus().getJobManagerDeploymentStatus().name(),
- "Count"
- };
-
- return baseScope + String.join(".", metricScope);
- }
}