This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new a28cb33 [FLINK-26953] Introduce Operator Specific Metrics
a28cb33 is described below
commit a28cb33617c7027b30722cd4b0881e0558aa7202
Author: Matyas Orhidi <[email protected]>
AuthorDate: Tue May 3 12:59:46 2022 +0200
[FLINK-26953] Introduce Operator Specific Metrics
---
flink-kubernetes-operator/pom.xml | 7 ++
.../flink/kubernetes/operator/FlinkOperator.java | 20 +++++-
.../controller/FlinkDeploymentController.java | 11 ++-
.../controller/FlinkSessionJobController.java | 12 +++-
.../operator/metrics/CustomResourceMetrics.java | 27 +++++++
.../operator/metrics/FlinkDeploymentMetrics.java | 63 ++++++++++++++++
.../operator/metrics/FlinkSessionJobMetrics.java | 44 ++++++++++++
.../kubernetes/operator/metrics/MetricManager.java | 63 ++++++++++++++++
.../operator/metrics/OperatorMetricUtils.java | 3 +-
.../flink/kubernetes/operator/TestUtils.java | 5 +-
.../metrics/FlinkDeploymentMetricsTest.java | 83 ++++++++++++++++++++++
.../metrics/FlinkSessionJobMetricsTest.java | 46 ++++++++++++
12 files changed, 374 insertions(+), 10 deletions(-)
diff --git a/flink-kubernetes-operator/pom.xml
b/flink-kubernetes-operator/pom.xml
index c7ccfba..8c886ba 100644
--- a/flink-kubernetes-operator/pom.xml
+++ b/flink-kubernetes-operator/pom.xml
@@ -135,6 +135,13 @@ under the License.
<version>${fabric8.version}</version>
<scope>test</scope>
</dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-test-utils_${scala.version}</artifactId>
+ <version>${flink.version}</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
index 6e3f535..03d5a03 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
@@ -29,6 +29,7 @@ import
org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController
import
org.apache.flink.kubernetes.operator.controller.FlinkSessionJobController;
import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob;
+import org.apache.flink.kubernetes.operator.metrics.MetricManager;
import org.apache.flink.kubernetes.operator.metrics.OperatorMetricUtils;
import org.apache.flink.kubernetes.operator.observer.Observer;
import
org.apache.flink.kubernetes.operator.observer.deployment.ObserverFactory;
@@ -40,6 +41,7 @@ import
org.apache.flink.kubernetes.operator.service.FlinkService;
import org.apache.flink.kubernetes.operator.utils.EnvUtils;
import org.apache.flink.kubernetes.operator.utils.ValidatorUtils;
import org.apache.flink.kubernetes.operator.validation.FlinkResourceValidator;
+import org.apache.flink.metrics.MetricGroup;
import io.fabric8.kubernetes.client.DefaultKubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClient;
@@ -66,6 +68,7 @@ public class FlinkOperator {
private final ConfigurationService configurationService;
private final FlinkConfigManager configManager;
private final Set<FlinkResourceValidator> validators;
+ private final MetricGroup metricGroup;
public FlinkOperator(@Nullable Configuration conf) {
this.client = new DefaultKubernetesClient();
@@ -75,7 +78,8 @@ public class FlinkOperator {
this.operator = new Operator(client, configurationService);
this.flinkService = new FlinkService(client, configManager);
this.validators = ValidatorUtils.discoverValidators(configManager);
-
OperatorMetricUtils.initOperatorMetrics(configManager.getDefaultConfig());
+ this.metricGroup =
+
OperatorMetricUtils.initOperatorMetrics(configManager.getDefaultConfig());
PluginManager pluginManager =
PluginUtils.createPluginManagerFromRootFolder(configManager.getDefaultConfig());
FileSystem.initialize(configManager.getDefaultConfig(), pluginManager);
@@ -88,7 +92,12 @@ public class FlinkOperator {
FlinkDeploymentController controller =
new FlinkDeploymentController(
- configManager, client, validators, reconcilerFactory,
observerFactory);
+ configManager,
+ client,
+ validators,
+ reconcilerFactory,
+ observerFactory,
+ new MetricManager<>(metricGroup));
FlinkControllerConfig<FlinkDeployment> controllerConfig =
new FlinkControllerConfig<>(
@@ -105,7 +114,12 @@ public class FlinkOperator {
Observer<FlinkSessionJob> observer = new
SessionJobObserver(flinkService, configManager);
FlinkSessionJobController controller =
new FlinkSessionJobController(
- configManager, client, validators, reconciler,
observer);
+ configManager,
+ client,
+ validators,
+ reconciler,
+ observer,
+ new MetricManager<>(metricGroup));
FlinkControllerConfig<FlinkSessionJob> controllerConfig =
new FlinkControllerConfig<>(
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
index 7152966..8138d97 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
@@ -22,6 +22,7 @@ import
org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
import
org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus;
import
org.apache.flink.kubernetes.operator.exception.DeploymentFailedException;
import org.apache.flink.kubernetes.operator.exception.ReconciliationException;
+import org.apache.flink.kubernetes.operator.metrics.MetricManager;
import
org.apache.flink.kubernetes.operator.observer.deployment.ObserverFactory;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import
org.apache.flink.kubernetes.operator.reconciler.deployment.ReconcilerFactory;
@@ -63,7 +64,7 @@ public class FlinkDeploymentController
private final Set<FlinkResourceValidator> validators;
private final ReconcilerFactory reconcilerFactory;
private final ObserverFactory observerFactory;
-
+ private final MetricManager<FlinkDeployment> metricManager;
private FlinkControllerConfig<FlinkDeployment> controllerConfig;
public FlinkDeploymentController(
@@ -71,12 +72,14 @@ public class FlinkDeploymentController
KubernetesClient kubernetesClient,
Set<FlinkResourceValidator> validators,
ReconcilerFactory reconcilerFactory,
- ObserverFactory observerFactory) {
+ ObserverFactory observerFactory,
+ MetricManager<FlinkDeployment> metricManager) {
this.configManager = configManager;
this.kubernetesClient = kubernetesClient;
this.validators = validators;
this.reconcilerFactory = reconcilerFactory;
this.observerFactory = observerFactory;
+ this.metricManager = metricManager;
}
@Override
@@ -87,6 +90,7 @@ public class FlinkDeploymentController
} catch (DeploymentFailedException dfe) {
// ignore during cleanup
}
+ metricManager.onRemove(flinkApp);
return reconcilerFactory.getOrCreate(flinkApp).cleanup(flinkApp,
context);
}
@@ -100,6 +104,7 @@ public class FlinkDeploymentController
if (validationError.isPresent()) {
LOG.error("Validation failed: " + validationError.get());
ReconciliationUtils.updateForReconciliationError(flinkApp,
validationError.get());
+ metricManager.onUpdate(flinkApp);
return ReconciliationUtils.toUpdateControl(
configManager.getOperatorConfiguration(),
originalCopy, flinkApp, false);
}
@@ -111,6 +116,7 @@ public class FlinkDeploymentController
}
LOG.info("End of reconciliation");
+ metricManager.onUpdate(flinkApp);
return ReconciliationUtils.toUpdateControl(
configManager.getOperatorConfiguration(), originalCopy,
flinkApp, true);
}
@@ -153,6 +159,7 @@ public class FlinkDeploymentController
ReconciliationUtils.updateForReconciliationError(
flinkApp,
(e instanceof ReconciliationException) ?
e.getCause().toString() : e.toString());
+ metricManager.onUpdate(flinkApp);
return Optional.of(flinkApp);
}
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobController.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobController.java
index f7bc660..fd09fff 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobController.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobController.java
@@ -21,6 +21,7 @@ import
org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob;
import org.apache.flink.kubernetes.operator.exception.ReconciliationException;
+import org.apache.flink.kubernetes.operator.metrics.MetricManager;
import org.apache.flink.kubernetes.operator.observer.Observer;
import org.apache.flink.kubernetes.operator.reconciler.Reconciler;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
@@ -73,6 +74,7 @@ public class FlinkSessionJobController
private final Set<FlinkResourceValidator> validators;
private final Reconciler<FlinkSessionJob> reconciler;
private final Observer<FlinkSessionJob> observer;
+ private final MetricManager<FlinkSessionJob> metricManager;
private Map<String, SharedIndexInformer<FlinkSessionJob>> informers;
private FlinkControllerConfig<FlinkSessionJob> controllerConfig;
@@ -81,12 +83,14 @@ public class FlinkSessionJobController
KubernetesClient kubernetesClient,
Set<FlinkResourceValidator> validators,
Reconciler<FlinkSessionJob> reconciler,
- Observer<FlinkSessionJob> observer) {
+ Observer<FlinkSessionJob> observer,
+ MetricManager<FlinkSessionJob> metricManager) {
this.configManager = configManager;
this.kubernetesClient = kubernetesClient;
this.validators = validators;
this.reconciler = reconciler;
this.observer = observer;
+ this.metricManager = metricManager;
}
public void init(FlinkControllerConfig<FlinkSessionJob> config) {
@@ -105,6 +109,7 @@ public class FlinkSessionJobController
LOG.error("Validation failed: " + validationError.get());
ReconciliationUtils.updateForReconciliationError(
flinkSessionJob, validationError.get());
+ metricManager.onUpdate(flinkSessionJob);
return ReconciliationUtils.toUpdateControl(originalCopy,
flinkSessionJob);
}
@@ -114,7 +119,7 @@ public class FlinkSessionJobController
} catch (Exception e) {
throw new ReconciliationException(e);
}
-
+ metricManager.onUpdate(flinkSessionJob);
return ReconciliationUtils.toUpdateControl(originalCopy,
flinkSessionJob)
.rescheduleAfter(
configManager.getOperatorConfiguration().getReconcileInterval().toMillis());
@@ -123,7 +128,7 @@ public class FlinkSessionJobController
@Override
public DeleteControl cleanup(FlinkSessionJob sessionJob, Context context) {
LOG.info("Deleting FlinkSessionJob");
-
+ metricManager.onRemove(sessionJob);
return reconciler.cleanup(sessionJob, context);
}
@@ -138,6 +143,7 @@ public class FlinkSessionJobController
ReconciliationUtils.updateForReconciliationError(
flinkSessionJob,
(e instanceof ReconciliationException) ?
e.getCause().toString() : e.toString());
+ metricManager.onUpdate(flinkSessionJob);
return Optional.of(flinkSessionJob);
}
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/CustomResourceMetrics.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/CustomResourceMetrics.java
new file mode 100644
index 0000000..512db41
--- /dev/null
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/CustomResourceMetrics.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.metrics;
+
+import io.fabric8.kubernetes.client.CustomResource;
+
+/** Custom resource metric type. */
+public interface CustomResourceMetrics<CR extends CustomResource> {
+ void onUpdate(CR customResource);
+
+ void onRemove(CR customResource);
+}
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/FlinkDeploymentMetrics.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/FlinkDeploymentMetrics.java
new file mode 100644
index 0000000..3ccc63c
--- /dev/null
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/FlinkDeploymentMetrics.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.metrics;
+
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import
org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus;
+import org.apache.flink.metrics.MetricGroup;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+/** FlinkDeployment metrics. */
+public class FlinkDeploymentMetrics implements
CustomResourceMetrics<FlinkDeployment> {
+
+ private final Map<JobManagerDeploymentStatus, Set<String>> statuses = new
HashMap<>();
+ public static final String METRIC_GROUP_NAME = "FlinkDeployment";
+
+ public FlinkDeploymentMetrics(MetricGroup parentMetricGroup) {
+ MetricGroup flinkDeploymentMetrics =
parentMetricGroup.addGroup(METRIC_GROUP_NAME);
+ for (JobManagerDeploymentStatus status :
JobManagerDeploymentStatus.values()) {
+ statuses.put(status, ConcurrentHashMap.newKeySet());
+ }
+ for (JobManagerDeploymentStatus status :
JobManagerDeploymentStatus.values()) {
+ statuses.put(status, new HashSet<>());
+ MetricGroup metricGroup =
flinkDeploymentMetrics.addGroup(status.toString());
+ metricGroup.gauge("Count", () -> statuses.get(status).size());
+ }
+ flinkDeploymentMetrics.gauge(
+ "Count", () ->
statuses.values().stream().mapToInt(Set::size).sum());
+ }
+
+ public void onUpdate(FlinkDeployment flinkApp) {
+ onRemove(flinkApp);
+ statuses.get(flinkApp.getStatus().getJobManagerDeploymentStatus())
+ .add(flinkApp.getMetadata().getName());
+ }
+
+ public void onRemove(FlinkDeployment flinkApp) {
+ statuses.values()
+ .forEach(
+ deployments -> {
+
deployments.remove(flinkApp.getMetadata().getName());
+ });
+ }
+}
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/FlinkSessionJobMetrics.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/FlinkSessionJobMetrics.java
new file mode 100644
index 0000000..492ed0d
--- /dev/null
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/FlinkSessionJobMetrics.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.metrics;
+
+import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob;
+import org.apache.flink.metrics.MetricGroup;
+
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+/** FlinkSessionJob metrics. */
+public class FlinkSessionJobMetrics implements
CustomResourceMetrics<FlinkSessionJob> {
+
+ private final Set<String> sessionJobs = ConcurrentHashMap.newKeySet();
+ public static final String METRIC_GROUP_NAME = "FlinkSessionJob";
+
+ public FlinkSessionJobMetrics(MetricGroup parentMetricGroup) {
+ MetricGroup flinkSessionJobMetrics =
parentMetricGroup.addGroup(METRIC_GROUP_NAME);
+ flinkSessionJobMetrics.gauge("Count", () -> sessionJobs.size());
+ }
+
+ public void onUpdate(FlinkSessionJob sessionJob) {
+ sessionJobs.add(sessionJob.getMetadata().getName());
+ }
+
+ public void onRemove(FlinkSessionJob sessionJob) {
+ sessionJobs.remove(sessionJob.getMetadata().getName());
+ }
+}
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/MetricManager.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/MetricManager.java
new file mode 100644
index 0000000..6dbfb3b
--- /dev/null
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/MetricManager.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.metrics;
+
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob;
+import org.apache.flink.metrics.MetricGroup;
+
+import io.fabric8.kubernetes.client.CustomResource;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/** Metric manager for Operator managed custom resources. */
+public class MetricManager<CR extends CustomResource<?, ?>> {
+ private static final String NS_SCOPE_KEY = "resourcens";
+ private final MetricGroup metricGroup;
+ private final Map<String, CustomResourceMetrics> metrics = new
ConcurrentHashMap<>();
+
+ public MetricManager(MetricGroup metricGroup) {
+ this.metricGroup = metricGroup;
+ }
+
+ public void onUpdate(CR cr) {
+ getCustomResourceMetrics(cr).onUpdate(cr);
+ }
+
+ public void onRemove(CR cr) {
+ getCustomResourceMetrics(cr).onRemove(cr);
+ }
+
+ private CustomResourceMetrics getCustomResourceMetrics(CR cr) {
+ return metrics.computeIfAbsent(
+ cr.getMetadata().getNamespace(), k ->
getCustomResourceMetricsImpl(cr));
+ }
+
+ private CustomResourceMetrics getCustomResourceMetricsImpl(CR cr) {
+ if (cr instanceof FlinkDeployment) {
+ return new FlinkDeploymentMetrics(
+ metricGroup.addGroup(NS_SCOPE_KEY,
cr.getMetadata().getNamespace()));
+ } else if (cr instanceof FlinkSessionJob) {
+ return new FlinkSessionJobMetrics(
+ metricGroup.addGroup(NS_SCOPE_KEY,
cr.getMetadata().getNamespace()));
+ } else {
+ throw new IllegalArgumentException("Unknown CustomResource");
+ }
+ }
+}
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/OperatorMetricUtils.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/OperatorMetricUtils.java
index 467776d..0e53caf 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/OperatorMetricUtils.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/OperatorMetricUtils.java
@@ -43,7 +43,7 @@ public class OperatorMetricUtils {
private static final String OPERATOR_METRICS_PREFIX =
"kubernetes.operator.metrics.";
private static final String METRICS_PREFIX = "metrics.";
- public static void initOperatorMetrics(Configuration defaultConfig) {
+ public static MetricGroup initOperatorMetrics(Configuration defaultConfig)
{
Configuration metricConfig = createMetricConfig(defaultConfig);
LOG.info("Initializing operator metrics using conf: {}", metricConfig);
PluginManager pluginManager =
PluginUtils.createPluginManagerFromRootFolder(metricConfig);
@@ -58,6 +58,7 @@ public class OperatorMetricUtils {
EnvUtils.getOrDefault(EnvUtils.ENV_HOSTNAME,
"localhost"));
MetricGroup statusGroup = operatorMetricGroup.addGroup("Status");
MetricUtils.instantiateStatusMetrics(statusGroup);
+ return operatorMetricGroup;
}
@VisibleForTesting
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
index 811e09f..991e3a7 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
@@ -38,9 +38,11 @@ import
org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
import org.apache.flink.kubernetes.operator.crd.status.FlinkSessionJobStatus;
import
org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus;
import
org.apache.flink.kubernetes.operator.exception.DeploymentFailedException;
+import org.apache.flink.kubernetes.operator.metrics.MetricManager;
import
org.apache.flink.kubernetes.operator.observer.deployment.ObserverFactory;
import
org.apache.flink.kubernetes.operator.reconciler.deployment.ReconcilerFactory;
import org.apache.flink.kubernetes.operator.utils.ValidatorUtils;
+import org.apache.flink.metrics.testutils.MetricListener;
import io.fabric8.kubernetes.api.model.Container;
import io.fabric8.kubernetes.api.model.ContainerStatus;
@@ -350,7 +352,8 @@ public class TestUtils {
kubernetesClient,
ValidatorUtils.discoverValidators(configManager),
new ReconcilerFactory(kubernetesClient, flinkService,
configManager),
- new ObserverFactory(flinkService, configManager));
+ new ObserverFactory(flinkService, configManager),
+ new MetricManager<>(new
MetricListener().getMetricGroup()));
controller.setControllerConfig(
new FlinkControllerConfig(controller, Collections.emptySet()));
return controller;
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/FlinkDeploymentMetricsTest.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/FlinkDeploymentMetricsTest.java
new file mode 100644
index 0000000..db6e433
--- /dev/null
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/FlinkDeploymentMetricsTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.metrics;
+
+import org.apache.flink.kubernetes.operator.TestUtils;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import
org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus;
+import org.apache.flink.metrics.testutils.MetricListener;
+
+import org.junit.jupiter.api.Test;
+
+import static
org.apache.flink.kubernetes.operator.metrics.FlinkDeploymentMetrics.METRIC_GROUP_NAME;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/** @link FlinkDeploymentMetrics tests. */
+public class FlinkDeploymentMetricsTest {
+
+ @Test
+ public void testMetrics() {
+ MetricListener metricListener = new MetricListener();
+ FlinkDeploymentMetrics metrics =
+ new FlinkDeploymentMetrics(metricListener.getMetricGroup());
+
+ assertTrue(metricListener.getGauge(METRIC_GROUP_NAME,
"Count").isPresent());
+ for (JobManagerDeploymentStatus status :
JobManagerDeploymentStatus.values()) {
+ assertTrue(
+ metricListener
+ .getGauge(METRIC_GROUP_NAME, status.toString(),
"Count")
+ .isPresent());
+ }
+
+ assertEquals(0, metricListener.getGauge(METRIC_GROUP_NAME,
"Count").get().getValue());
+ for (JobManagerDeploymentStatus status :
JobManagerDeploymentStatus.values()) {
+ assertEquals(
+ 0,
+ metricListener
+ .getGauge(METRIC_GROUP_NAME, status.toString(),
"Count")
+ .get()
+ .getValue());
+ }
+
+ FlinkDeployment flinkDeployment = TestUtils.buildApplicationCluster();
+
+ for (JobManagerDeploymentStatus status :
JobManagerDeploymentStatus.values()) {
+ flinkDeployment.getStatus().setJobManagerDeploymentStatus(status);
+ metrics.onUpdate(flinkDeployment);
+ assertEquals(
+ 1,
+ metricListener
+ .getGauge(METRIC_GROUP_NAME, status.toString(),
"Count")
+ .get()
+ .getValue());
+ assertEquals(1, metricListener.getGauge(METRIC_GROUP_NAME,
"Count").get().getValue());
+ }
+
+ metrics.onRemove(flinkDeployment);
+ assertEquals(0, metricListener.getGauge(METRIC_GROUP_NAME,
"Count").get().getValue());
+ for (JobManagerDeploymentStatus status :
JobManagerDeploymentStatus.values()) {
+ assertEquals(
+ 0,
+ metricListener
+ .getGauge(METRIC_GROUP_NAME, status.toString(),
"Count")
+ .get()
+ .getValue());
+ }
+ }
+}
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/FlinkSessionJobMetricsTest.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/FlinkSessionJobMetricsTest.java
new file mode 100644
index 0000000..21f9a6b
--- /dev/null
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/FlinkSessionJobMetricsTest.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.metrics;
+
+import org.apache.flink.kubernetes.operator.TestUtils;
+import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob;
+import org.apache.flink.metrics.testutils.MetricListener;
+
+import org.junit.jupiter.api.Test;
+
+import static
org.apache.flink.kubernetes.operator.metrics.FlinkSessionJobMetrics.METRIC_GROUP_NAME;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/** @link FlinkSessionJobMetrics tests. */
+public class FlinkSessionJobMetricsTest {
+
+ @Test
+ public void testMetrics() {
+ MetricListener metricListener = new MetricListener();
+ FlinkSessionJobMetrics metrics =
+ new FlinkSessionJobMetrics(metricListener.getMetricGroup());
+ assertTrue(metricListener.getGauge(METRIC_GROUP_NAME,
"Count").isPresent());
+ assertEquals(0, metricListener.getGauge(METRIC_GROUP_NAME,
"Count").get().getValue());
+ FlinkSessionJob flinkSessionJob = TestUtils.buildSessionJob();
+ metrics.onUpdate(flinkSessionJob);
+ assertEquals(1, metricListener.getGauge(METRIC_GROUP_NAME,
"Count").get().getValue());
+ metrics.onRemove(flinkSessionJob);
+ assertEquals(0, metricListener.getGauge(METRIC_GROUP_NAME,
"Count").get().getValue());
+ }
+}