This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new 4d3b08f0 [FLINK-28592] Custom resource counter metrics improvements
4d3b08f0 is described below
commit 4d3b08f0a890c84e2a5131e08efd8925bc41ddad
Author: Matyas Orhidi <[email protected]>
AuthorDate: Sat Jul 23 18:20:57 2022 +0200
[FLINK-28592] Custom resource counter metrics improvements
---
.../kubernetes_operator_metric_configuration.html | 6 +
.../flink/kubernetes/operator/FlinkOperator.java | 14 +-
.../controller/FlinkDeploymentController.java | 4 +-
.../controller/FlinkSessionJobController.java | 4 +-
.../operator/metrics/CustomResourceMetrics.java | 2 +-
.../operator/metrics/FlinkDeploymentMetrics.java | 82 ++++++---
.../operator/metrics/FlinkSessionJobMetrics.java | 41 ++++-
.../metrics/KubernetesOperatorMetricOptions.java | 7 +
.../kubernetes/operator/metrics/MetricManager.java | 100 ++++++-----
.../metrics/lifecycle/LifecycleMetrics.java | 12 +-
.../operator/observer/SavepointObserver.java | 7 +-
.../observer/deployment/ApplicationObserver.java | 4 +-
.../observer/deployment/ObserverFactory.java | 4 +-
.../observer/sessionjob/SessionJobObserver.java | 4 +-
.../operator/reconciler/ReconciliationUtils.java | 2 +-
.../AbstractFlinkResourceReconciler.java | 4 +-
.../deployment/AbstractJobReconciler.java | 2 +-
.../deployment/ApplicationReconciler.java | 2 +-
.../reconciler/deployment/ReconcilerFactory.java | 4 +-
.../reconciler/deployment/SessionReconciler.java | 2 +-
.../sessionjob/SessionJobReconciler.java | 2 +-
.../kubernetes/operator/utils/StatusRecorder.java | 34 ++--
.../flink/kubernetes/operator/TestUtils.java | 66 ++++---
.../kubernetes/operator/TestingStatusRecorder.java | 10 +-
.../TestingFlinkDeploymentController.java | 7 +-
.../listener/FlinkResourceListenerTest.java | 9 +-
.../metrics/FlinkDeploymentMetricsTest.java | 193 ++++++++++++++-------
.../metrics/FlinkSessionJobMetricsTest.java | 111 ++++++++++--
.../operator/metrics/TestingMetricListener.java | 91 ++++++++++
.../lifecycle/ResourceLifecycleMetricsTest.java | 43 +++--
.../sessionjob/SessionJobObserverTest.java | 3 +-
.../deployment/ApplicationReconcilerTest.java | 2 +-
.../sessionjob/SessionJobReconcilerTest.java | 2 +-
.../operator/utils/StatusRecorderTest.java | 9 +-
34 files changed, 617 insertions(+), 272 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/kubernetes_operator_metric_configuration.html b/docs/layouts/shortcodes/generated/kubernetes_operator_metric_configuration.html
index d5d23294..74a02191 100644
--- a/docs/layouts/shortcodes/generated/kubernetes_operator_metric_configuration.html
+++ b/docs/layouts/shortcodes/generated/kubernetes_operator_metric_configuration.html
@@ -44,6 +44,12 @@
<td>Boolean</td>
<td>In addition to the system level histograms, enable per
namespace tracking of state and transition times.</td>
</tr>
+ <tr>
+ <td><h5>kubernetes.operator.resource.metrics.enabled</h5></td>
+ <td style="word-wrap: break-word;">true</td>
+ <td>Boolean</td>
+ <td>Enables metrics for FlinkDeployment and FlinkSessionJob custom
resources.</td>
+ </tr>
<tr>
<td><h5>metrics.scope.k8soperator.resource</h5></td>
<td style="word-wrap:
break-word;">"<host>.k8soperator.<namespace>.<name>.resource.<resourcens>.<resourcename>"</td>
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
index 4d2f4674..c0e8841d 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
@@ -25,8 +25,6 @@ import org.apache.flink.core.plugin.PluginUtils;
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
import
org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController;
import
org.apache.flink.kubernetes.operator.controller.FlinkSessionJobController;
-import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
-import org.apache.flink.kubernetes.operator.crd.status.FlinkSessionJobStatus;
import org.apache.flink.kubernetes.operator.listener.FlinkResourceListener;
import org.apache.flink.kubernetes.operator.listener.ListenerUtils;
import
org.apache.flink.kubernetes.operator.metrics.KubernetesOperatorMetricGroup;
@@ -120,9 +118,9 @@ public class FlinkOperator {
@VisibleForTesting
void registerDeploymentController() {
- var statusRecorder =
- StatusRecorder.<FlinkDeploymentStatus>create(
- client, new MetricManager<>(metricGroup,
configManager), listeners);
+ var metricManager =
+
MetricManager.createFlinkDeploymentMetricManager(configManager, metricGroup);
+ var statusRecorder = StatusRecorder.create(client, metricManager,
listeners);
var eventRecorder = EventRecorder.create(client, listeners);
var reconcilerFactory =
new ReconcilerFactory(
@@ -145,9 +143,9 @@ public class FlinkOperator {
@VisibleForTesting
void registerSessionJobController() {
var eventRecorder = EventRecorder.create(client, listeners);
- var statusRecorder =
- StatusRecorder.<FlinkSessionJobStatus>create(
- client, new MetricManager<>(metricGroup,
configManager), listeners);
+ var metricManager =
+
MetricManager.createFlinkSessionJobMetricManager(configManager, metricGroup);
+ var statusRecorder = StatusRecorder.create(client, metricManager,
listeners);
var reconciler =
new SessionJobReconciler(
client, flinkServiceFactory, configManager,
eventRecorder, statusRecorder);
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
index 1bd59468..dae1f64f 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
@@ -64,7 +64,7 @@ public class FlinkDeploymentController
private final Set<FlinkResourceValidator> validators;
private final ReconcilerFactory reconcilerFactory;
private final ObserverFactory observerFactory;
- private final StatusRecorder<FlinkDeploymentStatus> statusRecorder;
+ private final StatusRecorder<FlinkDeployment, FlinkDeploymentStatus>
statusRecorder;
private final EventRecorder eventRecorder;
public FlinkDeploymentController(
@@ -72,7 +72,7 @@ public class FlinkDeploymentController
Set<FlinkResourceValidator> validators,
ReconcilerFactory reconcilerFactory,
ObserverFactory observerFactory,
- StatusRecorder<FlinkDeploymentStatus> statusRecorder,
+ StatusRecorder<FlinkDeployment, FlinkDeploymentStatus>
statusRecorder,
EventRecorder eventRecorder) {
this.configManager = configManager;
this.validators = validators;
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobController.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobController.java
index 3727af40..aa4c1450 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobController.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobController.java
@@ -60,14 +60,14 @@ public class FlinkSessionJobController
private final Set<FlinkResourceValidator> validators;
private final Reconciler<FlinkSessionJob> reconciler;
private final Observer<FlinkSessionJob> observer;
- private final StatusRecorder<FlinkSessionJobStatus> statusRecorder;
+ private final StatusRecorder<FlinkSessionJob, FlinkSessionJobStatus>
statusRecorder;
public FlinkSessionJobController(
FlinkConfigManager configManager,
Set<FlinkResourceValidator> validators,
Reconciler<FlinkSessionJob> reconciler,
Observer<FlinkSessionJob> observer,
- StatusRecorder<FlinkSessionJobStatus> statusRecorder) {
+ StatusRecorder<FlinkSessionJob, FlinkSessionJobStatus>
statusRecorder) {
this.configManager = configManager;
this.validators = validators;
this.reconciler = reconciler;
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/CustomResourceMetrics.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/CustomResourceMetrics.java
index 512db41e..08a7704e 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/CustomResourceMetrics.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/CustomResourceMetrics.java
@@ -20,7 +20,7 @@ package org.apache.flink.kubernetes.operator.metrics;
import io.fabric8.kubernetes.client.CustomResource;
/** Custom resource metric type. */
-public interface CustomResourceMetrics<CR extends CustomResource> {
+public interface CustomResourceMetrics<CR extends CustomResource<?, ?>> {
void onUpdate(CR customResource);
void onRemove(CR customResource);
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/FlinkDeploymentMetrics.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/FlinkDeploymentMetrics.java
index 8e40175b..0c8f3826 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/FlinkDeploymentMetrics.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/FlinkDeploymentMetrics.java
@@ -17,12 +17,10 @@
package org.apache.flink.kubernetes.operator.metrics;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
import
org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus;
-import org.apache.flink.metrics.MetricGroup;
-import java.util.HashMap;
-import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@@ -30,40 +28,68 @@ import java.util.concurrent.ConcurrentHashMap;
/** FlinkDeployment metrics. */
public class FlinkDeploymentMetrics implements
CustomResourceMetrics<FlinkDeployment> {
- private final Map<JobManagerDeploymentStatus, Set<String>> statuses = new
HashMap<>();
- public static final String FLINK_DEPLOYMENT_GROUP_NAME = "FlinkDeployment";
- public static final String JM_DEPLOYMENT_STATUS_GROUP_NAME =
"JmDeploymentStatus";
+ private final KubernetesOperatorMetricGroup parentMetricGroup;
+ private final Configuration configuration;
+ private final Map<String, Map<JobManagerDeploymentStatus, Set<String>>>
deployments =
+ new ConcurrentHashMap<>();
+ public static final String DEPLOYMENT_GROUP_NAME =
FlinkDeployment.class.getSimpleName();
+ public static final String STATUS_GROUP_NAME = "JmDeploymentStatus";
public static final String COUNTER_NAME = "Count";
- public FlinkDeploymentMetrics(MetricGroup parentMetricGroup) {
- MetricGroup flinkDeploymentMetrics =
- parentMetricGroup.addGroup(FLINK_DEPLOYMENT_GROUP_NAME);
- for (JobManagerDeploymentStatus status :
JobManagerDeploymentStatus.values()) {
- statuses.put(status, ConcurrentHashMap.newKeySet());
- }
- for (JobManagerDeploymentStatus status :
JobManagerDeploymentStatus.values()) {
- statuses.put(status, new HashSet<>());
- MetricGroup metricGroup =
- flinkDeploymentMetrics
- .addGroup(JM_DEPLOYMENT_STATUS_GROUP_NAME)
- .addGroup(status.toString());
- metricGroup.gauge(COUNTER_NAME, () -> statuses.get(status).size());
- }
- flinkDeploymentMetrics.gauge(
- COUNTER_NAME, () ->
statuses.values().stream().mapToInt(Set::size).sum());
+ public FlinkDeploymentMetrics(
+ KubernetesOperatorMetricGroup parentMetricGroup, Configuration
configuration) {
+ this.parentMetricGroup = parentMetricGroup;
+ this.configuration = configuration;
}
public void onUpdate(FlinkDeployment flinkApp) {
onRemove(flinkApp);
- statuses.get(flinkApp.getStatus().getJobManagerDeploymentStatus())
+ deployments
+ .computeIfAbsent(
+ flinkApp.getMetadata().getNamespace(),
+ ns -> {
+ initNamespaceDeploymentCounts(ns);
+ initNamespaceStatusCounts(ns);
+ return createDeploymentStatusMap();
+ })
+ .get(flinkApp.getStatus().getJobManagerDeploymentStatus())
.add(flinkApp.getMetadata().getName());
}
public void onRemove(FlinkDeployment flinkApp) {
- statuses.values()
- .forEach(
- deployments -> {
-
deployments.remove(flinkApp.getMetadata().getName());
- });
+ if (!deployments.containsKey(flinkApp.getMetadata().getNamespace())) {
+ return;
+ }
+ deployments
+ .get(flinkApp.getMetadata().getNamespace())
+ .values()
+ .forEach(names ->
names.remove(flinkApp.getMetadata().getName()));
+ }
+
+ private void initNamespaceDeploymentCounts(String ns) {
+ parentMetricGroup
+ .createResourceNamespaceGroup(configuration, ns)
+ .addGroup(DEPLOYMENT_GROUP_NAME)
+ .gauge(
+ COUNTER_NAME,
+ () ->
deployments.get(ns).values().stream().mapToInt(Set::size).sum());
+ }
+
+ private void initNamespaceStatusCounts(String ns) {
+ for (JobManagerDeploymentStatus status :
JobManagerDeploymentStatus.values()) {
+ parentMetricGroup
+ .createResourceNamespaceGroup(configuration, ns)
+ .addGroup(STATUS_GROUP_NAME)
+ .addGroup(status.toString())
+ .gauge(COUNTER_NAME, () ->
deployments.get(ns).get(status).size());
+ }
+ }
+
+ private Map<JobManagerDeploymentStatus, Set<String>>
createDeploymentStatusMap() {
+ Map<JobManagerDeploymentStatus, Set<String>> statuses = new
ConcurrentHashMap<>();
+ for (JobManagerDeploymentStatus status :
JobManagerDeploymentStatus.values()) {
+ statuses.put(status, ConcurrentHashMap.newKeySet());
+ }
+ return statuses;
}
}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/FlinkSessionJobMetrics.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/FlinkSessionJobMetrics.java
index 492ed0d2..1d099519 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/FlinkSessionJobMetrics.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/FlinkSessionJobMetrics.java
@@ -17,28 +17,53 @@
package org.apache.flink.kubernetes.operator.metrics;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob;
-import org.apache.flink.metrics.MetricGroup;
+import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
/** FlinkSessionJob metrics. */
public class FlinkSessionJobMetrics implements
CustomResourceMetrics<FlinkSessionJob> {
- private final Set<String> sessionJobs = ConcurrentHashMap.newKeySet();
- public static final String METRIC_GROUP_NAME = "FlinkSessionJob";
+ private final KubernetesOperatorMetricGroup parentMetricGroup;
+ private final Configuration configuration;
+ private final Map<String, Set<String>> sessionJobs = new
ConcurrentHashMap<>();
+ public static final String METRIC_GROUP_NAME =
FlinkSessionJob.class.getSimpleName();
+ public static final String COUNTER_NAME = "Count";
- public FlinkSessionJobMetrics(MetricGroup parentMetricGroup) {
- MetricGroup flinkSessionJobMetrics =
parentMetricGroup.addGroup(METRIC_GROUP_NAME);
- flinkSessionJobMetrics.gauge("Count", () -> sessionJobs.size());
+ public FlinkSessionJobMetrics(
+ KubernetesOperatorMetricGroup parentMetricGroup, Configuration
configuration) {
+ this.parentMetricGroup = parentMetricGroup;
+ this.configuration = configuration;
}
public void onUpdate(FlinkSessionJob sessionJob) {
- sessionJobs.add(sessionJob.getMetadata().getName());
+ onRemove(sessionJob);
+ sessionJobs
+ .computeIfAbsent(
+ sessionJob.getMetadata().getNamespace(),
+ ns -> {
+ initNamespaceSessionJobCounts(ns);
+ return ConcurrentHashMap.newKeySet();
+ })
+ .add(sessionJob.getMetadata().getName());
}
public void onRemove(FlinkSessionJob sessionJob) {
- sessionJobs.remove(sessionJob.getMetadata().getName());
+ if (!sessionJobs.containsKey(sessionJob.getMetadata().getNamespace()))
{
+ return;
+ }
+ sessionJobs
+ .get(sessionJob.getMetadata().getNamespace())
+ .remove(sessionJob.getMetadata().getName());
+ }
+
+ private void initNamespaceSessionJobCounts(String ns) {
+ parentMetricGroup
+ .createResourceNamespaceGroup(configuration, ns)
+ .addGroup(METRIC_GROUP_NAME)
+ .gauge(COUNTER_NAME, () -> sessionJobs.get(ns).size());
}
}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesOperatorMetricOptions.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesOperatorMetricOptions.java
index faba69d0..ae014b9f 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesOperatorMetricOptions.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesOperatorMetricOptions.java
@@ -43,6 +43,13 @@ public class KubernetesOperatorMetricOptions {
.withDescription(
"Enable KubernetesClient metrics for measuring the
HTTP traffic to the Kubernetes API Server.");
+ public static final ConfigOption<Boolean>
OPERATOR_RESOURCE_METRICS_ENABLED =
+ ConfigOptions.key("kubernetes.operator.resource.metrics.enabled")
+ .booleanType()
+ .defaultValue(true)
+ .withDescription(
+ "Enables metrics for FlinkDeployment and
FlinkSessionJob custom resources.");
+
public static final ConfigOption<Boolean>
OPERATOR_LIFECYCLE_METRICS_ENABLED =
ConfigOptions.key("kubernetes.operator.resource.lifecycle.metrics.enabled")
.booleanType()
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/MetricManager.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/MetricManager.java
index 307019a7..490dd03e 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/MetricManager.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/MetricManager.java
@@ -24,67 +24,81 @@ import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob;
import org.apache.flink.kubernetes.operator.metrics.lifecycle.LifecycleMetrics;
-import java.time.Clock;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.ArrayList;
+import java.util.List;
/** Metric manager for Operator managed custom resources. */
public class MetricManager<CR extends AbstractFlinkResource<?, ?>> {
- private final KubernetesOperatorMetricGroup opMetricGroup;
- private final FlinkConfigManager configManager;
- private final Map<String, CustomResourceMetrics> metrics = new
ConcurrentHashMap<>();
+ private final List<CustomResourceMetrics<CR>> registeredMetrics = new
ArrayList<>();
- private final LifecycleMetrics<CR> lifeCycleMetrics;
+ public void onUpdate(CR cr) {
+ registeredMetrics.forEach(m -> m.onUpdate(cr));
+ }
- public MetricManager(
- KubernetesOperatorMetricGroup opMetricGroup, FlinkConfigManager
configManager) {
- this.opMetricGroup = opMetricGroup;
- this.configManager = configManager;
+ public void onRemove(CR cr) {
+ registeredMetrics.forEach(m -> m.onRemove(cr));
+ }
- if (configManager
- .getDefaultConfig()
-
.get(KubernetesOperatorMetricOptions.OPERATOR_LIFECYCLE_METRICS_ENABLED)) {
- this.lifeCycleMetrics =
- new LifecycleMetrics<>(configManager,
Clock.systemDefaultZone(), opMetricGroup);
- } else {
- this.lifeCycleMetrics = null;
- }
+ public void register(CustomResourceMetrics<CR> metrics) {
+ registeredMetrics.add(metrics);
}
- public void onUpdate(CR cr) {
- getCustomResourceMetrics(cr).onUpdate(cr);
- if (lifeCycleMetrics != null) {
- lifeCycleMetrics.onUpdate(cr);
- }
+ public static MetricManager<FlinkDeployment>
createFlinkDeploymentMetricManager(
+ FlinkConfigManager configManager, KubernetesOperatorMetricGroup
metricGroup) {
+ MetricManager<FlinkDeployment> metricManager = new MetricManager<>();
+ registerFlinkDeploymentMetrics(configManager, metricGroup,
metricManager);
+ registerLifecycleMetrics(configManager, metricGroup, metricManager);
+ return metricManager;
}
- public void onRemove(CR cr) {
- getCustomResourceMetrics(cr).onRemove(cr);
- if (lifeCycleMetrics != null) {
- lifeCycleMetrics.onRemove(cr);
+ public static MetricManager<FlinkSessionJob>
createFlinkSessionJobMetricManager(
+ FlinkConfigManager configManager, KubernetesOperatorMetricGroup
metricGroup) {
+ MetricManager<FlinkSessionJob> metricManager = new MetricManager<>();
+ registerFlinkSessionJobMetrics(configManager, metricGroup,
metricManager);
+ registerLifecycleMetrics(configManager, metricGroup, metricManager);
+ return metricManager;
+ }
+
+ private static void registerFlinkDeploymentMetrics(
+ FlinkConfigManager configManager,
+ KubernetesOperatorMetricGroup metricGroup,
+ MetricManager<FlinkDeployment> metricManager) {
+ if (configManager
+ .getDefaultConfig()
+
.get(KubernetesOperatorMetricOptions.OPERATOR_RESOURCE_METRICS_ENABLED)) {
+ metricManager.register(
+ new FlinkDeploymentMetrics(metricGroup,
configManager.getDefaultConfig()));
}
}
- private CustomResourceMetrics getCustomResourceMetrics(CR cr) {
- return metrics.computeIfAbsent(
- cr.getMetadata().getNamespace(), k ->
getCustomResourceMetricsImpl(cr));
+ private static void registerFlinkSessionJobMetrics(
+ FlinkConfigManager configManager,
+ KubernetesOperatorMetricGroup metricGroup,
+ MetricManager<FlinkSessionJob> metricManager) {
+ if (configManager
+ .getDefaultConfig()
+
.get(KubernetesOperatorMetricOptions.OPERATOR_RESOURCE_METRICS_ENABLED)) {
+ metricManager.register(
+ new FlinkSessionJobMetrics(metricGroup,
configManager.getDefaultConfig()));
+ }
}
- private CustomResourceMetrics getCustomResourceMetricsImpl(CR cr) {
- var namespaceMg =
- opMetricGroup.createResourceNamespaceGroup(
- configManager.getDefaultConfig(),
cr.getMetadata().getNamespace());
- if (cr instanceof FlinkDeployment) {
- return new FlinkDeploymentMetrics(namespaceMg);
- } else if (cr instanceof FlinkSessionJob) {
- return new FlinkSessionJobMetrics(namespaceMg);
- } else {
- throw new IllegalArgumentException("Unknown CustomResource");
+ private static <CR extends AbstractFlinkResource<?, ?>> void
registerLifecycleMetrics(
+ FlinkConfigManager configManager,
+ KubernetesOperatorMetricGroup metricGroup,
+ MetricManager<CR> metricManager) {
+ if (configManager
+ .getDefaultConfig()
+
.get(KubernetesOperatorMetricOptions.OPERATOR_RESOURCE_METRICS_ENABLED)
+ && configManager
+ .getDefaultConfig()
+
.get(KubernetesOperatorMetricOptions.OPERATOR_LIFECYCLE_METRICS_ENABLED)) {
+ metricManager.register(new LifecycleMetrics<>(configManager,
metricGroup));
}
}
@VisibleForTesting
- public LifecycleMetrics<CR> getLifeCycleMetrics() {
- return lifeCycleMetrics;
+ public List<CustomResourceMetrics<CR>> getRegisteredMetrics() {
+ return registeredMetrics;
}
}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/LifecycleMetrics.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/LifecycleMetrics.java
index c2ef91eb..ad76b02a 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/LifecycleMetrics.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/LifecycleMetrics.java
@@ -21,6 +21,7 @@ import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
import org.apache.flink.kubernetes.operator.crd.AbstractFlinkResource;
+import org.apache.flink.kubernetes.operator.metrics.CustomResourceMetrics;
import
org.apache.flink.kubernetes.operator.metrics.KubernetesOperatorMetricGroup;
import
org.apache.flink.kubernetes.operator.metrics.KubernetesOperatorMetricOptions;
import org.apache.flink.kubernetes.operator.metrics.OperatorMetricUtils;
@@ -53,7 +54,8 @@ import static org.apache.flink.kubernetes.operator.metrics.lifecycle.ResourceLif
*
* @param <CR> Flink resource type.
*/
-public class LifecycleMetrics<CR extends AbstractFlinkResource<?, ?>> {
+public class LifecycleMetrics<CR extends AbstractFlinkResource<?, ?>>
+ implements CustomResourceMetrics<CR> {
private static final String TRANSITION_RESUME = "Resume";
private static final String TRANSITION_UPGRADE = "Upgrade";
@@ -79,11 +81,9 @@ public class LifecycleMetrics<CR extends AbstractFlinkResource<?, ?>> {
private Function<MetricGroup, MetricGroup> metricGroupFunction;
public LifecycleMetrics(
- FlinkConfigManager configManager,
- Clock clock,
- KubernetesOperatorMetricGroup operatorMetricGroup) {
+ FlinkConfigManager configManager, KubernetesOperatorMetricGroup
operatorMetricGroup) {
this.configManager = configManager;
- this.clock = clock;
+ this.clock = Clock.systemDefaultZone();
this.operatorMetricGroup = operatorMetricGroup;
this.namespaceHistosEnabled =
configManager
@@ -93,10 +93,12 @@ public class LifecycleMetrics<CR extends AbstractFlinkResource<?, ?>> {
.OPERATOR_LIFECYCLE_NAMESPACE_HISTOGRAMS_ENABLED);
}
+ @Override
public void onUpdate(CR cr) {
getLifecycleMetricTracker(cr).onUpdate(cr.getStatus().getLifecycleState(),
clock.instant());
}
+ @Override
public void onRemove(CR cr) {
lifecycleTrackers.remove(
Tuple2.of(cr.getMetadata().getNamespace(),
cr.getMetadata().getName()));
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SavepointObserver.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SavepointObserver.java
index aa3590a4..52c322ee 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SavepointObserver.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SavepointObserver.java
@@ -42,19 +42,20 @@ import java.util.Iterator;
import java.util.List;
/** An observer of savepoint progress. */
-public class SavepointObserver<STATUS extends CommonStatus<?>> {
+public class SavepointObserver<
+ CR extends AbstractFlinkResource<?, STATUS>, STATUS extends
CommonStatus<?>> {
private static final Logger LOG =
LoggerFactory.getLogger(SavepointObserver.class);
private final FlinkService flinkService;
private final FlinkConfigManager configManager;
- private final StatusRecorder<STATUS> statusRecorder;
+ private final StatusRecorder<CR, STATUS> statusRecorder;
private final EventRecorder eventRecorder;
public SavepointObserver(
FlinkService flinkService,
FlinkConfigManager configManager,
- StatusRecorder<STATUS> statusRecorder,
+ StatusRecorder<CR, STATUS> statusRecorder,
EventRecorder eventRecorder) {
this.flinkService = flinkService;
this.configManager = configManager;
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserver.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserver.java
index 01641699..038e0342 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserver.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserver.java
@@ -38,13 +38,13 @@ import java.util.Optional;
/** The observer of {@link
org.apache.flink.kubernetes.operator.config.Mode#APPLICATION} cluster. */
public class ApplicationObserver extends AbstractDeploymentObserver {
- private final SavepointObserver<FlinkDeploymentStatus> savepointObserver;
+ private final SavepointObserver<FlinkDeployment, FlinkDeploymentStatus>
savepointObserver;
private final JobStatusObserver<ApplicationObserverContext>
jobStatusObserver;
public ApplicationObserver(
FlinkService flinkService,
FlinkConfigManager configManager,
- StatusRecorder<FlinkDeploymentStatus> statusRecorder,
+ StatusRecorder<FlinkDeployment, FlinkDeploymentStatus>
statusRecorder,
EventRecorder eventRecorder) {
super(flinkService, configManager, eventRecorder);
this.savepointObserver =
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/ObserverFactory.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/ObserverFactory.java
index b41d34b8..1f1617b2 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/ObserverFactory.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/ObserverFactory.java
@@ -36,7 +36,7 @@ public class ObserverFactory {
private final FlinkServiceFactory flinkServiceFactory;
private final FlinkConfigManager configManager;
- private final StatusRecorder<FlinkDeploymentStatus> statusRecorder;
+ private final StatusRecorder<FlinkDeployment, FlinkDeploymentStatus>
statusRecorder;
private final EventRecorder eventRecorder;
private final Map<Tuple2<Mode, KubernetesDeploymentMode>,
Observer<FlinkDeployment>>
observerMap;
@@ -44,7 +44,7 @@ public class ObserverFactory {
public ObserverFactory(
FlinkServiceFactory flinkServiceFactory,
FlinkConfigManager configManager,
- StatusRecorder<FlinkDeploymentStatus> statusRecorder,
+ StatusRecorder<FlinkDeployment, FlinkDeploymentStatus>
statusRecorder,
EventRecorder eventRecorder) {
this.flinkServiceFactory = flinkServiceFactory;
this.configManager = configManager;
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserver.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserver.java
index c3aeec46..14f76492 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserver.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserver.java
@@ -56,12 +56,12 @@ public class SessionJobObserver implements Observer<FlinkSessionJob> {
private final FlinkServiceFactory flinkServiceFactory;
private final FlinkConfigManager configManager;
private final EventRecorder eventRecorder;
- private final StatusRecorder<FlinkSessionJobStatus> statusRecorder;
+ private final StatusRecorder<FlinkSessionJob, FlinkSessionJobStatus>
statusRecorder;
public SessionJobObserver(
FlinkServiceFactory flinkServiceFactory,
FlinkConfigManager configManager,
- StatusRecorder<FlinkSessionJobStatus> statusRecorder,
+ StatusRecorder<FlinkSessionJob, FlinkSessionJobStatus>
statusRecorder,
EventRecorder eventRecorder) {
this.flinkServiceFactory = flinkServiceFactory;
this.configManager = configManager;
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
index 79ee2dad..1e596fe5 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
@@ -399,7 +399,7 @@ public class ReconciliationUtils {
R resource,
Optional<RetryInfo> retryInfo,
Exception e,
- StatusRecorder<STATUS> statusRecorder) {
+ StatusRecorder<R, STATUS> statusRecorder) {
retryInfo.ifPresent(
r -> {
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
index 3d458432..f2eb5efb 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
@@ -63,7 +63,7 @@ public abstract class AbstractFlinkResourceReconciler<
protected final FlinkConfigManager configManager;
protected final EventRecorder eventRecorder;
- protected final StatusRecorder<STATUS> statusRecorder;
+ protected final StatusRecorder<CR, STATUS> statusRecorder;
protected final KubernetesClient kubernetesClient;
public static final String MSG_SUSPENDED = "Suspending existing
deployment.";
@@ -75,7 +75,7 @@ public abstract class AbstractFlinkResourceReconciler<
KubernetesClient kubernetesClient,
FlinkConfigManager configManager,
EventRecorder eventRecorder,
- StatusRecorder<STATUS> statusRecorder) {
+ StatusRecorder<CR, STATUS> statusRecorder) {
this.kubernetesClient = kubernetesClient;
this.configManager = configManager;
this.eventRecorder = eventRecorder;
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java
index e3a72a05..5d47ebbc 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java
@@ -55,7 +55,7 @@ public abstract class AbstractJobReconciler<
KubernetesClient kubernetesClient,
FlinkConfigManager configManager,
EventRecorder eventRecorder,
- StatusRecorder<STATUS> statusRecorder) {
+ StatusRecorder<CR, STATUS> statusRecorder) {
super(kubernetesClient, configManager, eventRecorder, statusRecorder);
}
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
index ec0389e2..e7c5143a 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
@@ -60,7 +60,7 @@ public class ApplicationReconciler
FlinkService flinkService,
FlinkConfigManager configManager,
EventRecorder eventRecorder,
- StatusRecorder<FlinkDeploymentStatus> statusRecorder) {
+ StatusRecorder<FlinkDeployment, FlinkDeploymentStatus>
statusRecorder) {
super(kubernetesClient, configManager, eventRecorder, statusRecorder);
this.flinkService = flinkService;
}
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ReconcilerFactory.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ReconcilerFactory.java
index 2d11f495..3192a954 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ReconcilerFactory.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ReconcilerFactory.java
@@ -40,7 +40,7 @@ public class ReconcilerFactory {
private final FlinkServiceFactory flinkServiceFactory;
private final FlinkConfigManager configManager;
private final EventRecorder eventRecorder;
- private final StatusRecorder<FlinkDeploymentStatus>
deploymentStatusRecorder;
+ private final StatusRecorder<FlinkDeployment, FlinkDeploymentStatus>
deploymentStatusRecorder;
private final Map<Tuple2<Mode, KubernetesDeploymentMode>,
Reconciler<FlinkDeployment>>
reconcilerMap;
@@ -49,7 +49,7 @@ public class ReconcilerFactory {
FlinkServiceFactory flinkServiceFactory,
FlinkConfigManager configManager,
EventRecorder eventRecorder,
- StatusRecorder<FlinkDeploymentStatus> deploymentStatusRecorder) {
+ StatusRecorder<FlinkDeployment, FlinkDeploymentStatus>
deploymentStatusRecorder) {
this.kubernetesClient = kubernetesClient;
this.flinkServiceFactory = flinkServiceFactory;
this.configManager = configManager;
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java
index 527cb8b5..9b2f48e1 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java
@@ -60,7 +60,7 @@ public class SessionReconciler
FlinkService flinkService,
FlinkConfigManager configManager,
EventRecorder eventRecorder,
- StatusRecorder<FlinkDeploymentStatus> statusRecorder) {
+ StatusRecorder<FlinkDeployment, FlinkDeploymentStatus>
statusRecorder) {
super(kubernetesClient, configManager, eventRecorder, statusRecorder);
this.flinkService = flinkService;
}
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconciler.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconciler.java
index f0dd2520..1ff4f76c 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconciler.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconciler.java
@@ -53,7 +53,7 @@ public class SessionJobReconciler
FlinkServiceFactory flinkServiceFactory,
FlinkConfigManager configManager,
EventRecorder eventRecorder,
- StatusRecorder<FlinkSessionJobStatus> statusRecorder) {
+ StatusRecorder<FlinkSessionJob, FlinkSessionJobStatus>
statusRecorder) {
super(kubernetesClient, configManager, eventRecorder, statusRecorder);
this.flinkServiceFactory = flinkServiceFactory;
}
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/StatusRecorder.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/StatusRecorder.java
index 4b6fbdb7..47e34806 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/StatusRecorder.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/StatusRecorder.java
@@ -40,7 +40,8 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer;
/** Helper class for status management and updates. */
-public class StatusRecorder<STATUS extends CommonStatus<?>> {
+public class StatusRecorder<
+ CR extends AbstractFlinkResource<?, STATUS>, STATUS extends
CommonStatus<?>> {
private static final Logger LOG =
LoggerFactory.getLogger(StatusRecorder.class);
@@ -50,13 +51,13 @@ public class StatusRecorder<STATUS extends CommonStatus<?>>
{
new ConcurrentHashMap<>();
private final KubernetesClient client;
- private final MetricManager<AbstractFlinkResource<?, STATUS>>
metricManager;
- private final BiConsumer<AbstractFlinkResource<?, STATUS>, STATUS>
statusUpdateListener;
+ private final MetricManager<CR> metricManager;
+ private final BiConsumer<CR, STATUS> statusUpdateListener;
public StatusRecorder(
KubernetesClient client,
- MetricManager<AbstractFlinkResource<?, STATUS>> metricManager,
- BiConsumer<AbstractFlinkResource<?, STATUS>, STATUS>
statusUpdateListener) {
+ MetricManager<CR> metricManager,
+ BiConsumer<CR, STATUS> statusUpdateListener) {
this.client = client;
this.statusUpdateListener = statusUpdateListener;
this.metricManager = metricManager;
@@ -69,11 +70,10 @@ public class StatusRecorder<STATUS extends CommonStatus<?>>
{
* operator behavior.
*
* @param resource Resource for which status update should be performed
- * @param <T> Resource type.
*/
@SneakyThrows
- public <T extends AbstractFlinkResource<?, STATUS>> void
patchAndCacheStatus(T resource) {
- Class<T> resourceClass = (Class<T>) resource.getClass();
+ public void patchAndCacheStatus(CR resource) {
+ Class<CR> resourceClass = (Class<CR>) resource.getClass();
String namespace = resource.getMetadata().getNamespace();
String name = resource.getMetadata().getName();
@@ -126,9 +126,8 @@ public class StatusRecorder<STATUS extends CommonStatus<?>>
{
* reconciles a resource for the first time after a restart.
*
* @param resource Resource for which the status should be updated from
the cache
- * @param <T> Custom resource type.
*/
- public <T extends AbstractFlinkResource<?, STATUS>> void
updateStatusFromCache(T resource) {
+ public void updateStatusFromCache(CR resource) {
var key = getKey(resource);
var cachedStatus = statusCache.get(key);
if (cachedStatus != null) {
@@ -147,9 +146,8 @@ public class StatusRecorder<STATUS extends CommonStatus<?>>
{
* Remove cached status for Flink resource.
*
* @param resource Flink resource.
- * @param <T> Resource type.
*/
- public <T extends AbstractFlinkResource<?, STATUS>> void
removeCachedStatus(T resource) {
+ public void removeCachedStatus(CR resource) {
statusCache.remove(getKey(resource));
metricManager.onRemove(resource);
}
@@ -158,11 +156,12 @@ public class StatusRecorder<STATUS extends
CommonStatus<?>> {
return Tuple2.of(resource.getMetadata().getNamespace(),
resource.getMetadata().getName());
}
- public static <S extends CommonStatus<?>> StatusRecorder<S> create(
- KubernetesClient kubernetesClient,
- MetricManager<AbstractFlinkResource<?, S>> metricManager,
- Collection<FlinkResourceListener> listeners) {
- BiConsumer<AbstractFlinkResource<?, S>, S> consumer =
+ public static <S extends CommonStatus<?>, CR extends
AbstractFlinkResource<?, S>>
+ StatusRecorder<CR, S> create(
+ KubernetesClient kubernetesClient,
+ MetricManager<CR> metricManager,
+ Collection<FlinkResourceListener> listeners) {
+ BiConsumer<CR, S> consumer =
(resource, previousStatus) -> {
var ctx =
new FlinkResourceListener.StatusUpdateContext() {
@@ -191,6 +190,7 @@ public class StatusRecorder<STATUS extends CommonStatus<?>>
{
}
});
};
+
return new StatusRecorder<>(kubernetesClient, metricManager, consumer);
}
}
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
index ebe8abc3..9c9687b3 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
@@ -22,8 +22,6 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import
org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory;
-import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
-import org.apache.flink.kubernetes.operator.crd.AbstractFlinkResource;
import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob;
import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
@@ -40,7 +38,6 @@ import
org.apache.flink.kubernetes.operator.crd.status.FlinkSessionJobStatus;
import
org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus;
import
org.apache.flink.kubernetes.operator.exception.DeploymentFailedException;
import
org.apache.flink.kubernetes.operator.metrics.KubernetesOperatorMetricGroup;
-import org.apache.flink.kubernetes.operator.metrics.MetricManager;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.util.TestingMetricRegistry;
@@ -109,20 +106,38 @@ public class TestUtils {
}
public static FlinkDeployment buildSessionCluster(FlinkVersion version) {
+ return buildSessionCluster(TEST_DEPLOYMENT_NAME, TEST_NAMESPACE,
version);
+ }
+
+ public static FlinkDeployment buildSessionCluster(
+ String name, String namespace, FlinkVersion version) {
FlinkDeployment deployment = new FlinkDeployment();
deployment.setStatus(new FlinkDeploymentStatus());
deployment.setMetadata(
new ObjectMetaBuilder()
- .withName(TEST_DEPLOYMENT_NAME)
- .withNamespace(TEST_NAMESPACE)
+ .withName(name)
+ .withNamespace(namespace)
.withCreationTimestamp(Instant.now().toString())
.build());
deployment.setSpec(getTestFlinkDeploymentSpec(version));
return deployment;
}
+ public static FlinkDeployment buildApplicationCluster() {
+ return buildApplicationCluster(FlinkVersion.v1_15);
+ }
+
+ public static FlinkDeployment buildApplicationCluster(String name, String
namespace) {
+ return buildApplicationCluster(name, namespace, FlinkVersion.v1_15);
+ }
+
public static FlinkDeployment buildApplicationCluster(FlinkVersion
version) {
- FlinkDeployment deployment = buildSessionCluster(version);
+ return buildApplicationCluster(TEST_DEPLOYMENT_NAME, TEST_NAMESPACE,
version);
+ }
+
+ public static FlinkDeployment buildApplicationCluster(
+ String name, String namespace, FlinkVersion version) {
+ FlinkDeployment deployment = buildSessionCluster(name, namespace,
version);
deployment
.getSpec()
.setJob(
@@ -136,17 +151,13 @@ public class TestUtils {
return deployment;
}
- public static FlinkDeployment buildApplicationCluster() {
- return buildApplicationCluster(FlinkVersion.v1_15);
- }
-
- public static FlinkSessionJob buildSessionJob() {
+ public static FlinkSessionJob buildSessionJob(String name, String
namespace) {
FlinkSessionJob sessionJob = new FlinkSessionJob();
sessionJob.setStatus(new FlinkSessionJobStatus());
sessionJob.setMetadata(
new ObjectMetaBuilder()
- .withName(TEST_SESSION_JOB_NAME)
- .withNamespace(TEST_NAMESPACE)
+ .withName(name)
+ .withNamespace(namespace)
.withCreationTimestamp(Instant.now().toString())
.withUid(UUID.randomUUID().toString())
.withGeneration(1L)
@@ -165,6 +176,10 @@ public class TestUtils {
return sessionJob;
}
+ public static FlinkSessionJob buildSessionJob() {
+ return buildSessionJob(TEST_SESSION_JOB_NAME, TEST_NAMESPACE);
+ }
+
public static FlinkDeploymentSpec getTestFlinkDeploymentSpec(FlinkVersion
version) {
Map<String, String> conf = new HashMap<>();
conf.put(TaskManagerOptions.NUM_TASK_SLOTS.key(), "2");
@@ -431,21 +446,8 @@ public class TestUtils {
}
}
- public static <T extends AbstractFlinkResource<?, ?>> MetricManager<T>
createTestMetricManager(
- Configuration conf) {
- return createTestMetricManager(
- TestingMetricRegistry.builder()
- .setDelimiter(".".charAt(0))
- .setRegisterConsumer((metric, name, group) -> {})
- .build(),
- conf);
- }
-
- public static <T extends AbstractFlinkResource<?, ?>> MetricManager<T>
createTestMetricManager(
- MetricRegistry metricRegistry, Configuration conf) {
-
- var confManager = new FlinkConfigManager(conf);
- return new MetricManager<>(createTestMetricGroup(metricRegistry,
conf), confManager);
+ public static KubernetesOperatorMetricGroup
createTestMetricGroup(Configuration conf) {
+ return createTestMetricGroup(createTestMetricRegistry(), conf);
}
public static KubernetesOperatorMetricGroup createTestMetricGroup(
@@ -454,6 +456,14 @@ public class TestUtils {
metricRegistry, conf, TEST_NAMESPACE, "testopname",
"testhost");
}
+ public static TestingMetricRegistry createTestMetricRegistry() {
+
+ return TestingMetricRegistry.builder()
+ .setDelimiter(".".charAt(0))
+ .setRegisterConsumer((metric, name, group) -> {})
+ .build();
+ }
+
/** Testing ResponseProvider. */
public static class ValidatingResponseProvider<T> implements
ResponseProvider<Object> {
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingStatusRecorder.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingStatusRecorder.java
index 5e9cfcca..be893b58 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingStatusRecorder.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingStatusRecorder.java
@@ -18,22 +18,24 @@
package org.apache.flink.kubernetes.operator;
-import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.crd.AbstractFlinkResource;
import org.apache.flink.kubernetes.operator.crd.status.CommonStatus;
+import org.apache.flink.kubernetes.operator.metrics.MetricManager;
import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
import com.fasterxml.jackson.databind.node.ObjectNode;
/** Testing statusRecorder. */
-public class TestingStatusRecorder<STATUS extends CommonStatus<?>> extends
StatusRecorder<STATUS> {
+public class TestingStatusRecorder<
+ CR extends AbstractFlinkResource<?, STATUS>, STATUS extends
CommonStatus<?>>
+ extends StatusRecorder<CR, STATUS> {
public TestingStatusRecorder() {
- super(null, TestUtils.createTestMetricManager(new Configuration()),
(r, s) -> {});
+ super(null, new MetricManager<>(), (r, s) -> {});
}
@Override
- public <T extends AbstractFlinkResource<?, STATUS>> void
patchAndCacheStatus(T resource) {
+ public void patchAndCacheStatus(CR resource) {
statusCache.put(
getKey(resource),
objectMapper.convertValue(resource.getStatus(),
ObjectNode.class));
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java
index 088d46bf..b2991479 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java
@@ -17,13 +17,13 @@
package org.apache.flink.kubernetes.operator.controller;
-import org.apache.flink.kubernetes.operator.TestUtils;
import org.apache.flink.kubernetes.operator.TestingFlinkService;
import org.apache.flink.kubernetes.operator.TestingFlinkServiceFactory;
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
import org.apache.flink.kubernetes.operator.crd.AbstractFlinkResource;
import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
+import org.apache.flink.kubernetes.operator.metrics.MetricManager;
import
org.apache.flink.kubernetes.operator.observer.deployment.ObserverFactory;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import
org.apache.flink.kubernetes.operator.reconciler.deployment.ReconcilerFactory;
@@ -73,10 +73,7 @@ public class TestingFlinkDeploymentController
eventRecorder = new EventRecorder(kubernetesClient, eventCollector);
statusRecorder =
- new StatusRecorder<>(
- kubernetesClient,
-
TestUtils.createTestMetricManager(configManager.getDefaultConfig()),
- statusUpdateCounter);
+ new StatusRecorder<>(kubernetesClient, new MetricManager<>(),
statusUpdateCounter);
flinkDeploymentController =
new FlinkDeploymentController(
configManager,
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/listener/FlinkResourceListenerTest.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/listener/FlinkResourceListenerTest.java
index e68c910f..c3a42c8c 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/listener/FlinkResourceListenerTest.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/listener/FlinkResourceListenerTest.java
@@ -17,11 +17,11 @@
package org.apache.flink.kubernetes.operator.listener;
-import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.TestUtils;
import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
import
org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus;
+import org.apache.flink.kubernetes.operator.metrics.MetricManager;
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
@@ -52,11 +52,8 @@ public class FlinkResourceListenerTest {
var listener2 = new TestingListener();
var listeners = List.<FlinkResourceListener>of(listener1, listener2);
- var statusRecorder =
- StatusRecorder.<FlinkDeploymentStatus>create(
- kubernetesClient,
- TestUtils.createTestMetricManager(new Configuration()),
- listeners);
+ StatusRecorder<FlinkDeployment, FlinkDeploymentStatus> statusRecorder =
+ StatusRecorder.create(kubernetesClient, new MetricManager<>(),
listeners);
var eventRecorder = EventRecorder.create(kubernetesClient, listeners);
var deployment = TestUtils.buildApplicationCluster();
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/FlinkDeploymentMetricsTest.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/FlinkDeploymentMetricsTest.java
index 70a5523e..3c368fef 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/FlinkDeploymentMetricsTest.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/FlinkDeploymentMetricsTest.java
@@ -19,94 +19,161 @@ package org.apache.flink.kubernetes.operator.metrics;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.TestUtils;
+import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
-import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
import
org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus;
-import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
-import org.apache.flink.metrics.Gauge;
-import org.apache.flink.metrics.Metric;
-import org.apache.flink.runtime.metrics.util.TestingMetricRegistry;
-import io.fabric8.kubernetes.client.KubernetesClient;
-import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
-import java.util.HashMap;
-
import static
org.apache.flink.kubernetes.operator.metrics.FlinkDeploymentMetrics.COUNTER_NAME;
-import static
org.apache.flink.kubernetes.operator.metrics.FlinkDeploymentMetrics.FLINK_DEPLOYMENT_GROUP_NAME;
-import static
org.apache.flink.kubernetes.operator.metrics.FlinkDeploymentMetrics.JM_DEPLOYMENT_STATUS_GROUP_NAME;
+import static
org.apache.flink.kubernetes.operator.metrics.FlinkDeploymentMetrics.DEPLOYMENT_GROUP_NAME;
+import static
org.apache.flink.kubernetes.operator.metrics.FlinkDeploymentMetrics.STATUS_GROUP_NAME;
+import static
org.apache.flink.kubernetes.operator.metrics.KubernetesOperatorMetricOptions.OPERATOR_RESOURCE_METRICS_ENABLED;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
/** @link FlinkDeploymentMetrics tests. */
-@EnableKubernetesMockClient(crud = true)
public class FlinkDeploymentMetricsTest {
- private KubernetesClient kubernetesClient;
+
+ private final Configuration configuration = new Configuration();
+ private TestingMetricListener listener;
+ private MetricManager<FlinkDeployment> metricManager;
+
+ @BeforeEach
+ public void init() {
+ listener = new TestingMetricListener(configuration);
+ metricManager =
+ MetricManager.createFlinkDeploymentMetricManager(
+ new FlinkConfigManager(configuration),
listener.getMetricGroup());
+ }
@Test
- public void testFlinkDeploymentMetrics() throws InterruptedException {
- var metrics = new HashMap<String, Metric>();
- TestingMetricRegistry registry =
- TestingMetricRegistry.builder()
- .setDelimiter(".".charAt(0))
- .setRegisterConsumer(
- (metric, name, group) -> {
-
metrics.put(group.getMetricIdentifier(name), metric);
- })
- .build();
+ public void testMetricsSameNamespace() {
+ var namespace = TestUtils.TEST_NAMESPACE;
+ var deployment1 = TestUtils.buildApplicationCluster("deployment1",
namespace);
+ var deployment2 = TestUtils.buildApplicationCluster("deployment2",
namespace);
+
+ var counterId =
+ listener.getNamespaceMetricId(namespace,
DEPLOYMENT_GROUP_NAME, COUNTER_NAME);
+ assertTrue(listener.getGauge(counterId).isEmpty());
+ for (JobManagerDeploymentStatus status :
JobManagerDeploymentStatus.values()) {
+ var statusId =
+ listener.getNamespaceMetricId(
+ namespace, STATUS_GROUP_NAME, status.name(),
COUNTER_NAME);
+ assertTrue(listener.getGauge(statusId).isEmpty());
+ }
- var metricManager =
- (MetricManager) TestUtils.createTestMetricManager(registry,
new Configuration());
- var helper =
- new StatusRecorder<FlinkDeploymentStatus>(
- kubernetesClient, metricManager, (e, s) -> {});
+ metricManager.onUpdate(deployment1);
+ metricManager.onUpdate(deployment2);
+ assertEquals(2, listener.getGauge(counterId).get().getValue());
+ for (JobManagerDeploymentStatus status :
JobManagerDeploymentStatus.values()) {
+ deployment1.getStatus().setJobManagerDeploymentStatus(status);
+ deployment2.getStatus().setJobManagerDeploymentStatus(status);
+ metricManager.onUpdate(deployment1);
+ metricManager.onUpdate(deployment2);
+
+ var statusId =
+ listener.getNamespaceMetricId(
+ namespace, STATUS_GROUP_NAME, status.name(),
COUNTER_NAME);
+ assertEquals(2, listener.getGauge(statusId).get().getValue());
+ }
- var deployment = TestUtils.buildApplicationCluster();
- kubernetesClient.resource(deployment).createOrReplace();
+ metricManager.onRemove(deployment1);
+ metricManager.onRemove(deployment2);
+ assertEquals(0, listener.getGauge(counterId).get().getValue());
+ for (JobManagerDeploymentStatus status :
JobManagerDeploymentStatus.values()) {
+ var statusId =
+ listener.getNamespaceMetricId(
+ namespace, STATUS_GROUP_NAME, status.name(),
COUNTER_NAME);
+ assertEquals(0, listener.getGauge(statusId).get().getValue());
+ }
+ }
- helper.updateStatusFromCache(deployment);
- assertEquals(1, ((Gauge)
metrics.get(totalIdentifier(deployment))).getValue());
- assertEquals(1, ((Gauge)
metrics.get(perStatusIdentifier(deployment))).getValue());
+ @Test
+ public void testMetricsMultiNamespace() {
+ var namespace1 = "ns1";
+ var namespace2 = "ns2";
+ var deployment1 = TestUtils.buildApplicationCluster("deployment",
namespace1);
+ var deployment2 = TestUtils.buildApplicationCluster("deployment",
namespace2);
+
+ var counterId1 =
+ listener.getNamespaceMetricId(namespace1,
DEPLOYMENT_GROUP_NAME, COUNTER_NAME);
+ var counterId2 =
+ listener.getNamespaceMetricId(namespace2,
DEPLOYMENT_GROUP_NAME, COUNTER_NAME);
+ assertTrue(listener.getGauge(counterId1).isEmpty());
+ assertTrue(listener.getGauge(counterId2).isEmpty());
+ for (JobManagerDeploymentStatus status :
JobManagerDeploymentStatus.values()) {
+ var statusId1 =
+ listener.getNamespaceMetricId(
+ namespace1, STATUS_GROUP_NAME, status.name(),
COUNTER_NAME);
+ var statusId2 =
+ listener.getNamespaceMetricId(
+ namespace2, STATUS_GROUP_NAME, status.name(),
COUNTER_NAME);
+ assertTrue(listener.getGauge(statusId1).isEmpty());
+ assertTrue(listener.getGauge(statusId2).isEmpty());
+ }
+ metricManager.onUpdate(deployment1);
+ metricManager.onUpdate(deployment2);
+ assertEquals(1, listener.getGauge(counterId1).get().getValue());
+ assertEquals(1, listener.getGauge(counterId2).get().getValue());
for (JobManagerDeploymentStatus status :
JobManagerDeploymentStatus.values()) {
- deployment.getStatus().setJobManagerDeploymentStatus(status);
- helper.patchAndCacheStatus(deployment);
- assertEquals(1, ((Gauge)
metrics.get(totalIdentifier(deployment))).getValue());
- assertEquals(1, ((Gauge)
metrics.get(perStatusIdentifier(deployment))).getValue());
+ deployment1.getStatus().setJobManagerDeploymentStatus(status);
+ deployment2.getStatus().setJobManagerDeploymentStatus(status);
+ metricManager.onUpdate(deployment1);
+ metricManager.onUpdate(deployment2);
+ var statusId1 =
+ listener.getNamespaceMetricId(
+ namespace1, STATUS_GROUP_NAME, status.name(),
COUNTER_NAME);
+ var statusId2 =
+ listener.getNamespaceMetricId(
+ namespace2, STATUS_GROUP_NAME, status.name(),
COUNTER_NAME);
+ assertEquals(1, listener.getGauge(statusId1).get().getValue());
+ assertEquals(1, listener.getGauge(statusId2).get().getValue());
}
- helper.removeCachedStatus(deployment);
- assertEquals(0, ((Gauge)
metrics.get(totalIdentifier(deployment))).getValue());
+ metricManager.onRemove(deployment1);
+ metricManager.onRemove(deployment2);
+
+ assertEquals(0, listener.getGauge(counterId1).get().getValue());
+ assertEquals(0, listener.getGauge(counterId2).get().getValue());
for (JobManagerDeploymentStatus status :
JobManagerDeploymentStatus.values()) {
- assertEquals(0, ((Gauge)
metrics.get(perStatusIdentifier(deployment))).getValue());
+ deployment1.getStatus().setJobManagerDeploymentStatus(status);
+ deployment2.getStatus().setJobManagerDeploymentStatus(status);
+ var statusId1 =
+ listener.getNamespaceMetricId(
+ namespace1, STATUS_GROUP_NAME, status.name(),
COUNTER_NAME);
+ var statusId2 =
+ listener.getNamespaceMetricId(
+ namespace2, STATUS_GROUP_NAME, status.name(),
COUNTER_NAME);
+ assertEquals(0, listener.getGauge(statusId1).get().getValue());
+ assertEquals(0, listener.getGauge(statusId2).get().getValue());
}
}
- private String totalIdentifier(FlinkDeployment deployment) {
- String baseScope =
"testhost.k8soperator.flink-operator-test.testopname.";
- String[] metricScope =
- new String[] {
- "namespace",
- deployment.getMetadata().getNamespace(),
- FLINK_DEPLOYMENT_GROUP_NAME,
- COUNTER_NAME
- };
- return baseScope + String.join(".", metricScope);
- }
+ @Test
+ public void testMetricsDisabled() {
- private String perStatusIdentifier(FlinkDeployment deployment) {
+ var conf = new Configuration();
+ conf.set(OPERATOR_RESOURCE_METRICS_ENABLED, false);
+ var listener = new TestingMetricListener(conf);
+ var metricManager =
+ MetricManager.createFlinkDeploymentMetricManager(
+ new FlinkConfigManager(conf),
listener.getMetricGroup());
- String baseScope =
"testhost.k8soperator.flink-operator-test.testopname.";
- String[] metricScope =
- new String[] {
- "namespace",
- deployment.getMetadata().getNamespace(),
- FLINK_DEPLOYMENT_GROUP_NAME,
- JM_DEPLOYMENT_STATUS_GROUP_NAME,
-
deployment.getStatus().getJobManagerDeploymentStatus().name(),
- COUNTER_NAME
- };
+ var namespace = TestUtils.TEST_NAMESPACE;
+ var deployment = TestUtils.buildApplicationCluster("deployment",
namespace);
- return baseScope + String.join(".", metricScope);
+ var counterId =
+ listener.getNamespaceMetricId(namespace,
DEPLOYMENT_GROUP_NAME, COUNTER_NAME);
+ metricManager.onUpdate(deployment);
+ assertTrue(listener.getGauge(counterId).isEmpty());
+ for (JobManagerDeploymentStatus status :
JobManagerDeploymentStatus.values()) {
+ var statusId =
+ listener.getNamespaceMetricId(
+ namespace, STATUS_GROUP_NAME, status.name(),
COUNTER_NAME);
+ assertTrue(listener.getGauge(statusId).isEmpty());
+ }
}
}
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/FlinkSessionJobMetricsTest.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/FlinkSessionJobMetricsTest.java
index 21f9a6ba..284d6f42 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/FlinkSessionJobMetricsTest.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/FlinkSessionJobMetricsTest.java
@@ -17,30 +17,117 @@
package org.apache.flink.kubernetes.operator.metrics;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.TestUtils;
+import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob;
-import org.apache.flink.metrics.testutils.MetricListener;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import static
org.apache.flink.kubernetes.operator.metrics.FlinkSessionJobMetrics.COUNTER_NAME;
import static
org.apache.flink.kubernetes.operator.metrics.FlinkSessionJobMetrics.METRIC_GROUP_NAME;
+import static
org.apache.flink.kubernetes.operator.metrics.KubernetesOperatorMetricOptions.OPERATOR_RESOURCE_METRICS_ENABLED;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
/** {@link FlinkSessionJobMetrics} tests. */
public class FlinkSessionJobMetricsTest {
+ private final Configuration configuration = new Configuration();
+ private TestingMetricListener listener;
+ private MetricManager<FlinkSessionJob> metricManager;
+
+ /** Builds a fresh metric listener and session job metric manager before each test. */
+ @BeforeEach
+ public void init() {
+ this.listener = new TestingMetricListener(this.configuration);
+ var configManager = new FlinkConfigManager(this.configuration);
+ var operatorGroup = this.listener.getMetricGroup();
+ this.metricManager =
+ MetricManager.createFlinkSessionJobMetricManager(configManager, operatorGroup);
+ }
+
+ /**
+ * Verifies that session jobs in the same namespace are counted by a single gauge and that
+ * repeating an update or a removal for the same job leaves the count unchanged.
+ */
+ @Test
+ public void testMetricsSameNamespace() {
+ var namespace = "test-ns";
+ var job1 = TestUtils.buildSessionJob("job1", namespace);
+ var job2 = TestUtils.buildSessionJob("job2", namespace);
+ var metricId = listener.getNamespaceMetricId(namespace, METRIC_GROUP_NAME, COUNTER_NAME);
+ // Look the gauge up by its metric id, not by the bare namespace string: the namespace
+ // alone is not a metric identifier, so checking it would pass vacuously.
+ assertTrue(listener.getGauge(metricId).isEmpty());
+
+ metricManager.onUpdate(job1);
+ metricManager.onUpdate(job2);
+ assertEquals(2, listener.getGauge(metricId).get().getValue());
+
+ // Updating the same jobs again must not be counted twice.
+ metricManager.onUpdate(job1);
+ metricManager.onUpdate(job2);
+ assertEquals(2, listener.getGauge(metricId).get().getValue());
+
+ metricManager.onRemove(job1);
+ metricManager.onRemove(job2);
+ assertEquals(0, listener.getGauge(metricId).get().getValue());
+
+ // Removing jobs that are already gone must keep the count at zero.
+ metricManager.onRemove(job1);
+ metricManager.onRemove(job2);
+ assertEquals(0, listener.getGauge(metricId).get().getValue());
+ }
+
+ /**
+ * Checks that session jobs with the same name in different namespaces are counted
+ * separately, each under its own namespace-scoped gauge.
+ */
+ @Test
+ public void testMetricsMultiNamespace() {
+ var firstNs = "ns1";
+ var secondNs = "ns2";
+ var firstJob = TestUtils.buildSessionJob("job", firstNs);
+ var secondJob = TestUtils.buildSessionJob("job", secondNs);
+
+ var firstId = listener.getNamespaceMetricId(firstNs, METRIC_GROUP_NAME, COUNTER_NAME);
+ var secondId = listener.getNamespaceMetricId(secondNs, METRIC_GROUP_NAME, COUNTER_NAME);
+
+ // Neither namespace has a gauge before its first update.
+ assertTrue(listener.getGauge(firstId).isEmpty());
+ assertTrue(listener.getGauge(secondId).isEmpty());
+
+ // The second round repeats the same updates and must leave both counts at 1.
+ for (int round = 0; round < 2; round++) {
+ metricManager.onUpdate(firstJob);
+ metricManager.onUpdate(secondJob);
+ assertEquals(1, listener.getGauge(firstId).get().getValue());
+ assertEquals(1, listener.getGauge(secondId).get().getValue());
+ }
+
+ // Likewise, the second round of removals must leave both counts at 0.
+ for (int round = 0; round < 2; round++) {
+ metricManager.onRemove(firstJob);
+ metricManager.onRemove(secondJob);
+ assertEquals(0, listener.getGauge(firstId).get().getValue());
+ assertEquals(0, listener.getGauge(secondId).get().getValue());
+ }
+ }
+
@Test
- public void testMetrics() {
- MetricListener metricListener = new MetricListener();
- FlinkSessionJobMetrics metrics =
- new FlinkSessionJobMetrics(metricListener.getMetricGroup());
- assertTrue(metricListener.getGauge(METRIC_GROUP_NAME,
"Count").isPresent());
- assertEquals(0, metricListener.getGauge(METRIC_GROUP_NAME,
"Count").get().getValue());
- FlinkSessionJob flinkSessionJob = TestUtils.buildSessionJob();
- metrics.onUpdate(flinkSessionJob);
- assertEquals(1, metricListener.getGauge(METRIC_GROUP_NAME,
"Count").get().getValue());
- metrics.onRemove(flinkSessionJob);
- assertEquals(0, metricListener.getGauge(METRIC_GROUP_NAME,
"Count").get().getValue());
+ public void testMetricsDisabled() {
+ // With OPERATOR_RESOURCE_METRICS_ENABLED set to false, the counter gauge must stay
+ // absent before and after update/remove calls.
+ var flinkSessionJob = TestUtils.buildSessionJob();
+
+ // These locals shadow the listener/metricManager fields and are built from the
+ // local conf that carries the disabled flag.
+ var conf = new Configuration();
+ conf.set(OPERATOR_RESOURCE_METRICS_ENABLED, false);
+ var listener = new TestingMetricListener(conf);
+ var metricManager =
+ MetricManager.createFlinkSessionJobMetricManager(
+ new FlinkConfigManager(conf),
listener.getMetricGroup());
+
+ var metricId =
+ listener.getNamespaceMetricId(
+ flinkSessionJob.getMetadata().getNamespace(),
+ METRIC_GROUP_NAME,
+ COUNTER_NAME);
+
+ assertTrue(listener.getGauge(metricId).isEmpty());
+
+ metricManager.onUpdate(flinkSessionJob);
+ assertTrue(listener.getGauge(metricId).isEmpty());
+
+ metricManager.onRemove(flinkSessionJob);
+ assertTrue(listener.getGauge(metricId).isEmpty());
}
}
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/TestingMetricListener.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/TestingMetricListener.java
new file mode 100644
index 00000000..294822d2
--- /dev/null
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/TestingMetricListener.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.metrics;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.Meter;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.runtime.metrics.util.TestingMetricRegistry;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * Test helper that records every metric registered through a {@link
+ * KubernetesOperatorMetricGroup}, keyed by its full metric identifier, so tests can look
+ * metrics up by identifier.
+ */
+public class TestingMetricListener {
+
+ public static final String DELIMITER = ".";
+ private static final String NAMESPACE = "test-op-ns";
+ private static final String NAME = "test-op-name";
+ private static final String HOST = "test-op-host";
+
+ private final KubernetesOperatorMetricGroup metricGroup;
+ private final Map<String, Metric> metrics = new HashMap<>();
+ private final Configuration configuration;
+
+ /**
+ * Creates the listener together with an operator metric group backed by a testing registry
+ * that stores each registered metric under its identifier.
+ *
+ * @param configuration configuration used to create the operator metric group and, later,
+ * the resource namespace groups
+ */
+ public TestingMetricListener(Configuration configuration) {
+ TestingMetricRegistry registry =
+ TestingMetricRegistry.builder()
+ .setDelimiter(DELIMITER.charAt(0))
+ .setRegisterConsumer(
+ (metric, name, group) -> {
+ this.metrics.put(group.getMetricIdentifier(name), metric);
+ })
+ .build();
+ this.metricGroup =
+ KubernetesOperatorMetricGroup.create(
+ registry, configuration, NAMESPACE, NAME, HOST);
+ this.configuration = configuration;
+ }
+
+ /** Returns the operator metric group whose registrations this listener records. */
+ public KubernetesOperatorMetricGroup getMetricGroup() {
+ return this.metricGroup;
+ }
+
+ /** Returns the metric recorded under the identifier as a counter, or empty if there is none. */
+ public Optional<Counter> getCounter(String identifier) {
+ return this.getMetric(identifier);
+ }
+
+ /** Returns the metric recorded under the identifier as a histogram, or empty if there is none. */
+ public Optional<Histogram> getHistogram(String identifier) {
+ return this.getMetric(identifier);
+ }
+
+ /** Returns the metric recorded under the identifier as a meter, or empty if there is none. */
+ public Optional<Meter> getMeter(String identifier) {
+ return this.getMetric(identifier);
+ }
+
+ /**
+ * Returns the metric recorded under the identifier as a gauge, or empty if there is none.
+ *
+ * @param <T> value type the caller expects from the gauge; not verified here
+ */
+ @SuppressWarnings("unchecked")
+ public <T> Optional<Gauge<T>> getGauge(String identifier) {
+ // Unchecked by design: the caller chooses T for the identifier it asks for.
+ return Optional.ofNullable((Gauge<T>) this.metrics.get(identifier));
+ }
+
+ @SuppressWarnings("unchecked")
+ private <T extends Metric> Optional<T> getMetric(String identifier) {
+ // Unchecked by design: the typed accessors decide which metric type is expected.
+ return Optional.ofNullable((T) this.metrics.get(identifier));
+ }
+
+ /** Builds a metric identifier in the operator group from the delimiter-joined parts. */
+ public String getMetricId(String... identifiers) {
+ return metricGroup.getMetricIdentifier(String.join(DELIMITER, identifiers));
+ }
+
+ /**
+ * Builds a metric identifier in the resource namespace group created for {@code resourceNs}
+ * from the delimiter-joined parts.
+ */
+ public String getNamespaceMetricId(String resourceNs, String... identifiers) {
+ return metricGroup
+ .createResourceNamespaceGroup(configuration, resourceNs)
+ .getMetricIdentifier(String.join(DELIMITER, identifiers));
+ }
+}
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/ResourceLifecycleMetricsTest.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/ResourceLifecycleMetricsTest.java
index 2e76430e..3240dc6c 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/ResourceLifecycleMetricsTest.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/ResourceLifecycleMetricsTest.java
@@ -21,10 +21,14 @@ import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.TestUtils;
+import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
import org.apache.flink.kubernetes.operator.crd.spec.JobState;
import org.apache.flink.kubernetes.operator.crd.status.ReconciliationState;
+import org.apache.flink.kubernetes.operator.metrics.CustomResourceMetrics;
import
org.apache.flink.kubernetes.operator.metrics.KubernetesOperatorMetricOptions;
+import org.apache.flink.kubernetes.operator.metrics.MetricManager;
import org.apache.flink.kubernetes.operator.metrics.OperatorMetricUtils;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.metrics.Histogram;
@@ -151,8 +155,10 @@ public class ResourceLifecycleMetricsTest {
dep3.getMetadata().setName("n3");
var conf = new Configuration();
- var metricManager = TestUtils.createTestMetricManager(conf);
- var lifeCycleMetrics = metricManager.getLifeCycleMetrics();
+ var metricManager =
+ MetricManager.createFlinkDeploymentMetricManager(
+ new FlinkConfigManager(conf),
TestUtils.createTestMetricGroup(conf));
+ var lifeCycleMetrics = getLifeCycleMetrics(metricManager);
metricManager.onUpdate(dep1);
metricManager.onUpdate(dep2);
@@ -184,8 +190,10 @@ public class ResourceLifecycleMetricsTest {
conf.set(
KubernetesOperatorMetricOptions.OPERATOR_LIFECYCLE_NAMESPACE_HISTOGRAMS_ENABLED,
false);
- metricManager = TestUtils.createTestMetricManager(conf);
- lifeCycleMetrics = metricManager.getLifeCycleMetrics();
+ metricManager =
+ MetricManager.createFlinkDeploymentMetricManager(
+ new FlinkConfigManager(conf),
TestUtils.createTestMetricGroup(conf));
+ lifeCycleMetrics = getLifeCycleMetrics(metricManager);
metricManager.onUpdate(dep1);
metricManager.onUpdate(dep2);
@@ -200,27 +208,34 @@ public class ResourceLifecycleMetricsTest {
trackers.get(Tuple2.of("ns2", "n3")).getTransitionHistos());
trackers.get(Tuple2.of("ns1", "n1"))
.getStateTimeHistos()
- .forEach(
- (k, l) -> {
- assertEquals(1, l.size());
- });
+ .forEach((k, l) -> assertEquals(1, l.size()));
trackers.get(Tuple2.of("ns1", "n1"))
.getTransitionHistos()
- .forEach(
- (k, l) -> {
- assertEquals(1, l.size());
- });
+ .forEach((k, l) -> assertEquals(1, l.size()));
conf.set(KubernetesOperatorMetricOptions.OPERATOR_LIFECYCLE_METRICS_ENABLED,
false);
- metricManager = TestUtils.createTestMetricManager(conf);
- assertNull(metricManager.getLifeCycleMetrics());
+ metricManager =
+ MetricManager.createFlinkDeploymentMetricManager(
+ new FlinkConfigManager(conf),
TestUtils.createTestMetricGroup(conf));
+ assertNull(getLifeCycleMetrics(metricManager));
metricManager.onUpdate(dep1);
metricManager.onUpdate(dep2);
metricManager.onUpdate(dep3);
}
+ /**
+ * Returns the first metrics instance registered with the given manager that is a {@link
+ * LifecycleMetrics}, or {@code null} when no such instance is registered.
+ */
+ public static LifecycleMetrics<FlinkDeployment> getLifeCycleMetrics(
+ MetricManager<FlinkDeployment> metricManager) {
+ for (CustomResourceMetrics<FlinkDeployment> candidate :
+ metricManager.getRegisteredMetrics()) {
+ if (!(candidate instanceof LifecycleMetrics)) {
+ continue;
+ }
+ return (LifecycleMetrics<FlinkDeployment>) candidate;
+ }
+ return null;
+ }
+
private void validateTransition(
Map<String, List<Histogram>> histos, String name, int size, long
mean) {
histos.get(name)
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserverTest.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserverTest.java
index 5dddb7b9..4b00ed4f 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserverTest.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserverTest.java
@@ -27,6 +27,7 @@ import
org.apache.flink.kubernetes.operator.TestingFlinkService;
import org.apache.flink.kubernetes.operator.TestingFlinkServiceFactory;
import org.apache.flink.kubernetes.operator.TestingStatusRecorder;
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
+import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob;
import org.apache.flink.kubernetes.operator.crd.status.FlinkSessionJobStatus;
import org.apache.flink.kubernetes.operator.crd.status.ReconciliationState;
import org.apache.flink.kubernetes.operator.crd.status.SavepointTriggerType;
@@ -59,7 +60,7 @@ public class SessionJobObserverTest {
public void before() {
kubernetesClient.resource(TestUtils.buildSessionJob()).createOrReplace();
var eventRecorder = new EventRecorder(kubernetesClient, (r, e) -> {});
- var statusRecorder = new
TestingStatusRecorder<FlinkSessionJobStatus>();
+ var statusRecorder = new TestingStatusRecorder<FlinkSessionJob,
FlinkSessionJobStatus>();
flinkService = new TestingFlinkService();
FlinkServiceFactory flinkServiceFactory = new
TestingFlinkServiceFactory(flinkService);
observer =
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
index 2bb1f089..89f794e6 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
@@ -77,7 +77,7 @@ public class ApplicationReconcilerTest {
public void before() {
kubernetesClient.resource(TestUtils.buildApplicationCluster()).createOrReplace();
var eventRecorder = new EventRecorder(kubernetesClient, (r, e) -> {});
- var statusRecoder = new TestingStatusRecorder<FlinkDeploymentStatus>();
+ var statusRecoder = new TestingStatusRecorder<FlinkDeployment,
FlinkDeploymentStatus>();
reconciler =
new ApplicationReconciler(
kubernetesClient,
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconcilerTest.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconcilerTest.java
index d6f342c9..6fc4764e 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconcilerTest.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconcilerTest.java
@@ -67,7 +67,7 @@ public class SessionJobReconcilerTest {
private TestingFlinkService flinkService = new TestingFlinkService();
private EventRecorder eventRecorder;
private SessionJobReconciler reconciler;
- private StatusRecorder<FlinkSessionJobStatus> statusRecoder;
+ private StatusRecorder<FlinkSessionJob, FlinkSessionJobStatus>
statusRecoder;
@BeforeEach
public void before() {
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/StatusRecorderTest.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/StatusRecorderTest.java
index e550f201..4fd4e4ac 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/StatusRecorderTest.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/StatusRecorderTest.java
@@ -17,10 +17,11 @@
package org.apache.flink.kubernetes.operator.utils;
-import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.TestUtils;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
import org.apache.flink.kubernetes.operator.crd.status.ReconciliationState;
+import org.apache.flink.kubernetes.operator.metrics.MetricManager;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
@@ -39,10 +40,8 @@ public class StatusRecorderTest {
@Test
public void testPatchOnlyWhenChanged() throws InterruptedException {
var helper =
- new StatusRecorder<FlinkDeploymentStatus>(
- kubernetesClient,
- TestUtils.createTestMetricManager(new Configuration()),
- (e, s) -> {});
+ new StatusRecorder<FlinkDeployment, FlinkDeploymentStatus>(
+ kubernetesClient, new MetricManager<>(), (e, s) -> {});
var deployment = TestUtils.buildApplicationCluster();
kubernetesClient.resource(deployment).createOrReplace();
var lastRequest = mockServer.getLastRequest();