This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new 2adf8f6  [FLINK-28356] Move operator metric recording logic into statusrecorder
2adf8f6 is described below

commit 2adf8f6188c86759caa27d1c10ed9d74ed3a1006
Author: Matyas Orhidi <[email protected]>
AuthorDate: Wed Jul 6 14:02:17 2022 +0200

    [FLINK-28356] Move operator metric recording logic into statusrecorder
---
 .../flink/kubernetes/operator/FlinkOperator.java   | 17 +++---
 .../controller/FlinkDeploymentController.java      |  9 +---
 .../controller/FlinkSessionJobController.java      |  9 +---
 .../kubernetes/operator/metrics/MetricManager.java |  2 +-
 .../operator/reconciler/ReconciliationUtils.java   |  4 --
 .../kubernetes/operator/utils/StatusRecorder.java  | 18 +++++--
 .../kubernetes/operator/TestingStatusRecorder.java |  4 +-
 .../TestingFlinkDeploymentController.java          |  8 +--
 .../listener/FlinkResourceListenerTest.java        |  7 ++-
 .../deployment/ApplicationReconcilerTest.java      |  5 +-
 .../deployment/SessionReconcilerTest.java          |  4 +-
 .../sessionjob/SessionJobReconcilerTest.java       |  3 +-
 .../operator/utils/StatusRecorderTest.java         | 61 +++++++++++++++++++++-
 13 files changed, 103 insertions(+), 48 deletions(-)

diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
index 248d99c..590af2c 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
@@ -112,7 +112,9 @@ public class FlinkOperator {
     }
 
     private void registerDeploymentController() {
-        var statusRecorder = 
StatusRecorder.<FlinkDeploymentStatus>create(client, listeners);
+        var statusRecorder =
+                StatusRecorder.<FlinkDeploymentStatus>create(
+                        client, new MetricManager<>(metricGroup), listeners);
         var eventRecorder = EventRecorder.create(client, listeners);
         var reconcilerFactory =
                 new ReconcilerFactory(
@@ -126,7 +128,6 @@ public class FlinkOperator {
                         validators,
                         reconcilerFactory,
                         observerFactory,
-                        new MetricManager<>(metricGroup),
                         statusRecorder,
                         eventRecorder);
         registeredControllers.add(operator.register(controller, 
this::overrideControllerConfigs));
@@ -134,7 +135,9 @@ public class FlinkOperator {
 
     private void registerSessionJobController() {
         var eventRecorder = EventRecorder.create(client, listeners);
-        var statusRecorder = 
StatusRecorder.<FlinkSessionJobStatus>create(client, listeners);
+        var statusRecorder =
+                StatusRecorder.<FlinkSessionJobStatus>create(
+                        client, new MetricManager<>(metricGroup), listeners);
         var reconciler =
                 new SessionJobReconciler(
                         client, flinkService, configManager, eventRecorder, 
statusRecorder);
@@ -142,13 +145,7 @@ public class FlinkOperator {
                 new SessionJobObserver(flinkService, configManager, 
statusRecorder, eventRecorder);
         var controller =
                 new FlinkSessionJobController(
-                        configManager,
-                        validators,
-                        reconciler,
-                        observer,
-                        new MetricManager<>(metricGroup),
-                        statusRecorder);
-
+                        configManager, validators, reconciler, observer, 
statusRecorder);
         registeredControllers.add(operator.register(controller, 
this::overrideControllerConfigs));
     }
 
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
index d75d29c..1bd5946 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
@@ -24,7 +24,6 @@ import 
org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
 import 
org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus;
 import 
org.apache.flink.kubernetes.operator.exception.DeploymentFailedException;
 import org.apache.flink.kubernetes.operator.exception.ReconciliationException;
-import org.apache.flink.kubernetes.operator.metrics.MetricManager;
 import 
org.apache.flink.kubernetes.operator.observer.deployment.ObserverFactory;
 import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
 import 
org.apache.flink.kubernetes.operator.reconciler.deployment.ReconcilerFactory;
@@ -65,7 +64,6 @@ public class FlinkDeploymentController
     private final Set<FlinkResourceValidator> validators;
     private final ReconcilerFactory reconcilerFactory;
     private final ObserverFactory observerFactory;
-    private final MetricManager<FlinkDeployment> metricManager;
     private final StatusRecorder<FlinkDeploymentStatus> statusRecorder;
     private final EventRecorder eventRecorder;
 
@@ -74,14 +72,12 @@ public class FlinkDeploymentController
             Set<FlinkResourceValidator> validators,
             ReconcilerFactory reconcilerFactory,
             ObserverFactory observerFactory,
-            MetricManager<FlinkDeployment> metricManager,
             StatusRecorder<FlinkDeploymentStatus> statusRecorder,
             EventRecorder eventRecorder) {
         this.configManager = configManager;
         this.validators = validators;
         this.reconcilerFactory = reconcilerFactory;
         this.observerFactory = observerFactory;
-        this.metricManager = metricManager;
         this.statusRecorder = statusRecorder;
         this.eventRecorder = eventRecorder;
     }
@@ -95,7 +91,6 @@ public class FlinkDeploymentController
         } catch (DeploymentFailedException dfe) {
             // ignore during cleanup
         }
-        metricManager.onRemove(flinkApp);
         statusRecorder.removeCachedStatus(flinkApp);
         return reconcilerFactory.getOrCreate(flinkApp).cleanup(flinkApp, 
context);
     }
