This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new 2adf8f6 [FLINK-28356] Move operator metric recording logic into statusrecorder
2adf8f6 is described below
commit 2adf8f6188c86759caa27d1c10ed9d74ed3a1006
Author: Matyas Orhidi <[email protected]>
AuthorDate: Wed Jul 6 14:02:17 2022 +0200
[FLINK-28356] Move operator metric recording logic into statusrecorder
---
.../flink/kubernetes/operator/FlinkOperator.java | 17 +++---
.../controller/FlinkDeploymentController.java | 9 +---
.../controller/FlinkSessionJobController.java | 9 +---
.../kubernetes/operator/metrics/MetricManager.java | 2 +-
.../operator/reconciler/ReconciliationUtils.java | 4 --
.../kubernetes/operator/utils/StatusRecorder.java | 18 +++++--
.../kubernetes/operator/TestingStatusRecorder.java | 4 +-
.../TestingFlinkDeploymentController.java | 8 +--
.../listener/FlinkResourceListenerTest.java | 7 ++-
.../deployment/ApplicationReconcilerTest.java | 5 +-
.../deployment/SessionReconcilerTest.java | 4 +-
.../sessionjob/SessionJobReconcilerTest.java | 3 +-
.../operator/utils/StatusRecorderTest.java | 61 +++++++++++++++++++++-
13 files changed, 103 insertions(+), 48 deletions(-)
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
index 248d99c..590af2c 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
@@ -112,7 +112,9 @@ public class FlinkOperator {
}
private void registerDeploymentController() {
- var statusRecorder =
StatusRecorder.<FlinkDeploymentStatus>create(client, listeners);
+ var statusRecorder =
+ StatusRecorder.<FlinkDeploymentStatus>create(
+ client, new MetricManager<>(metricGroup), listeners);
var eventRecorder = EventRecorder.create(client, listeners);
var reconcilerFactory =
new ReconcilerFactory(
@@ -126,7 +128,6 @@ public class FlinkOperator {
validators,
reconcilerFactory,
observerFactory,
- new MetricManager<>(metricGroup),
statusRecorder,
eventRecorder);
registeredControllers.add(operator.register(controller,
this::overrideControllerConfigs));
@@ -134,7 +135,9 @@ public class FlinkOperator {
private void registerSessionJobController() {
var eventRecorder = EventRecorder.create(client, listeners);
- var statusRecorder =
StatusRecorder.<FlinkSessionJobStatus>create(client, listeners);
+ var statusRecorder =
+ StatusRecorder.<FlinkSessionJobStatus>create(
+ client, new MetricManager<>(metricGroup), listeners);
var reconciler =
new SessionJobReconciler(
client, flinkService, configManager, eventRecorder,
statusRecorder);
@@ -142,13 +145,7 @@ public class FlinkOperator {
new SessionJobObserver(flinkService, configManager,
statusRecorder, eventRecorder);
var controller =
new FlinkSessionJobController(
- configManager,
- validators,
- reconciler,
- observer,
- new MetricManager<>(metricGroup),
- statusRecorder);
-
+ configManager, validators, reconciler, observer,
statusRecorder);
registeredControllers.add(operator.register(controller,
this::overrideControllerConfigs));
}
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
index d75d29c..1bd5946 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
@@ -24,7 +24,6 @@ import
org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
import
org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus;
import
org.apache.flink.kubernetes.operator.exception.DeploymentFailedException;
import org.apache.flink.kubernetes.operator.exception.ReconciliationException;
-import org.apache.flink.kubernetes.operator.metrics.MetricManager;
import
org.apache.flink.kubernetes.operator.observer.deployment.ObserverFactory;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import
org.apache.flink.kubernetes.operator.reconciler.deployment.ReconcilerFactory;
@@ -65,7 +64,6 @@ public class FlinkDeploymentController
private final Set<FlinkResourceValidator> validators;
private final ReconcilerFactory reconcilerFactory;
private final ObserverFactory observerFactory;
- private final MetricManager<FlinkDeployment> metricManager;
private final StatusRecorder<FlinkDeploymentStatus> statusRecorder;
private final EventRecorder eventRecorder;
@@ -74,14 +72,12 @@ public class FlinkDeploymentController
Set<FlinkResourceValidator> validators,
ReconcilerFactory reconcilerFactory,
ObserverFactory observerFactory,
- MetricManager<FlinkDeployment> metricManager,
StatusRecorder<FlinkDeploymentStatus> statusRecorder,
EventRecorder eventRecorder) {
this.configManager = configManager;
this.validators = validators;
this.reconcilerFactory = reconcilerFactory;
this.observerFactory = observerFactory;
- this.metricManager = metricManager;
this.statusRecorder = statusRecorder;
this.eventRecorder = eventRecorder;
}
@@ -95,7 +91,6 @@ public class FlinkDeploymentController
} catch (DeploymentFailedException dfe) {
// ignore during cleanup
}
- metricManager.onRemove(flinkApp);
statusRecorder.removeCachedStatus(flinkApp);
return reconcilerFactory.getOrCreate(flinkApp).cleanup(flinkApp,
context);
}
@@ -109,7 +104,6 @@ public class FlinkDeploymentController
try {
observerFactory.getOrCreate(flinkApp).observe(flinkApp, context);
if (!validateDeployment(flinkApp)) {
- metricManager.onUpdate(flinkApp);
statusRecorder.patchAndCacheStatus(flinkApp);
return ReconciliationUtils.toUpdateControl(
configManager.getOperatorConfiguration(),
@@ -126,7 +120,6 @@ public class FlinkDeploymentController
}
LOG.info("End of reconciliation");
- metricManager.onUpdate(flinkApp);
statusRecorder.patchAndCacheStatus(flinkApp);
return ReconciliationUtils.toUpdateControl(
configManager.getOperatorConfiguration(), flinkApp,
previousDeployment, true);
@@ -157,7 +150,7 @@ public class FlinkDeploymentController
public ErrorStatusUpdateControl<FlinkDeployment> updateErrorStatus(
FlinkDeployment flinkDeployment, Context<FlinkDeployment> context,
Exception e) {
return ReconciliationUtils.toErrorStatusUpdateControl(
- flinkDeployment, context.getRetryInfo(), e, metricManager,
statusRecorder);
+ flinkDeployment, context.getRetryInfo(), e, statusRecorder);
}
private boolean validateDeployment(FlinkDeployment deployment) {
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobController.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobController.java
index 7f2f6bd..3727af4 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobController.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobController.java
@@ -22,7 +22,6 @@ import
org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob;
import org.apache.flink.kubernetes.operator.crd.status.FlinkSessionJobStatus;
import org.apache.flink.kubernetes.operator.exception.ReconciliationException;
-import org.apache.flink.kubernetes.operator.metrics.MetricManager;
import org.apache.flink.kubernetes.operator.observer.Observer;
import org.apache.flink.kubernetes.operator.reconciler.Reconciler;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
@@ -61,7 +60,6 @@ public class FlinkSessionJobController
private final Set<FlinkResourceValidator> validators;
private final Reconciler<FlinkSessionJob> reconciler;
private final Observer<FlinkSessionJob> observer;
- private final MetricManager<FlinkSessionJob> metricManager;
private final StatusRecorder<FlinkSessionJobStatus> statusRecorder;
public FlinkSessionJobController(
@@ -69,13 +67,11 @@ public class FlinkSessionJobController
Set<FlinkResourceValidator> validators,
Reconciler<FlinkSessionJob> reconciler,
Observer<FlinkSessionJob> observer,
- MetricManager<FlinkSessionJob> metricManager,
StatusRecorder<FlinkSessionJobStatus> statusRecorder) {
this.configManager = configManager;
this.validators = validators;
this.reconciler = reconciler;
this.observer = observer;
- this.metricManager = metricManager;
this.statusRecorder = statusRecorder;
}
@@ -89,7 +85,6 @@ public class FlinkSessionJobController
observer.observe(flinkSessionJob, context);
if (!validateSessionJob(flinkSessionJob, context)) {
- metricManager.onUpdate(flinkSessionJob);
statusRecorder.patchAndCacheStatus(flinkSessionJob);
return ReconciliationUtils.toUpdateControl(
configManager.getOperatorConfiguration(), flinkSessionJob,
previousJob, false);
@@ -101,7 +96,6 @@ public class FlinkSessionJobController
} catch (Exception e) {
throw new ReconciliationException(e);
}
- metricManager.onUpdate(flinkSessionJob);
statusRecorder.patchAndCacheStatus(flinkSessionJob);
return ReconciliationUtils.toUpdateControl(
configManager.getOperatorConfiguration(), flinkSessionJob,
previousJob, true);
@@ -110,7 +104,6 @@ public class FlinkSessionJobController
@Override
public DeleteControl cleanup(FlinkSessionJob sessionJob, Context context) {
LOG.info("Deleting FlinkSessionJob");
- metricManager.onRemove(sessionJob);
statusRecorder.removeCachedStatus(sessionJob);
return reconciler.cleanup(sessionJob, context);
}
@@ -119,7 +112,7 @@ public class FlinkSessionJobController
public ErrorStatusUpdateControl<FlinkSessionJob> updateErrorStatus(
FlinkSessionJob sessionJob, Context<FlinkSessionJob> context,
Exception e) {
return ReconciliationUtils.toErrorStatusUpdateControl(
- sessionJob, context.getRetryInfo(), e, metricManager,
statusRecorder);
+ sessionJob, context.getRetryInfo(), e, statusRecorder);
}
@Override
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/MetricManager.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/MetricManager.java
index 6dbfb3b..8cd9c3d 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/MetricManager.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/MetricManager.java
@@ -28,7 +28,7 @@ import java.util.concurrent.ConcurrentHashMap;
/** Metric manager for Operator managed custom resources. */
public class MetricManager<CR extends CustomResource<?, ?>> {
- private static final String NS_SCOPE_KEY = "resourcens";
+ public static final String NS_SCOPE_KEY = "resourcens";
private final MetricGroup metricGroup;
private final Map<String, CustomResourceMetrics> metrics = new
ConcurrentHashMap<>();
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
index 073d088..79ee2da 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
@@ -32,7 +32,6 @@ import
org.apache.flink.kubernetes.operator.crd.status.SavepointInfo;
import org.apache.flink.kubernetes.operator.crd.status.SavepointTriggerType;
import org.apache.flink.kubernetes.operator.crd.status.TaskManagerInfo;
import org.apache.flink.kubernetes.operator.exception.ReconciliationException;
-import org.apache.flink.kubernetes.operator.metrics.MetricManager;
import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
import org.apache.flink.util.Preconditions;
@@ -390,7 +389,6 @@ public class ReconciliationUtils {
* @param resource Flink Resource to be updated
* @param retryInfo Current RetryInformation
* @param e Exception that caused the retry
- * @param metricManager Metric manager to be updated
* @param statusRecorder statusRecorder object for patching status
* @param <STATUS> Status type.
* @param <R> Resource type.
@@ -401,7 +399,6 @@ public class ReconciliationUtils {
R resource,
Optional<RetryInfo> retryInfo,
Exception e,
- MetricManager<R> metricManager,
StatusRecorder<STATUS> statusRecorder) {
retryInfo.ifPresent(
@@ -416,7 +413,6 @@ public class ReconciliationUtils {
ReconciliationUtils.updateForReconciliationError(
resource,
(e instanceof ReconciliationException) ?
e.getCause().toString() : e.toString());
- metricManager.onUpdate(resource);
statusRecorder.patchAndCacheStatus(resource);
// Status was updated already, no need to return anything
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/StatusRecorder.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/StatusRecorder.java
index 057d46a..4b6fbdb 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/StatusRecorder.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/StatusRecorder.java
@@ -25,11 +25,11 @@ import
org.apache.flink.kubernetes.operator.crd.status.CommonStatus;
import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
import org.apache.flink.kubernetes.operator.crd.status.FlinkSessionJobStatus;
import org.apache.flink.kubernetes.operator.listener.FlinkResourceListener;
+import org.apache.flink.kubernetes.operator.metrics.MetricManager;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.fabric8.kubernetes.api.model.HasMetadata;
-import io.fabric8.kubernetes.client.CustomResource;
import io.fabric8.kubernetes.client.KubernetesClient;
import lombok.SneakyThrows;
import org.slf4j.Logger;
@@ -50,13 +50,16 @@ public class StatusRecorder<STATUS extends CommonStatus<?>>
{
new ConcurrentHashMap<>();
private final KubernetesClient client;
+ private final MetricManager<AbstractFlinkResource<?, STATUS>>
metricManager;
private final BiConsumer<AbstractFlinkResource<?, STATUS>, STATUS>
statusUpdateListener;
public StatusRecorder(
KubernetesClient client,
+ MetricManager<AbstractFlinkResource<?, STATUS>> metricManager,
BiConsumer<AbstractFlinkResource<?, STATUS>, STATUS>
statusUpdateListener) {
this.client = client;
this.statusUpdateListener = statusUpdateListener;
+ this.metricManager = metricManager;
}
/**
@@ -103,6 +106,7 @@ public class StatusRecorder<STATUS extends CommonStatus<?>>
{
.withName(name)
.patchStatus(resource);
statusUpdateListener.accept(resource, prevStatus);
+ metricManager.onUpdate(resource);
return;
} catch (Exception e) {
LOG.error("Error while patching status, retrying {}/3...", (i
+ 1), e);
@@ -124,7 +128,7 @@ public class StatusRecorder<STATUS extends CommonStatus<?>>
{
* @param resource Resource for which the status should be updated from
the cache
* @param <T> Custom resource type.
*/
- public <T extends CustomResource<?, STATUS>> void updateStatusFromCache(T
resource) {
+ public <T extends AbstractFlinkResource<?, STATUS>> void
updateStatusFromCache(T resource) {
var key = getKey(resource);
var cachedStatus = statusCache.get(key);
if (cachedStatus != null) {
@@ -136,6 +140,7 @@ public class StatusRecorder<STATUS extends CommonStatus<?>>
{
// Initialize cache with current status copy
statusCache.put(key,
objectMapper.convertValue(resource.getStatus(), ObjectNode.class));
}
+ metricManager.onUpdate(resource);
}
/**
@@ -144,8 +149,9 @@ public class StatusRecorder<STATUS extends CommonStatus<?>>
{
* @param resource Flink resource.
* @param <T> Resource type.
*/
- public <T extends CustomResource<?, STATUS>> void removeCachedStatus(T
resource) {
+ public <T extends AbstractFlinkResource<?, STATUS>> void
removeCachedStatus(T resource) {
statusCache.remove(getKey(resource));
+ metricManager.onRemove(resource);
}
protected static Tuple2<String, String> getKey(HasMetadata resource) {
@@ -153,7 +159,9 @@ public class StatusRecorder<STATUS extends CommonStatus<?>>
{
}
public static <S extends CommonStatus<?>> StatusRecorder<S> create(
- KubernetesClient kubernetesClient,
Collection<FlinkResourceListener> listeners) {
+ KubernetesClient kubernetesClient,
+ MetricManager<AbstractFlinkResource<?, S>> metricManager,
+ Collection<FlinkResourceListener> listeners) {
BiConsumer<AbstractFlinkResource<?, S>, S> consumer =
(resource, previousStatus) -> {
var ctx =
@@ -183,6 +191,6 @@ public class StatusRecorder<STATUS extends CommonStatus<?>>
{
}
});
};
- return new StatusRecorder<>(kubernetesClient, consumer);
+ return new StatusRecorder<>(kubernetesClient, metricManager, consumer);
}
}
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingStatusRecorder.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingStatusRecorder.java
index 0243fd5..8306761 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingStatusRecorder.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingStatusRecorder.java
@@ -20,7 +20,9 @@ package org.apache.flink.kubernetes.operator;
import org.apache.flink.kubernetes.operator.crd.AbstractFlinkResource;
import org.apache.flink.kubernetes.operator.crd.status.CommonStatus;
+import org.apache.flink.kubernetes.operator.metrics.MetricManager;
import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
+import org.apache.flink.metrics.testutils.MetricListener;
import com.fasterxml.jackson.databind.node.ObjectNode;
@@ -28,7 +30,7 @@ import com.fasterxml.jackson.databind.node.ObjectNode;
public class TestingStatusRecorder<STATUS extends CommonStatus<?>> extends
StatusRecorder<STATUS> {
public TestingStatusRecorder() {
- super(null, (r, s) -> {});
+ super(null, new MetricManager<>(new
MetricListener().getMetricGroup()), (r, s) -> {});
}
@Override
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java
index d3909b4..f78b769 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java
@@ -64,8 +64,11 @@ public class TestingFlinkDeploymentController
KubernetesClient kubernetesClient,
TestingFlinkService flinkService) {
eventRecorder = new EventRecorder(kubernetesClient, (r, e) -> {});
- statusRecorder = new StatusRecorder<>(kubernetesClient,
statusUpdateCounter);
-
+ statusRecorder =
+ new StatusRecorder<>(
+ kubernetesClient,
+ new MetricManager<>(new
MetricListener().getMetricGroup()),
+ statusUpdateCounter);
flinkDeploymentController =
new FlinkDeploymentController(
configManager,
@@ -78,7 +81,6 @@ public class TestingFlinkDeploymentController
statusRecorder),
new ObserverFactory(
flinkService, configManager, statusRecorder,
eventRecorder),
- new MetricManager<>(new
MetricListener().getMetricGroup()),
statusRecorder,
eventRecorder);
}
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/listener/FlinkResourceListenerTest.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/listener/FlinkResourceListenerTest.java
index 86fe742..10a25da 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/listener/FlinkResourceListenerTest.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/listener/FlinkResourceListenerTest.java
@@ -21,8 +21,10 @@ import org.apache.flink.kubernetes.operator.TestUtils;
import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
import
org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus;
+import org.apache.flink.kubernetes.operator.metrics.MetricManager;
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
+import org.apache.flink.metrics.testutils.MetricListener;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
@@ -52,7 +54,10 @@ public class FlinkResourceListenerTest {
var listeners = List.<FlinkResourceListener>of(listener1, listener2);
var statusRecorder =
- StatusRecorder.<FlinkDeploymentStatus>create(kubernetesClient,
listeners);
+ StatusRecorder.<FlinkDeploymentStatus>create(
+ kubernetesClient,
+ new MetricManager<>(new
MetricListener().getMetricGroup()),
+ listeners);
var eventRecorder = EventRecorder.create(kubernetesClient, listeners);
var deployment = TestUtils.buildApplicationCluster();
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
index 439d848..b712215 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.kubernetes.operator.TestUtils;
import org.apache.flink.kubernetes.operator.TestingFlinkService;
+import org.apache.flink.kubernetes.operator.TestingStatusRecorder;
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
import
org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
@@ -39,7 +40,6 @@ import
org.apache.flink.kubernetes.operator.exception.DeploymentFailedException;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
import org.apache.flink.kubernetes.operator.utils.SavepointUtils;
-import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.highavailability.JobResultStoreOptions;
@@ -77,8 +77,7 @@ public class ApplicationReconcilerTest {
public void before() {
kubernetesClient.resource(TestUtils.buildApplicationCluster()).createOrReplace();
var eventRecorder = new EventRecorder(kubernetesClient, (r, e) -> {});
- var statusRecoder =
- new StatusRecorder<FlinkDeploymentStatus>(kubernetesClient,
(r, e) -> {});
+ var statusRecoder = new TestingStatusRecorder<FlinkDeploymentStatus>();
reconciler =
new ApplicationReconciler(
kubernetesClient,
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconcilerTest.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconcilerTest.java
index 903873e..2e13d28 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconcilerTest.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconcilerTest.java
@@ -20,10 +20,10 @@ package
org.apache.flink.kubernetes.operator.reconciler.deployment;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.TestUtils;
import org.apache.flink.kubernetes.operator.TestingFlinkService;
+import org.apache.flink.kubernetes.operator.TestingStatusRecorder;
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
-import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
@@ -63,7 +63,7 @@ public class SessionReconcilerTest {
flinkService,
configManager,
eventRecorder,
- new StatusRecorder<>(kubernetesClient, (r, e) -> {}));
+ new TestingStatusRecorder<>());
FlinkDeployment deployment = TestUtils.buildSessionCluster();
kubernetesClient.resource(deployment).createOrReplace();
reconciler.reconcile(deployment, context);
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconcilerTest.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconcilerTest.java
index 8846544..a28e2f1 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconcilerTest.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconcilerTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.TestUtils;
import org.apache.flink.kubernetes.operator.TestingFlinkService;
+import org.apache.flink.kubernetes.operator.TestingStatusRecorder;
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
import
org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
import org.apache.flink.kubernetes.operator.crd.AbstractFlinkResource;
@@ -84,7 +85,7 @@ public class SessionJobReconcilerTest {
new SessionJobReconciler(
null, flinkService, configManager, eventRecorder,
statusRecoder);
kubernetesClient.resource(TestUtils.buildSessionJob()).createOrReplace();
- statusRecoder = new
StatusRecorder<FlinkSessionJobStatus>(kubernetesClient, (r, e) -> {});
+ statusRecoder = new TestingStatusRecorder<>();
reconciler =
new SessionJobReconciler(
null, flinkService, configManager, eventRecorder,
statusRecoder);
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/StatusRecorderTest.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/StatusRecorderTest.java
index 6724005..e3cf2eb 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/StatusRecorderTest.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/StatusRecorderTest.java
@@ -18,14 +18,21 @@
package org.apache.flink.kubernetes.operator.utils;
import org.apache.flink.kubernetes.operator.TestUtils;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
+import
org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus;
import org.apache.flink.kubernetes.operator.crd.status.ReconciliationState;
+import org.apache.flink.kubernetes.operator.metrics.MetricManager;
+import org.apache.flink.metrics.testutils.MetricListener;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
import io.fabric8.kubernetes.client.server.mock.KubernetesMockServer;
import org.junit.jupiter.api.Test;
+import static
org.apache.flink.kubernetes.operator.metrics.FlinkDeploymentMetrics.METRIC_GROUP_NAME;
+import static
org.apache.flink.kubernetes.operator.metrics.MetricManager.NS_SCOPE_KEY;
+import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
/** Test for {@link StatusRecorder}. */
@@ -37,7 +44,11 @@ public class StatusRecorderTest {
@Test
public void testPatchOnlyWhenChanged() throws InterruptedException {
- var helper = new
StatusRecorder<FlinkDeploymentStatus>(kubernetesClient, (e, s) -> {});
+ var helper =
+ new StatusRecorder<FlinkDeploymentStatus>(
+ kubernetesClient,
+ new MetricManager<>(new
MetricListener().getMetricGroup()),
+ (e, s) -> {});
var deployment = TestUtils.buildApplicationCluster();
kubernetesClient.resource(deployment).createOrReplace();
var lastRequest = mockServer.getLastRequest();
@@ -56,4 +67,52 @@ public class StatusRecorderTest {
helper.patchAndCacheStatus(deployment);
assertTrue(mockServer.getLastRequest() == lastRequest);
}
+
+ @Test
+ public void testFlinkDeploymentMetrics() throws InterruptedException {
+ var metricListener = new MetricListener();
+ var helper =
+ new StatusRecorder<FlinkDeploymentStatus>(
+ kubernetesClient,
+ new MetricManager<>(metricListener.getMetricGroup()),
+ (e, s) -> {});
+
+ var deployment = TestUtils.buildApplicationCluster();
+ kubernetesClient.resource(deployment).createOrReplace();
+
+ helper.updateStatusFromCache(deployment);
+ assertEquals(1,
metricListener.getGauge(totalIdentifier(deployment)).get().getValue());
+ assertEquals(1,
metricListener.getGauge(perStatusIdentifier(deployment)).get().getValue());
+
+ for (JobManagerDeploymentStatus status :
JobManagerDeploymentStatus.values()) {
+ deployment.getStatus().setJobManagerDeploymentStatus(status);
+ helper.patchAndCacheStatus(deployment);
+ assertEquals(1,
metricListener.getGauge(totalIdentifier(deployment)).get().getValue());
+ assertEquals(
+ 1,
metricListener.getGauge(perStatusIdentifier(deployment)).get().getValue());
+ }
+
+ helper.removeCachedStatus(deployment);
+ assertEquals(0,
metricListener.getGauge(totalIdentifier(deployment)).get().getValue());
+ for (JobManagerDeploymentStatus status :
JobManagerDeploymentStatus.values()) {
+ assertEquals(
+ 0,
metricListener.getGauge(perStatusIdentifier(deployment)).get().getValue());
+ }
+ }
+
+ private String[] totalIdentifier(FlinkDeployment deployment) {
+ return new String[] {
+ NS_SCOPE_KEY, deployment.getMetadata().getNamespace(),
METRIC_GROUP_NAME, "Count"
+ };
+ }
+
+ private String[] perStatusIdentifier(FlinkDeployment deployment) {
+ return new String[] {
+ NS_SCOPE_KEY,
+ deployment.getMetadata().getNamespace(),
+ METRIC_GROUP_NAME,
+ deployment.getStatus().getJobManagerDeploymentStatus().name(),
+ "Count"
+ };
+ }
}