@@ -109,7 +104,6 @@ public class FlinkDeploymentController
         try {
             observerFactory.getOrCreate(flinkApp).observe(flinkApp, context);
             if (!validateDeployment(flinkApp)) {
-                metricManager.onUpdate(flinkApp);
                 statusRecorder.patchAndCacheStatus(flinkApp);
                 return ReconciliationUtils.toUpdateControl(
                         configManager.getOperatorConfiguration(),
@@ -126,7 +120,6 @@ public class FlinkDeploymentController
         }
 
         LOG.info("End of reconciliation");
-        metricManager.onUpdate(flinkApp);
         statusRecorder.patchAndCacheStatus(flinkApp);
         return ReconciliationUtils.toUpdateControl(
                 configManager.getOperatorConfiguration(), flinkApp, 
previousDeployment, true);
@@ -157,7 +150,7 @@ public class FlinkDeploymentController
     public ErrorStatusUpdateControl<FlinkDeployment> updateErrorStatus(
             FlinkDeployment flinkDeployment, Context<FlinkDeployment> context, 
Exception e) {
         return ReconciliationUtils.toErrorStatusUpdateControl(
-                flinkDeployment, context.getRetryInfo(), e, metricManager, 
statusRecorder);
+                flinkDeployment, context.getRetryInfo(), e, statusRecorder);
     }
 
     private boolean validateDeployment(FlinkDeployment deployment) {
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobController.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobController.java
index 7f2f6bd..3727af4 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobController.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobController.java
@@ -22,7 +22,6 @@ import 
org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob;
 import org.apache.flink.kubernetes.operator.crd.status.FlinkSessionJobStatus;
 import org.apache.flink.kubernetes.operator.exception.ReconciliationException;
-import org.apache.flink.kubernetes.operator.metrics.MetricManager;
 import org.apache.flink.kubernetes.operator.observer.Observer;
 import org.apache.flink.kubernetes.operator.reconciler.Reconciler;
 import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
@@ -61,7 +60,6 @@ public class FlinkSessionJobController
     private final Set<FlinkResourceValidator> validators;
     private final Reconciler<FlinkSessionJob> reconciler;
     private final Observer<FlinkSessionJob> observer;
-    private final MetricManager<FlinkSessionJob> metricManager;
     private final StatusRecorder<FlinkSessionJobStatus> statusRecorder;
 
     public FlinkSessionJobController(
@@ -69,13 +67,11 @@ public class FlinkSessionJobController
             Set<FlinkResourceValidator> validators,
             Reconciler<FlinkSessionJob> reconciler,
             Observer<FlinkSessionJob> observer,
-            MetricManager<FlinkSessionJob> metricManager,
             StatusRecorder<FlinkSessionJobStatus> statusRecorder) {
         this.configManager = configManager;
         this.validators = validators;
         this.reconciler = reconciler;
         this.observer = observer;
-        this.metricManager = metricManager;
         this.statusRecorder = statusRecorder;
     }
 
@@ -89,7 +85,6 @@ public class FlinkSessionJobController
 
         observer.observe(flinkSessionJob, context);
         if (!validateSessionJob(flinkSessionJob, context)) {
-            metricManager.onUpdate(flinkSessionJob);
             statusRecorder.patchAndCacheStatus(flinkSessionJob);
             return ReconciliationUtils.toUpdateControl(
                     configManager.getOperatorConfiguration(), flinkSessionJob, 
previousJob, false);
@@ -101,7 +96,6 @@ public class FlinkSessionJobController
         } catch (Exception e) {
             throw new ReconciliationException(e);
         }
-        metricManager.onUpdate(flinkSessionJob);
         statusRecorder.patchAndCacheStatus(flinkSessionJob);
         return ReconciliationUtils.toUpdateControl(
                 configManager.getOperatorConfiguration(), flinkSessionJob, 
previousJob, true);
@@ -110,7 +104,6 @@ public class FlinkSessionJobController
     @Override
     public DeleteControl cleanup(FlinkSessionJob sessionJob, Context context) {
         LOG.info("Deleting FlinkSessionJob");
-        metricManager.onRemove(sessionJob);
         statusRecorder.removeCachedStatus(sessionJob);
         return reconciler.cleanup(sessionJob, context);
     }
@@ -119,7 +112,7 @@ public class FlinkSessionJobController
     public ErrorStatusUpdateControl<FlinkSessionJob> updateErrorStatus(
             FlinkSessionJob sessionJob, Context<FlinkSessionJob> context, 
Exception e) {
         return ReconciliationUtils.toErrorStatusUpdateControl(
-                sessionJob, context.getRetryInfo(), e, metricManager, 
statusRecorder);
+                sessionJob, context.getRetryInfo(), e, statusRecorder);
     }
 
     @Override
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/MetricManager.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/MetricManager.java
index 6dbfb3b..8cd9c3d 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/MetricManager.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/MetricManager.java
@@ -28,7 +28,7 @@ import java.util.concurrent.ConcurrentHashMap;
 
 /** Metric manager for Operator managed custom resources. */
 public class MetricManager<CR extends CustomResource<?, ?>> {
-    private static final String NS_SCOPE_KEY = "resourcens";
+    public static final String NS_SCOPE_KEY = "resourcens";
     private final MetricGroup metricGroup;
     private final Map<String, CustomResourceMetrics> metrics = new 
ConcurrentHashMap<>();
 
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
index 073d088..79ee2da 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
@@ -32,7 +32,6 @@ import 
org.apache.flink.kubernetes.operator.crd.status.SavepointInfo;
 import org.apache.flink.kubernetes.operator.crd.status.SavepointTriggerType;
 import org.apache.flink.kubernetes.operator.crd.status.TaskManagerInfo;
 import org.apache.flink.kubernetes.operator.exception.ReconciliationException;
-import org.apache.flink.kubernetes.operator.metrics.MetricManager;
 import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
 import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
 import org.apache.flink.util.Preconditions;
@@ -390,7 +389,6 @@ public class ReconciliationUtils {
      * @param resource Flink Resource to be updated
      * @param retryInfo Current RetryInformation
      * @param e Exception that caused the retry
-     * @param metricManager Metric manager to be updated
      * @param statusRecorder statusRecorder object for patching status
      * @param <STATUS> Status type.
      * @param <R> Resource type.
@@ -401,7 +399,6 @@ public class ReconciliationUtils {
                     R resource,
                     Optional<RetryInfo> retryInfo,
                     Exception e,
-                    MetricManager<R> metricManager,
                     StatusRecorder<STATUS> statusRecorder) {
 
         retryInfo.ifPresent(
@@ -416,7 +413,6 @@ public class ReconciliationUtils {
         ReconciliationUtils.updateForReconciliationError(
                 resource,
                 (e instanceof ReconciliationException) ? 
e.getCause().toString() : e.toString());
-        metricManager.onUpdate(resource);
         statusRecorder.patchAndCacheStatus(resource);
 
         // Status was updated already, no need to return anything
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/StatusRecorder.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/StatusRecorder.java
index 057d46a..4b6fbdb 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/StatusRecorder.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/StatusRecorder.java
@@ -25,11 +25,11 @@ import 
org.apache.flink.kubernetes.operator.crd.status.CommonStatus;
 import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
 import org.apache.flink.kubernetes.operator.crd.status.FlinkSessionJobStatus;
 import org.apache.flink.kubernetes.operator.listener.FlinkResourceListener;
+import org.apache.flink.kubernetes.operator.metrics.MetricManager;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import io.fabric8.kubernetes.api.model.HasMetadata;
-import io.fabric8.kubernetes.client.CustomResource;
 import io.fabric8.kubernetes.client.KubernetesClient;
 import lombok.SneakyThrows;
 import org.slf4j.Logger;
@@ -50,13 +50,16 @@ public class StatusRecorder<STATUS extends CommonStatus<?>> 
{
             new ConcurrentHashMap<>();
 
     private final KubernetesClient client;
+    private final MetricManager<AbstractFlinkResource<?, STATUS>> 
metricManager;
     private final BiConsumer<AbstractFlinkResource<?, STATUS>, STATUS> 
statusUpdateListener;
 
     public StatusRecorder(
             KubernetesClient client,
+            MetricManager<AbstractFlinkResource<?, STATUS>> metricManager,
             BiConsumer<AbstractFlinkResource<?, STATUS>, STATUS> 
statusUpdateListener) {
         this.client = client;
         this.statusUpdateListener = statusUpdateListener;
+        this.metricManager = metricManager;
     }
 
     /**
@@ -103,6 +106,7 @@ public class StatusRecorder<STATUS extends CommonStatus<?>> 
{
                         .withName(name)
                         .patchStatus(resource);
                 statusUpdateListener.accept(resource, prevStatus);
+                metricManager.onUpdate(resource);
                 return;
             } catch (Exception e) {
                 LOG.error("Error while patching status, retrying {}/3...", (i 
+ 1), e);
@@ -124,7 +128,7 @@ public class StatusRecorder<STATUS extends CommonStatus<?>> 
{
      * @param resource Resource for which the status should be updated from 
the cache
      * @param <T> Custom resource type.
      */
-    public <T extends CustomResource<?, STATUS>> void updateStatusFromCache(T 
resource) {
+    public <T extends AbstractFlinkResource<?, STATUS>> void 
updateStatusFromCache(T resource) {
         var key = getKey(resource);
         var cachedStatus = statusCache.get(key);
         if (cachedStatus != null) {
@@ -136,6 +140,7 @@ public class StatusRecorder<STATUS extends CommonStatus<?>> 
{
             // Initialize cache with current status copy
             statusCache.put(key, 
objectMapper.convertValue(resource.getStatus(), ObjectNode.class));
         }
+        metricManager.onUpdate(resource);
     }
 
     /**
@@ -144,8 +149,9 @@ public class StatusRecorder<STATUS extends CommonStatus<?>> 
{
      * @param resource Flink resource.
      * @param <T> Resource type.
      */
-    public <T extends CustomResource<?, STATUS>> void removeCachedStatus(T 
resource) {
+    public <T extends AbstractFlinkResource<?, STATUS>> void 
removeCachedStatus(T resource) {
         statusCache.remove(getKey(resource));
+        metricManager.onRemove(resource);
     }
 
     protected static Tuple2<String, String> getKey(HasMetadata resource) {
@@ -153,7 +159,9 @@ public class StatusRecorder<STATUS extends CommonStatus<?>> 
{
     }
 
     public static <S extends CommonStatus<?>> StatusRecorder<S> create(
-            KubernetesClient kubernetesClient, 
Collection<FlinkResourceListener> listeners) {
+            KubernetesClient kubernetesClient,
+            MetricManager<AbstractFlinkResource<?, S>> metricManager,
+            Collection<FlinkResourceListener> listeners) {
         BiConsumer<AbstractFlinkResource<?, S>, S> consumer =
                 (resource, previousStatus) -> {
                     var ctx =
@@ -183,6 +191,6 @@ public class StatusRecorder<STATUS extends CommonStatus<?>> 
{
                                 }
                             });
                 };
-        return new StatusRecorder<>(kubernetesClient, consumer);
+        return new StatusRecorder<>(kubernetesClient, metricManager, consumer);
     }
 }
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingStatusRecorder.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingStatusRecorder.java
index 0243fd5..8306761 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingStatusRecorder.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingStatusRecorder.java
@@ -20,7 +20,9 @@ package org.apache.flink.kubernetes.operator;
 
 import org.apache.flink.kubernetes.operator.crd.AbstractFlinkResource;
 import org.apache.flink.kubernetes.operator.crd.status.CommonStatus;
+import org.apache.flink.kubernetes.operator.metrics.MetricManager;
 import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
+import org.apache.flink.metrics.testutils.MetricListener;
 
 import com.fasterxml.jackson.databind.node.ObjectNode;
 
@@ -28,7 +30,7 @@ import com.fasterxml.jackson.databind.node.ObjectNode;
 public class TestingStatusRecorder<STATUS extends CommonStatus<?>> extends 
StatusRecorder<STATUS> {
 
     public TestingStatusRecorder() {
-        super(null, (r, s) -> {});
+        super(null, new MetricManager<>(new 
MetricListener().getMetricGroup()), (r, s) -> {});
     }
 
     @Override
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java
index d3909b4..f78b769 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java
@@ -64,8 +64,11 @@ public class TestingFlinkDeploymentController
             KubernetesClient kubernetesClient,
             TestingFlinkService flinkService) {
         eventRecorder = new EventRecorder(kubernetesClient, (r, e) -> {});
-        statusRecorder = new StatusRecorder<>(kubernetesClient, 
statusUpdateCounter);
-
+        statusRecorder =
+                new StatusRecorder<>(
+                        kubernetesClient,
+                        new MetricManager<>(new 
MetricListener().getMetricGroup()),
+                        statusUpdateCounter);
         flinkDeploymentController =
                 new FlinkDeploymentController(
                         configManager,
@@ -78,7 +81,6 @@ public class TestingFlinkDeploymentController
                                 statusRecorder),
                         new ObserverFactory(
                                 flinkService, configManager, statusRecorder, 
eventRecorder),
-                        new MetricManager<>(new 
MetricListener().getMetricGroup()),
                         statusRecorder,
                         eventRecorder);
     }
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/listener/FlinkResourceListenerTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/listener/FlinkResourceListenerTest.java
index 86fe742..10a25da 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/listener/FlinkResourceListenerTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/listener/FlinkResourceListenerTest.java
@@ -21,8 +21,10 @@ import org.apache.flink.kubernetes.operator.TestUtils;
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
 import 
org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus;
+import org.apache.flink.kubernetes.operator.metrics.MetricManager;
 import org.apache.flink.kubernetes.operator.utils.EventRecorder;
 import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
+import org.apache.flink.metrics.testutils.MetricListener;
 
 import io.fabric8.kubernetes.client.KubernetesClient;
 import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
@@ -52,7 +54,10 @@ public class FlinkResourceListenerTest {
         var listeners = List.<FlinkResourceListener>of(listener1, listener2);
 
         var statusRecorder =
-                StatusRecorder.<FlinkDeploymentStatus>create(kubernetesClient, 
listeners);
+                StatusRecorder.<FlinkDeploymentStatus>create(
+                        kubernetesClient,
+                        new MetricManager<>(new 
MetricListener().getMetricGroup()),
+                        listeners);
         var eventRecorder = EventRecorder.create(kubernetesClient, listeners);
 
         var deployment = TestUtils.buildApplicationCluster();
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
index 439d848..b712215 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.kubernetes.operator.TestUtils;
 import org.apache.flink.kubernetes.operator.TestingFlinkService;
+import org.apache.flink.kubernetes.operator.TestingStatusRecorder;
 import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
 import 
org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
@@ -39,7 +40,6 @@ import 
org.apache.flink.kubernetes.operator.exception.DeploymentFailedException;
 import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
 import org.apache.flink.kubernetes.operator.utils.EventRecorder;
 import org.apache.flink.kubernetes.operator.utils.SavepointUtils;
-import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
 import org.apache.flink.runtime.client.JobStatusMessage;
 import org.apache.flink.runtime.highavailability.JobResultStoreOptions;
 
@@ -77,8 +77,7 @@ public class ApplicationReconcilerTest {
     public void before() {
         
kubernetesClient.resource(TestUtils.buildApplicationCluster()).createOrReplace();
         var eventRecorder = new EventRecorder(kubernetesClient, (r, e) -> {});
-        var statusRecoder =
-                new StatusRecorder<FlinkDeploymentStatus>(kubernetesClient, 
(r, e) -> {});
+        var statusRecoder = new TestingStatusRecorder<FlinkDeploymentStatus>();
         reconciler =
                 new ApplicationReconciler(
                         kubernetesClient,
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconcilerTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconcilerTest.java
index 903873e..2e13d28 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconcilerTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconcilerTest.java
@@ -20,10 +20,10 @@ package 
org.apache.flink.kubernetes.operator.reconciler.deployment;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.operator.TestUtils;
 import org.apache.flink.kubernetes.operator.TestingFlinkService;
+import org.apache.flink.kubernetes.operator.TestingStatusRecorder;
 import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.utils.EventRecorder;
-import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
 
 import io.fabric8.kubernetes.client.KubernetesClient;
 import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
@@ -63,7 +63,7 @@ public class SessionReconcilerTest {
                         flinkService,
                         configManager,
                         eventRecorder,
-                        new StatusRecorder<>(kubernetesClient, (r, e) -> {}));
+                        new TestingStatusRecorder<>());
         FlinkDeployment deployment = TestUtils.buildSessionCluster();
         kubernetesClient.resource(deployment).createOrReplace();
         reconciler.reconcile(deployment, context);
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconcilerTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconcilerTest.java
index 8846544..a28e2f1 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconcilerTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconcilerTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.operator.TestUtils;
 import org.apache.flink.kubernetes.operator.TestingFlinkService;
+import org.apache.flink.kubernetes.operator.TestingStatusRecorder;
 import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
 import 
org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
 import org.apache.flink.kubernetes.operator.crd.AbstractFlinkResource;
@@ -84,7 +85,7 @@ public class SessionJobReconcilerTest {
                 new SessionJobReconciler(
                         null, flinkService, configManager, eventRecorder, 
statusRecoder);
         
kubernetesClient.resource(TestUtils.buildSessionJob()).createOrReplace();
-        statusRecoder = new 
StatusRecorder<FlinkSessionJobStatus>(kubernetesClient, (r, e) -> {});
+        statusRecoder = new TestingStatusRecorder<>();
         reconciler =
                 new SessionJobReconciler(
                         null, flinkService, configManager, eventRecorder, 
statusRecoder);
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/StatusRecorderTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/StatusRecorderTest.java
index 6724005..e3cf2eb 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/StatusRecorderTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/StatusRecorderTest.java
@@ -18,14 +18,21 @@
 package org.apache.flink.kubernetes.operator.utils;
 
 import org.apache.flink.kubernetes.operator.TestUtils;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
+import 
org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus;
 import org.apache.flink.kubernetes.operator.crd.status.ReconciliationState;
+import org.apache.flink.kubernetes.operator.metrics.MetricManager;
+import org.apache.flink.metrics.testutils.MetricListener;
 
 import io.fabric8.kubernetes.client.KubernetesClient;
 import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
 import io.fabric8.kubernetes.client.server.mock.KubernetesMockServer;
 import org.junit.jupiter.api.Test;
 
+import static 
org.apache.flink.kubernetes.operator.metrics.FlinkDeploymentMetrics.METRIC_GROUP_NAME;
+import static 
org.apache.flink.kubernetes.operator.metrics.MetricManager.NS_SCOPE_KEY;
+import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /** Test for {@link StatusRecorder}. */
@@ -37,7 +44,11 @@ public class StatusRecorderTest {
 
     @Test
     public void testPatchOnlyWhenChanged() throws InterruptedException {
-        var helper = new 
StatusRecorder<FlinkDeploymentStatus>(kubernetesClient, (e, s) -> {});
+        var helper =
+                new StatusRecorder<FlinkDeploymentStatus>(
+                        kubernetesClient,
+                        new MetricManager<>(new 
MetricListener().getMetricGroup()),
+                        (e, s) -> {});
         var deployment = TestUtils.buildApplicationCluster();
         kubernetesClient.resource(deployment).createOrReplace();
         var lastRequest = mockServer.getLastRequest();
@@ -56,4 +67,52 @@ public class StatusRecorderTest {
         helper.patchAndCacheStatus(deployment);
         assertTrue(mockServer.getLastRequest() == lastRequest);
     }
+
+    @Test
+    public void testFlinkDeploymentMetrics() throws InterruptedException {
+        var metricListener = new MetricListener();
+        var helper =
+                new StatusRecorder<FlinkDeploymentStatus>(
+                        kubernetesClient,
+                        new MetricManager<>(metricListener.getMetricGroup()),
+                        (e, s) -> {});
+
+        var deployment = TestUtils.buildApplicationCluster();
+        kubernetesClient.resource(deployment).createOrReplace();
+
+        helper.updateStatusFromCache(deployment);
+        assertEquals(1, 
metricListener.getGauge(totalIdentifier(deployment)).get().getValue());
+        assertEquals(1, 
metricListener.getGauge(perStatusIdentifier(deployment)).get().getValue());
+
+        for (JobManagerDeploymentStatus status : 
JobManagerDeploymentStatus.values()) {
+            deployment.getStatus().setJobManagerDeploymentStatus(status);
+            helper.patchAndCacheStatus(deployment);
+            assertEquals(1, 
metricListener.getGauge(totalIdentifier(deployment)).get().getValue());
+            assertEquals(
+                    1, 
metricListener.getGauge(perStatusIdentifier(deployment)).get().getValue());
+        }
+
+        helper.removeCachedStatus(deployment);
+        assertEquals(0, 
metricListener.getGauge(totalIdentifier(deployment)).get().getValue());
+        for (JobManagerDeploymentStatus status : 
JobManagerDeploymentStatus.values()) {
+            assertEquals(
+                    0, 
metricListener.getGauge(perStatusIdentifier(deployment)).get().getValue());
+        }
+    }
+
+    private String[] totalIdentifier(FlinkDeployment deployment) {
+        return new String[] {
+            NS_SCOPE_KEY, deployment.getMetadata().getNamespace(), 
METRIC_GROUP_NAME, "Count"
+        };
+    }
+
+    private String[] perStatusIdentifier(FlinkDeployment deployment) {
+        return new String[] {
+            NS_SCOPE_KEY,
+            deployment.getMetadata().getNamespace(),
+            METRIC_GROUP_NAME,
+            deployment.getStatus().getJobManagerDeploymentStatus().name(),
+            "Count"
+        };
+    }
 }

Reply via email to