This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new 6b895e75 [FLINK-38787] Introduce FlinkBlueGreenDeployment Metrics
6b895e75 is described below
commit 6b895e754a872b86c3298fc6b61c88f996216b17
Author: James Kan <[email protected]>
AuthorDate: Sun Jan 18 11:22:51 2026 -0800
[FLINK-38787] Introduce FlinkBlueGreenDeployment Metrics
---
docs/content/docs/operations/metrics-logging.md | 45 +-
.../flink/kubernetes/operator/FlinkOperator.java | 7 +-
.../FlinkBlueGreenDeploymentController.java | 22 +-
.../metrics/FlinkBlueGreenDeploymentMetrics.java | 188 ++++++++
.../kubernetes/operator/metrics/MetricManager.java | 30 ++
.../lifecycle/BlueGreenLifecycleMetrics.java | 197 ++++++++
.../BlueGreenResourceLifecycleMetricTracker.java | 163 +++++++
.../kubernetes/operator/utils/StatusRecorder.java | 23 +
.../FlinkBlueGreenDeploymentControllerTest.java | 3 +
.../TestingFlinkBlueGreenDeploymentController.java | 8 +-
.../FlinkBlueGreenDeploymentMetricsTest.java | 509 +++++++++++++++++++++
.../lifecycle/BlueGreenLifecycleMetricsTest.java | 224 +++++++++
...lueGreenResourceLifecycleMetricTrackerTest.java | 251 ++++++++++
13 files changed, 1656 insertions(+), 14 deletions(-)
diff --git a/docs/content/docs/operations/metrics-logging.md b/docs/content/docs/operations/metrics-logging.md
index d1e194ef..ac21f564 100644
--- a/docs/content/docs/operations/metrics-logging.md
+++ b/docs/content/docs/operations/metrics-logging.md
@@ -35,15 +35,20 @@ Different operator metrics can be turned on/off individually using the configura
### Flink Resource Metrics
The Operator gathers aggregates metrics about managed resources.
-| Scope | Metrics | Description | Type |
-|--------------------|-------------------------------------------------------------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|-----------|
+| Scope | Metrics | Description | Type |
+|--------------------|-------------------------------------------------------------------------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|-----------|
| Namespace | FlinkDeployment/FlinkSessionJob.Count | Number of managed resources per namespace | Gauge |
-| Namespace | FlinkDeployment.ResourceUsage.Cpu/Memory | Total resources used per namespace | Gauge |
-| Namespace | FlinkDeployment.JmDeploymentStatus.<Status>.Count | Number of managed FlinkDeployment resources per <Status> per namespace. <Status> can take values from: READY, DEPLOYED_NOT_READY, DEPLOYING, MISSING, ERROR | Gauge |
-| Namespace | FlinkDeployment.FlinkVersion.<FlinkVersion>.Count | Number of managed FlinkDeployment resources per <FlinkVersion> per namespace. <FlinkVersion> is retrieved via REST API from Flink JM. | Gauge |
+| Namespace | FlinkDeployment.ResourceUsage.Cpu/Memory | Total resources used per namespace | Gauge |
+| Namespace | FlinkDeployment.JmDeploymentStatus.<Status>.Count | Number of managed FlinkDeployment resources per <Status> per namespace. <Status> can take values from: READY, DEPLOYED_NOT_READY, DEPLOYING, MISSING, ERROR | Gauge |
+| Namespace | FlinkDeployment.FlinkVersion.<FlinkVersion>.Count | Number of managed FlinkDeployment resources per <FlinkVersion> per namespace. <FlinkVersion> is retrieved via REST API from Flink JM. | Gauge |
| Namespace | FlinkDeployment/FlinkSessionJob.Lifecycle.State.<State>.Count | Number of managed resources currently in state <State> per namespace. <State> can take values from: CREATED, SUSPENDED, UPGRADING, DEPLOYED, STABLE, ROLLING_BACK, ROLLED_BACK, FAILED | Gauge |
-| System/Namespace | FlinkDeployment/FlinkSessionJob.Lifecycle.State.<State>.TimeSeconds | Time spent in state <State> for a given resource. <State> can take values from: CREATED, SUSPENDED, UPGRADING, DEPLOYED, STABLE, ROLLING_BACK, ROLLED_BACK, FAILED | Histogram |
-| System/Namespace | FlinkDeployment/FlinkSessionJob.Lifecycle.Transition.<Transition>.TimeSeconds | Time statistics for selected lifecycle state transitions. <Transition> can take values from: Resume, Upgrade, Suspend, Stabilization, Rollback, Submission | Histogram |
+| System/Namespace | FlinkDeployment/FlinkSessionJob.Lifecycle.State.<State>.TimeSeconds | Time spent in state <State> for a given resource. <State> can take values from: CREATED, SUSPENDED, UPGRADING, DEPLOYED, STABLE, ROLLING_BACK, ROLLED_BACK, FAILED | Histogram |
+| System/Namespace | FlinkDeployment/FlinkSessionJob.Lifecycle.Transition.<Transition>.TimeSeconds | Time statistics for selected lifecycle state transitions. <Transition> can take values from: Resume, Upgrade, Suspend, Stabilization, Rollback, Submission | Histogram |
+| Namespace | FlinkBlueGreenDeployment.BlueGreenState.<State>.Count | Number of managed FlinkBlueGreenDeployment resources currently in state <State> per namespace. <State> can take values from: INITIALIZING_BLUE, ACTIVE_BLUE, SAVEPOINTING_BLUE, TRANSITIONING_TO_GREEN, ACTIVE_GREEN, SAVEPOINTING_GREEN, TRANSITIONING_TO_BLUE | Gauge |
+| Namespace | FlinkBlueGreenDeployment.JobStatus.<Status>.Count | Number of managed FlinkBlueGreenDeployment resources currently in JobStatus <Status> per namespace. <Status> can take values from: RUNNING, FAILING, SUSPENDED, FAILED, RECONCILING | Gauge |
+| Namespace | FlinkBlueGreenDeployment.Failures | Historical count of failure events (transitions to FAILING state) for all FlinkBlueGreenDeployment resources in the namespace. Counter increments on each transition to FAILING and never decrements. | Counter |
+| System/Namespace | FlinkBlueGreenDeployment.Lifecycle.State.<State>.TimeSeconds | Time spent in state <State> for a given FlinkBlueGreenDeployment resource. <State> values same as above. | Histogram |
+| System/Namespace | FlinkBlueGreenDeployment.Lifecycle.Transition.<Transition>.TimeSeconds | Time statistics for blue-green lifecycle state transitions. <Transition> can take values from: InitialDeployment, BlueToGreen, GreenToBlue | Histogram |
#### Lifecycle metrics
@@ -60,6 +65,32 @@ In addition to the simple counts we further track a few selected state transitio
- Rollback : Time from deployed to rolled_back state if the resource was rolled back
- Submission: Flink resource submission time
+#### FlinkBlueGreenDeployment Lifecycle metrics
+
+FlinkBlueGreenDeployment resources have their own lifecycle states that track the blue-green deployment process. The operator monitors the following transitions:
+
+ - InitialDeployment : Time from leaving INITIALIZING_BLUE to reaching ACTIVE_BLUE (first deployment)
+ - BlueToGreen : Time from leaving ACTIVE_BLUE to reaching ACTIVE_GREEN (actual transition duration)
+ - GreenToBlue : Time from leaving ACTIVE_GREEN to reaching ACTIVE_BLUE (actual transition duration)
+
+Transition metrics measure the actual transition time - from when the deployment leaves the source stable state until it reaches the target stable state. This excludes time spent running stably before the transition was initiated.
+
+State time metrics track how long a resource spends in each state (ACTIVE_BLUE, SAVEPOINTING_BLUE, TRANSITIONING_TO_GREEN, etc.), which helps identify bottlenecks in the deployment pipeline.
+
+#### FlinkBlueGreenDeployment JobStatus Tracking
+
+In addition to BlueGreenState tracking, FlinkBlueGreenDeployment resources also expose JobStatus metrics that track the Flink job state:
+
+**JobStatus Gauges**: Current count of deployments per JobStatus (RUNNING, FAILING, SUSPENDED, etc.) - these gauges go up and down as deployments transition between states.
+
+**Failures Counter**: Historical count that increments each time a deployment transitions TO the FAILING state. This counter:
+ - Never decrements (accumulates total failures since operator start)
+ - Increments on each new transition to FAILING (even if the same deployment fails multiple times)
+ - Persists across deployment recoveries (provides historical failure tracking)
+ - Useful for calculating failure rates and setting up alerts
+
+Example: A deployment goes RUNNING → FAILING → RUNNING → FAILING. The FAILING gauge shows 0 or 1 (current state), while the Failures counter shows 2 (historical events).
+
### Kubernetes Client Metrics
The Operator gathers various metrics related to Kubernetes API server access.
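The transition semantics documented above (the clock starts when the source stable state is left, not when it was entered) can be illustrated with a small per-resource tracker. This is a hypothetical sketch, not the `BlueGreenResourceLifecycleMetricTracker` added by this commit: `BgState` is a local copy of the documented state names, `BgTransitionTracker` is an invented name, and plain lists of seconds stand in for Flink `Histogram` instances.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.EnumMap;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Local stand-in for FlinkBlueGreenDeploymentState, using the state names from the docs table. */
enum BgState {
    INITIALIZING_BLUE, ACTIVE_BLUE, SAVEPOINTING_BLUE, TRANSITIONING_TO_GREEN,
    ACTIVE_GREEN, SAVEPOINTING_GREEN, TRANSITIONING_TO_BLUE
}

/** Hypothetical per-resource tracker: state dwell times plus source-to-target transition times. */
class BgTransitionTracker {
    final Map<BgState, List<Long>> stateSeconds = new EnumMap<>(BgState.class);
    final Map<String, List<Long>> transitionSeconds = new HashMap<>();

    private BgState current;
    private Instant enteredCurrent;
    private BgState lastSource; // most recent source state that was left, or null
    private Instant leftSourceAt;

    BgTransitionTracker(BgState initial, Instant at) {
        this.current = initial;
        this.enteredCurrent = at;
    }

    void onUpdate(BgState next, Instant now) {
        if (next == current) {
            return; // repeated reconcile of the same state: nothing to record
        }
        // Record how long the resource stayed in the state it is leaving.
        stateSeconds.computeIfAbsent(current, s -> new ArrayList<>())
                .add(Duration.between(enteredCurrent, now).getSeconds());
        // The transition clock starts when a source state is left, so stable running time is excluded.
        if (isSource(current)) {
            lastSource = current;
            leftSourceAt = now;
        }
        String transition = transitionName(lastSource, next);
        if (transition != null) {
            transitionSeconds.computeIfAbsent(transition, t -> new ArrayList<>())
                    .add(Duration.between(leftSourceAt, now).getSeconds());
            lastSource = null;
        }
        current = next;
        enteredCurrent = now;
    }

    private static boolean isSource(BgState s) {
        return s == BgState.INITIALIZING_BLUE || s == BgState.ACTIVE_BLUE || s == BgState.ACTIVE_GREEN;
    }

    private static String transitionName(BgState from, BgState to) {
        if (from == BgState.INITIALIZING_BLUE && to == BgState.ACTIVE_BLUE) {
            return "InitialDeployment";
        }
        if (from == BgState.ACTIVE_BLUE && to == BgState.ACTIVE_GREEN) {
            return "BlueToGreen";
        }
        if (from == BgState.ACTIVE_GREEN && to == BgState.ACTIVE_BLUE) {
            return "GreenToBlue";
        }
        return null;
    }
}
```

Under this sketch, a deployment that sits in ACTIVE_BLUE for 100 seconds and then needs 90 more seconds to reach ACTIVE_GREEN records 100 seconds of ACTIVE_BLUE state time but only 90 seconds for BlueToGreen.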
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
index 96473ddb..fdc856f2 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
@@ -257,7 +257,12 @@ public class FlinkOperator {
@VisibleForTesting
void registerBlueGreenController() {
- var controller = new FlinkBlueGreenDeploymentController(ctxFactory);
+ var metricManager =
+ MetricManager.createFlinkBlueGreenDeploymentMetricManager(baseConfig, metricGroup);
+ var statusRecorder =
+ StatusRecorder.createForFlinkBlueGreenDeployment(client, metricManager, listeners);
+ var controller = new FlinkBlueGreenDeploymentController(ctxFactory, statusRecorder);
+
registeredControllers.add(operator.register(controller, this::overrideControllerConfigs));
}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkBlueGreenDeploymentController.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkBlueGreenDeploymentController.java
index a35ccb2b..bac6f131 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkBlueGreenDeploymentController.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkBlueGreenDeploymentController.java
@@ -26,6 +26,7 @@ import org.apache.flink.kubernetes.operator.controller.bluegreen.BlueGreenDeploy
import org.apache.flink.kubernetes.operator.controller.bluegreen.BlueGreenStateHandlerRegistry;
import org.apache.flink.kubernetes.operator.controller.bluegreen.handlers.BlueGreenStateHandler;
import org.apache.flink.kubernetes.operator.service.FlinkResourceContextFactory;
+import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
import io.javaoperatorsdk.operator.api.config.informer.InformerEventSourceConfiguration;
import io.javaoperatorsdk.operator.api.reconciler.Context;
@@ -70,10 +71,16 @@ public class FlinkBlueGreenDeploymentController implements Reconciler<FlinkBlueG
private final FlinkResourceContextFactory ctxFactory;
private final BlueGreenStateHandlerRegistry handlerRegistry;
+ private final StatusRecorder<FlinkBlueGreenDeployment, FlinkBlueGreenDeploymentStatus>
+ statusRecorder;
- public FlinkBlueGreenDeploymentController(FlinkResourceContextFactory ctxFactory) {
+ public FlinkBlueGreenDeploymentController(
+ FlinkResourceContextFactory ctxFactory,
+ StatusRecorder<FlinkBlueGreenDeployment, FlinkBlueGreenDeploymentStatus>
+ statusRecorder) {
this.ctxFactory = ctxFactory;
this.handlerRegistry = new BlueGreenStateHandlerRegistry();
+ this.statusRecorder = statusRecorder;
}
@Override
@@ -110,9 +117,12 @@ public class FlinkBlueGreenDeploymentController implements Reconciler<FlinkBlueG
josdkContext,
null,
ctxFactory);
- return BlueGreenDeploymentService.patchStatusUpdateControl(
- context, INITIALIZING_BLUE, null, null)
- .rescheduleAfter(0);
+ UpdateControl<FlinkBlueGreenDeployment> updateControl =
+ BlueGreenDeploymentService.patchStatusUpdateControl(
+ context, INITIALIZING_BLUE, null, null)
+ .rescheduleAfter(0);
+ statusRecorder.patchAndCacheStatus(bgDeployment, josdkContext.getClient());
+ return updateControl;
} else {
FlinkBlueGreenDeploymentState currentState = deploymentStatus.getBlueGreenState();
var context =
@@ -132,7 +142,9 @@ public class FlinkBlueGreenDeploymentController implements Reconciler<FlinkBlueG
context.getDeploymentName());
BlueGreenStateHandler handler = handlerRegistry.getHandler(currentState);
- return handler.handle(context);
+ UpdateControl<FlinkBlueGreenDeployment> updateControl = handler.handle(context);
+ statusRecorder.patchAndCacheStatus(bgDeployment, josdkContext.getClient());
+ return updateControl;
}
}
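The controller now records the status on both branches of `reconcile`: after initializing to INITIALIZING_BLUE and after a state handler returns. Because `FlinkOperator` builds the `StatusRecorder` with the blue-green `MetricManager`, recording the status is presumably what drives `onUpdate` on the registered metrics. The sketch below shows that flow in isolation; `ResourceMetrics`, `RecordingHook`, and `ReconcileSketch` are hypothetical stand-ins for `CustomResourceMetrics`, `StatusRecorder`, and the controller, the resource is reduced to its state name, and no Kubernetes status patch is performed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

/** Hypothetical stand-in for CustomResourceMetrics. */
interface ResourceMetrics<R> {
    void onUpdate(R resource);
}

/** Hypothetical stand-in for a StatusRecorder backed by a MetricManager; no Kubernetes patch here. */
class RecordingHook<R> {
    private final List<ResourceMetrics<R>> metrics = new ArrayList<>();

    void register(ResourceMetrics<R> m) {
        metrics.add(m);
    }

    void patchAndCacheStatus(R resource) {
        for (ResourceMetrics<R> m : metrics) {
            m.onUpdate(resource); // every registered metric sees the freshly recorded status
        }
    }
}

/** Hypothetical reconcile step: compute the next state, then record it on either branch. */
class ReconcileSketch {
    static String reconcile(String state, UnaryOperator<String> handler, RecordingHook<String> hook) {
        // A missing status means first reconcile; otherwise delegate to the state handler.
        String next = (state == null) ? "INITIALIZING_BLUE" : handler.apply(state);
        hook.patchAndCacheStatus(next);
        return next;
    }
}
```

Recording after the handler, rather than inside each handler, keeps the metrics hook in one place regardless of how many state handlers the registry holds.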
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/FlinkBlueGreenDeploymentMetrics.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/FlinkBlueGreenDeploymentMetrics.java
new file mode 100644
index 00000000..4862cb08
--- /dev/null
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/FlinkBlueGreenDeploymentMetrics.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.metrics;
+
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.api.FlinkBlueGreenDeployment;
+import org.apache.flink.kubernetes.operator.api.status.FlinkBlueGreenDeploymentState;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.MetricGroup;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+/** Metrics for FlinkBlueGreenDeployment resources. */
+public class FlinkBlueGreenDeploymentMetrics
+ implements CustomResourceMetrics<FlinkBlueGreenDeployment> {
+
+ public static final String BG_STATE_GROUP_NAME = "BlueGreenState";
+ public static final String JOB_STATUS_GROUP_NAME = "JobStatus";
+ public static final String COUNTER_NAME = "Count";
+ public static final String FAILURES_COUNTER_NAME = "Failures";
+
+ private final KubernetesOperatorMetricGroup parentMetricGroup;
+ private final Configuration configuration;
+
+ // Tracks which deployments are in which state per namespace (for gauge metrics)
+ // Map: namespace -> state -> set of deployment names
+ private final Map<String, Map<FlinkBlueGreenDeploymentState, Set<String>>> deploymentStatuses =
+ new ConcurrentHashMap<>();
+
+ // Tracks which deployments are in which JobStatus per namespace (for gauge metrics)
+ // Map: namespace -> JobStatus -> set of deployment names
+ private final Map<String, Map<JobStatus, Set<String>>> jobStatuses = new ConcurrentHashMap<>();
+
+ // Failure counters per namespace (historical count, never decrements)
+ // Map: namespace -> Counter
+ private final Map<String, Counter> failureCounters = new ConcurrentHashMap<>();
+
+ public FlinkBlueGreenDeploymentMetrics(
+ KubernetesOperatorMetricGroup parentMetricGroup, Configuration configuration) {
+ this.parentMetricGroup = parentMetricGroup;
+ this.configuration = configuration;
+ }
+
+ @Override
+ public void onUpdate(FlinkBlueGreenDeployment flinkBgDep) {
+ var namespace = flinkBgDep.getMetadata().getNamespace();
+ var deploymentName = flinkBgDep.getMetadata().getName();
+ var state = flinkBgDep.getStatus().getBlueGreenState();
+
+ // Get current JobStatus
+ var jobStatusObj = flinkBgDep.getStatus().getJobStatus();
+ var currentJobStatus = jobStatusObj != null ? jobStatusObj.getState() : null;
+
+ // Check if was in FAILING state BEFORE
+ boolean wasInFailing = false;
+ if (currentJobStatus != null) {
+ var namespaceJobStatuses = jobStatuses.get(namespace);
+ if (namespaceJobStatuses != null) {
+ var failingSet = namespaceJobStatuses.get(JobStatus.FAILING);
+ wasInFailing = failingSet != null && failingSet.contains(deploymentName);
+ }
+ }
+
+ // Clear from all tracking (BlueGreenState and JobStatus)
+ clearStateCount(flinkBgDep);
+
+ deploymentStatuses
+ .computeIfAbsent(namespace, this::initNamespaceMetrics)
+ .get(state)
+ .add(deploymentName);
+
+ // Track JobStatus in gauge
+ if (currentJobStatus != null) {
+ jobStatuses
+ .computeIfAbsent(namespace, ns -> createJobStatusMap())
+ .get(currentJobStatus)
+ .add(deploymentName);
+
+ // Detect transition TO FAILING for counter
+ if (currentJobStatus == JobStatus.FAILING && !wasInFailing) {
+ var counter = failureCounters.get(namespace);
+ if (counter != null) {
+ counter.inc();
+ }
+ }
+ }
+ }
+
+ @Override
+ public void onRemove(FlinkBlueGreenDeployment flinkBgDep) {
+ clearStateCount(flinkBgDep);
+ }
+
+ /** Clears the deployment from all state count sets (used before updating to new state). */
+ private void clearStateCount(FlinkBlueGreenDeployment flinkBgDep) {
+ var namespace = flinkBgDep.getMetadata().getNamespace();
+ var deploymentName = flinkBgDep.getMetadata().getName();
+
+ var namespaceStatuses = deploymentStatuses.get(namespace);
+ if (namespaceStatuses != null) {
+ namespaceStatuses
+ .values()
+ .forEach(deploymentNames -> deploymentNames.remove(deploymentName));
+ }
+
+ // Clear from JobStatus tracking
+ var namespaceJobStatuses = jobStatuses.get(namespace);
+ if (namespaceJobStatuses != null) {
+ namespaceJobStatuses
+ .values()
+ .forEach(deploymentNames -> deploymentNames.remove(deploymentName));
+ }
+ }
+
+ private Map<FlinkBlueGreenDeploymentState, Set<String>> initNamespaceMetrics(String namespace) {
+ MetricGroup nsGroup =
+ parentMetricGroup.createResourceNamespaceGroup(
+ configuration, FlinkBlueGreenDeployment.class, namespace);
+
+ // Total deployment count
+ nsGroup.gauge(
+ COUNTER_NAME,
+ () ->
+ deploymentStatuses.get(namespace).values().stream()
+ .mapToInt(Set::size)
+ .sum());
+
+ // Historical failure counter (increments on each transition TO FAILING)
+ failureCounters.put(namespace, nsGroup.counter(FAILURES_COUNTER_NAME));
+
+ // Per-BlueGreenState counts
+ Map<FlinkBlueGreenDeploymentState, Set<String>> statuses = new ConcurrentHashMap<>();
+ for (FlinkBlueGreenDeploymentState state : FlinkBlueGreenDeploymentState.values()) {
+ statuses.put(state, ConcurrentHashMap.newKeySet());
+ nsGroup.addGroup(BG_STATE_GROUP_NAME)
+ .addGroup(state.toString())
+ .gauge(COUNTER_NAME, () -> deploymentStatuses.get(namespace).get(state).size());
+ }
+
+ // Per-JobStatus counts (gauges for current state)
+ initJobStatusMetrics(namespace, nsGroup);
+
+ return statuses;
+ }
+
+ private void initJobStatusMetrics(String namespace, MetricGroup nsGroup) {
+ for (JobStatus status : JobStatus.values()) {
+ nsGroup.addGroup(JOB_STATUS_GROUP_NAME)
+ .addGroup(status.toString())
+ .gauge(
+ COUNTER_NAME,
+ () -> {
+ var nsJobStatuses = jobStatuses.get(namespace);
+ if (nsJobStatuses == null) {
+ return 0;
+ }
+ var statusSet = nsJobStatuses.get(status);
+ return statusSet != null ? statusSet.size() : 0;
+ });
+ }
+ }
+
+ private Map<JobStatus, Set<String>> createJobStatusMap() {
+ Map<JobStatus, Set<String>> statuses = new ConcurrentHashMap<>();
+ for (JobStatus status : JobStatus.values()) {
+ statuses.put(status, ConcurrentHashMap.newKeySet());
+ }
+ return statuses;
+ }
+}
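The `wasInFailing` check is what separates the Failures counter from the JobStatus gauges: membership is read before `clearStateCount` removes the deployment from every set, so only a change into FAILING increments the counter. A single-namespace sketch of that bookkeeping follows; `JobState` mirrors only the subset of Flink `JobStatus` values listed in the docs, `JobStatusBookkeeping` is a hypothetical name, and an `AtomicLong` stands in for the Flink `Counter`.

```java
import java.util.EnumMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

/** Subset of Flink's JobStatus values listed in the docs; a local stand-in for this sketch. */
enum JobState { RUNNING, FAILING, SUSPENDED, FAILED, RECONCILING }

/** Hypothetical single-namespace version of the JobStatus gauge and Failures counter bookkeeping. */
class JobStatusBookkeeping {
    private final Map<JobState, Set<String>> byStatus = new EnumMap<>(JobState.class);
    private final AtomicLong failures = new AtomicLong(); // stands in for the Flink Counter

    JobStatusBookkeeping() {
        for (JobState s : JobState.values()) {
            byStatus.put(s, ConcurrentHashMap.newKeySet());
        }
    }

    void onUpdate(String deployment, JobState status) {
        // Read the old membership first; after clearing, every FAILING update would look new.
        boolean wasFailing = byStatus.get(JobState.FAILING).contains(deployment);
        byStatus.values().forEach(names -> names.remove(deployment));
        byStatus.get(status).add(deployment);
        if (status == JobState.FAILING && !wasFailing) {
            failures.incrementAndGet(); // only transitions into FAILING are counted
        }
    }

    void onRemove(String deployment) {
        // Gauges drop the deployment; the counter keeps its history.
        byStatus.values().forEach(names -> names.remove(deployment));
    }

    int gauge(JobState status) {
        return byStatus.get(status).size();
    }

    long failureCount() {
        return failures.get();
    }
}
```

Replaying the documented sequence RUNNING, FAILING, RUNNING, FAILING leaves the FAILING gauge at 1 and the counter at 2; a repeated FAILING update for the same deployment does not increment it again.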
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/MetricManager.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/MetricManager.java
index 02ee9a86..ca571188 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/MetricManager.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/MetricManager.java
@@ -20,9 +20,11 @@ package org.apache.flink.kubernetes.operator.metrics;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
+import org.apache.flink.kubernetes.operator.api.FlinkBlueGreenDeployment;
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
import org.apache.flink.kubernetes.operator.api.FlinkSessionJob;
import org.apache.flink.kubernetes.operator.api.FlinkStateSnapshot;
+import org.apache.flink.kubernetes.operator.metrics.lifecycle.BlueGreenLifecycleMetrics;
import org.apache.flink.kubernetes.operator.metrics.lifecycle.LifecycleMetrics;
import io.fabric8.kubernetes.client.CustomResource;
@@ -69,6 +71,15 @@ public class MetricManager<CR extends CustomResource<?, ?>> {
return metricManager;
}
+ public static MetricManager<FlinkBlueGreenDeployment>
+ createFlinkBlueGreenDeploymentMetricManager(
+ Configuration conf, KubernetesOperatorMetricGroup metricGroup) {
+ MetricManager<FlinkBlueGreenDeployment> metricManager = new MetricManager<>();
+ registerFlinkBlueGreenDeploymentMetrics(conf, metricGroup, metricManager);
+ registerBlueGreenLifecycleMetrics(conf, metricGroup, metricManager);
+ return metricManager;
+ }
+
private static void registerFlinkDeploymentMetrics(
Configuration conf,
KubernetesOperatorMetricGroup metricGroup,
@@ -96,6 +107,15 @@ public class MetricManager<CR extends CustomResource<?, ?>> {
}
}
+ private static void registerFlinkBlueGreenDeploymentMetrics(
+ Configuration conf,
+ KubernetesOperatorMetricGroup metricGroup,
+ MetricManager<FlinkBlueGreenDeployment> metricManager) {
+ if (conf.get(KubernetesOperatorMetricOptions.OPERATOR_RESOURCE_METRICS_ENABLED)) {
+ metricManager.register(new FlinkBlueGreenDeploymentMetrics(metricGroup, conf));
+ }
+ }
+
private static <CR extends AbstractFlinkResource<?, ?>> void registerLifecycleMetrics(
Configuration conf,
KubernetesOperatorMetricGroup metricGroup,
@@ -106,6 +126,16 @@ public class MetricManager<CR extends CustomResource<?, ?>> {
}
}
+ private static void registerBlueGreenLifecycleMetrics(
+ Configuration conf,
+ KubernetesOperatorMetricGroup metricGroup,
+ MetricManager<FlinkBlueGreenDeployment> metricManager) {
+ if (conf.get(KubernetesOperatorMetricOptions.OPERATOR_RESOURCE_METRICS_ENABLED)
+ && conf.get(KubernetesOperatorMetricOptions.OPERATOR_LIFECYCLE_METRICS_ENABLED)) {
+ metricManager.register(new BlueGreenLifecycleMetrics(conf, metricGroup));
+ }
+ }
+
@VisibleForTesting
public List<CustomResourceMetrics<CR>> getRegisteredMetrics() {
return registeredMetrics;
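The two new registration helpers gate blue-green metrics on the existing switches: `FlinkBlueGreenDeploymentMetrics` needs only `OPERATOR_RESOURCE_METRICS_ENABLED`, while `BlueGreenLifecycleMetrics` needs both that and `OPERATOR_LIFECYCLE_METRICS_ENABLED`. The hypothetical helper below reduces those rules to two booleans so the resulting combinations are easy to check; it does not use the real `Configuration` or `MetricManager` types.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical reduction of the blue-green registration rules in MetricManager to two booleans. */
class BlueGreenRegistrationSketch {
    static List<String> registeredMetrics(boolean resourceMetricsEnabled, boolean lifecycleMetricsEnabled) {
        List<String> registered = new ArrayList<>();
        // registerFlinkBlueGreenDeploymentMetrics: resource metrics switch only
        if (resourceMetricsEnabled) {
            registered.add("FlinkBlueGreenDeploymentMetrics");
        }
        // registerBlueGreenLifecycleMetrics: both switches must be on
        if (resourceMetricsEnabled && lifecycleMetricsEnabled) {
            registered.add("BlueGreenLifecycleMetrics");
        }
        return registered;
    }
}
```

`BlueGreenLifecycleMetrics.onUpdate` also re-reads the lifecycle flag and returns early when it is off, so that switch is effectively checked twice.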
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/BlueGreenLifecycleMetrics.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/BlueGreenLifecycleMetrics.java
new file mode 100644
index 00000000..e8743528
--- /dev/null
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/BlueGreenLifecycleMetrics.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.metrics.lifecycle;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.api.FlinkBlueGreenDeployment;
+import org.apache.flink.kubernetes.operator.api.status.FlinkBlueGreenDeploymentState;
+import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
+import org.apache.flink.kubernetes.operator.metrics.CustomResourceMetrics;
+import org.apache.flink.kubernetes.operator.metrics.KubernetesOperatorMetricGroup;
+import org.apache.flink.kubernetes.operator.metrics.KubernetesOperatorMetricOptions;
+import org.apache.flink.kubernetes.operator.metrics.OperatorMetricUtils;
+import org.apache.flink.metrics.Histogram;
+
+import java.time.Clock;
+import java.time.Instant;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static org.apache.flink.kubernetes.operator.api.status.FlinkBlueGreenDeploymentState.INITIALIZING_BLUE;
+
+/** Manages lifecycle metrics for FlinkBlueGreenDeployment resources. */
+public class BlueGreenLifecycleMetrics implements CustomResourceMetrics<FlinkBlueGreenDeployment> {
+
+ public static final String LIFECYCLE_GROUP_NAME = "Lifecycle";
+ public static final String TRANSITION_GROUP_NAME = "Transition";
+ public static final String STATE_GROUP_NAME = "State";
+ public static final String TIME_SECONDS_NAME = "TimeSeconds";
+
+ public static final String TRANSITION_INITIAL_DEPLOYMENT = "InitialDeployment";
+ public static final String TRANSITION_BLUE_TO_GREEN = "BlueToGreen";
+ public static final String TRANSITION_GREEN_TO_BLUE = "GreenToBlue";
+
+ private static final List<String> TRANSITIONS =
+ List.of(
+ TRANSITION_INITIAL_DEPLOYMENT,
+ TRANSITION_BLUE_TO_GREEN,
+ TRANSITION_GREEN_TO_BLUE);
+
+ private final KubernetesOperatorMetricGroup parentMetricGroup;
+ private final Configuration configuration;
+ private final FlinkOperatorConfiguration operatorConfig;
+ private final Clock clock;
+ private final boolean lifecycleMetricsEnabled;
+
+ private final Map<String, Map<String, BlueGreenResourceLifecycleMetricTracker>>
+ lifecycleTrackers = new ConcurrentHashMap<>();
+ private final Map<String, Map<String, Histogram>> namespaceTransitionHistograms =
+ new ConcurrentHashMap<>();
+ private final Map<FlinkBlueGreenDeploymentState, Map<String, Histogram>>
+ namespaceStateTimeHistograms = new ConcurrentHashMap<>();
+
+ private final Map<String, Histogram> systemTransitionHistograms = new ConcurrentHashMap<>();
+ private final Map<FlinkBlueGreenDeploymentState, Histogram> systemStateTimeHistograms =
+ new ConcurrentHashMap<>();
+
+ public BlueGreenLifecycleMetrics(
+ Configuration configuration, KubernetesOperatorMetricGroup parentMetricGroup) {
+ this.parentMetricGroup = parentMetricGroup;
+ this.configuration = configuration;
+ this.operatorConfig = FlinkOperatorConfiguration.fromConfiguration(configuration);
+ this.clock = Clock.systemDefaultZone();
+ this.lifecycleMetricsEnabled =
+ configuration.get(
+ KubernetesOperatorMetricOptions.OPERATOR_LIFECYCLE_METRICS_ENABLED);
+
+ TRANSITIONS.forEach(t -> namespaceTransitionHistograms.put(t, new ConcurrentHashMap<>()));
+ for (FlinkBlueGreenDeploymentState state : FlinkBlueGreenDeploymentState.values()) {
+ namespaceStateTimeHistograms.put(state, new ConcurrentHashMap<>());
+ }
+ }
+
+ @Override
+ public void onUpdate(FlinkBlueGreenDeployment flinkBgDep) {
+ if (!lifecycleMetricsEnabled) {
+ return;
+ }
+
+ var namespace = flinkBgDep.getMetadata().getNamespace();
+ var deploymentName = flinkBgDep.getMetadata().getName();
+ var state = flinkBgDep.getStatus().getBlueGreenState();
+
+ getOrCreateTracker(namespace, deploymentName, flinkBgDep).onUpdate(state, clock.instant());
+ }
+
+ @Override
+ public void onRemove(FlinkBlueGreenDeployment flinkBgDep) {
+ var namespace = flinkBgDep.getMetadata().getNamespace();
+ var deploymentName = flinkBgDep.getMetadata().getName();
+
+ var namespaceTrackers = lifecycleTrackers.get(namespace);
+ if (namespaceTrackers != null) {
+ namespaceTrackers.remove(deploymentName);
+ }
+ }
+
+ private BlueGreenResourceLifecycleMetricTracker getOrCreateTracker(
+ String namespace, String deploymentName, FlinkBlueGreenDeployment flinkBgDep) {
+ return lifecycleTrackers
+ .computeIfAbsent(namespace, ns -> new ConcurrentHashMap<>())
+ .computeIfAbsent(deploymentName, dn -> createTracker(namespace, flinkBgDep));
+ }
+
+ private BlueGreenResourceLifecycleMetricTracker createTracker(
+ String namespace, FlinkBlueGreenDeployment flinkBgDep) {
+ var initialState = flinkBgDep.getStatus().getBlueGreenState();
+ var time =
+ initialState == INITIALIZING_BLUE
+ ? Instant.parse(flinkBgDep.getMetadata().getCreationTimestamp())
+ : clock.instant();
+
+ return new BlueGreenResourceLifecycleMetricTracker(
+ initialState,
+ time,
+ buildTransitionHistograms(namespace),
+ buildStateTimeHistograms(namespace));
+ }
+
+ private Map<String, List<Histogram>> buildTransitionHistograms(String namespace) {
+ var histos = new HashMap<String, List<Histogram>>();
+ for (String transition : TRANSITIONS) {
+ histos.put(
+ transition,
+ List.of(
+ systemTransitionHistograms.computeIfAbsent(
+ transition,
+ t -> createSystemHistogram(TRANSITION_GROUP_NAME, t)),
+ namespaceTransitionHistograms
+ .get(transition)
+ .computeIfAbsent(
+ namespace,
+ ns ->
+ createNamespaceHistogram(
+ ns,
+ TRANSITION_GROUP_NAME,
+ transition))));
+ }
+ return histos;
+ }
+
+ private Map<FlinkBlueGreenDeploymentState, List<Histogram>> buildStateTimeHistograms(
+ String namespace) {
+ var histos = new HashMap<FlinkBlueGreenDeploymentState, List<Histogram>>();
+ for (FlinkBlueGreenDeploymentState state : FlinkBlueGreenDeploymentState.values()) {
+ histos.put(
+ state,
+ List.of(
+ systemStateTimeHistograms.computeIfAbsent(
+ state, s -> createSystemHistogram(STATE_GROUP_NAME, s.name())),
+ namespaceStateTimeHistograms
+ .get(state)
+ .computeIfAbsent(
+ namespace,
+ ns ->
+ createNamespaceHistogram(
+ ns, STATE_GROUP_NAME, state.name()))));
+ }
+ return histos;
+ }
+
+ private Histogram createSystemHistogram(String groupName, String metricName) {
+ return parentMetricGroup
+ .addGroup(FlinkBlueGreenDeployment.class.getSimpleName())
+ .addGroup(LIFECYCLE_GROUP_NAME)
+ .addGroup(groupName)
+ .addGroup(metricName)
+ .histogram(TIME_SECONDS_NAME, OperatorMetricUtils.createHistogram(operatorConfig));
+ }
+
+ private Histogram createNamespaceHistogram(
+ String namespace, String groupName, String metricName) {
+ return parentMetricGroup
+ .createResourceNamespaceGroup(
+ configuration, FlinkBlueGreenDeployment.class, namespace)
+ .addGroup(LIFECYCLE_GROUP_NAME)
+ .addGroup(groupName)
+ .addGroup(metricName)
+ .histogram(TIME_SECONDS_NAME, OperatorMetricUtils.createHistogram(operatorConfig));
+ }
+}
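`buildTransitionHistograms` and `buildStateTimeHistograms` hand each tracker a two-element list per metric: one system-wide histogram shared across namespaces and one histogram for the resource's namespace, both created lazily with `computeIfAbsent`. The sketch below models that fan-out under hypothetical names, using thread-safe lists of seconds in place of Flink `Histogram` objects.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

/** Hypothetical model of the system + namespace histogram fan-out. */
class HistogramFanOutSketch {
    // metric name -> samples shared by all namespaces (the "System" scope)
    final Map<String, List<Long>> system = new ConcurrentHashMap<>();
    // metric name -> namespace -> samples (the "Namespace" scope)
    final Map<String, Map<String, List<Long>>> perNamespace = new ConcurrentHashMap<>();

    /** Returns [system series, namespace series], creating either on first use. */
    List<List<Long>> targets(String namespace, String metric) {
        List<Long> sys = system.computeIfAbsent(metric, m -> new CopyOnWriteArrayList<>());
        List<Long> ns = perNamespace
                .computeIfAbsent(metric, m -> new ConcurrentHashMap<>())
                .computeIfAbsent(namespace, n -> new CopyOnWriteArrayList<>());
        return Arrays.asList(sys, ns);
    }

    /** Records one duration into both scopes, as a tracker would for a completed transition. */
    void record(String namespace, String metric, long seconds) {
        for (List<Long> series : targets(namespace, metric)) {
            series.add(seconds);
        }
    }
}
```

In the class above, the system series is registered under `FlinkBlueGreenDeployment.Lifecycle.<Transition|State>.<name>.TimeSeconds` on the parent group, while the namespace series hangs off the group returned by `createResourceNamespaceGroup`.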
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/BlueGreenResourceLifecycleMetricTracker.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/BlueGreenResourceLifecycleMetricTracker.java
new file mode 100644
index 00000000..53631278
--- /dev/null
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/BlueGreenResourceLifecycleMetricTracker.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.metrics.lifecycle;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.kubernetes.operator.api.status.FlinkBlueGreenDeploymentState;
+import org.apache.flink.metrics.Histogram;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.kubernetes.operator.api.status.FlinkBlueGreenDeploymentState.ACTIVE_BLUE;
+import static org.apache.flink.kubernetes.operator.api.status.FlinkBlueGreenDeploymentState.ACTIVE_GREEN;
+import static org.apache.flink.kubernetes.operator.api.status.FlinkBlueGreenDeploymentState.INITIALIZING_BLUE;
+import static org.apache.flink.kubernetes.operator.metrics.lifecycle.BlueGreenLifecycleMetrics.TRANSITION_BLUE_TO_GREEN;
+import static org.apache.flink.kubernetes.operator.metrics.lifecycle.BlueGreenLifecycleMetrics.TRANSITION_GREEN_TO_BLUE;
+import static org.apache.flink.kubernetes.operator.metrics.lifecycle.BlueGreenLifecycleMetrics.TRANSITION_INITIAL_DEPLOYMENT;
+
+/**
+ * Tracks state transitions and timing for a single FlinkBlueGreenDeployment resource. Records
+ * transition durations and time spent in each state.
+ */
+public class BlueGreenResourceLifecycleMetricTracker {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(BlueGreenResourceLifecycleMetricTracker.class);
+
+ private FlinkBlueGreenDeploymentState currentState;
+
+ // map(state -> (firstEntryTime, lastUpdateTime))
+ // Tracks when we entered each state and when we last saw it
+ private final Map<FlinkBlueGreenDeploymentState, Tuple2<Instant, Instant>> stateTimeMap =
+ new HashMap<>();
+
+ private final Map<String, List<Histogram>> transitionHistos;
+ private final Map<FlinkBlueGreenDeploymentState, List<Histogram>> stateTimeHistos;
+
+ public BlueGreenResourceLifecycleMetricTracker(
+ FlinkBlueGreenDeploymentState initialState,
+ Instant time,
+ Map<String, List<Histogram>> transitionHistos,
+ Map<FlinkBlueGreenDeploymentState, List<Histogram>> stateTimeHistos) {
+ this.currentState = initialState;
+ this.transitionHistos = transitionHistos;
+ this.stateTimeHistos = stateTimeHistos;
+ stateTimeMap.put(initialState, Tuple2.of(time, time));
+ }
+
+ /**
+ * Called on every reconciliation. Updates timestamps and records metrics on state changes.
+ *
+ * @param newState the current state from the resource status
+ * @param time the current timestamp
+ */
+ public void onUpdate(FlinkBlueGreenDeploymentState newState, Instant time) {
+ if (newState == currentState) {
+ updateLastUpdateTime(newState, time);
+ return;
+ }
+
+ // Record exit time for states that transition faster than the heartbeat interval.
+ updateLastUpdateTime(currentState, time);
+ recordTransitionMetrics(currentState, newState, time);
+
+ if (newState == ACTIVE_BLUE || newState == ACTIVE_GREEN) {
+ LOG.debug(
+ "Transitioned from {} to {}, recording state times for {} and clearing",
+ currentState,
+ newState,
+ stateTimeMap.keySet());
+
+ recordStateTimeMetrics();
+ clearTrackedStates();
+ }
+
+ stateTimeMap.put(newState, Tuple2.of(time, time));
+ currentState = newState;
+ }
+
+ private void updateLastUpdateTime(FlinkBlueGreenDeploymentState state, Instant time) {
+ var times = stateTimeMap.get(state);
+ if (times != null) {
+ times.f1 = time;
+ }
+ }
+
+ private void recordTransitionMetrics(
+ FlinkBlueGreenDeploymentState fromState,
+ FlinkBlueGreenDeploymentState toState,
+ Instant time) {
+
+ if (toState == ACTIVE_BLUE && stateTimeMap.containsKey(INITIALIZING_BLUE)) {
+ recordTransition(TRANSITION_INITIAL_DEPLOYMENT, INITIALIZING_BLUE, time);
+ }
+
+ if (toState == ACTIVE_GREEN && stateTimeMap.containsKey(ACTIVE_BLUE)) {
+ recordTransition(TRANSITION_BLUE_TO_GREEN, ACTIVE_BLUE, time);
+ }
+
+ if (toState == ACTIVE_BLUE
+ && fromState != INITIALIZING_BLUE
+ && stateTimeMap.containsKey(ACTIVE_GREEN)) {
+ recordTransition(TRANSITION_GREEN_TO_BLUE, ACTIVE_GREEN, time);
+ }
+ }
+
+ private void recordTransition(
+ String transitionName, FlinkBlueGreenDeploymentState fromState, Instant time) {
+ var fromTimes = stateTimeMap.get(fromState);
+ if (fromTimes == null) {
+ return;
+ }
+
+ long durationSeconds = Duration.between(fromTimes.f1, time).toSeconds();
+
+ LOG.debug(
+ "Recording transition time {}s for {} (from {})",
+ durationSeconds,
+ transitionName,
+ fromState);
+
+ var histograms = transitionHistos.get(transitionName);
+ if (histograms != null) {
+ histograms.forEach(h -> h.update(durationSeconds));
+ }
+ }
+
+ private void recordStateTimeMetrics() {
+ stateTimeMap.forEach(
+ (state, times) -> {
+ long durationSeconds = Duration.between(times.f0, times.f1).toSeconds();
+ var histograms = stateTimeHistos.get(state);
+ if (histograms != null) {
+ histograms.forEach(h -> h.update(durationSeconds));
+ }
+ });
+ }
+
+ private void clearTrackedStates() {
+ stateTimeMap.clear();
+ }
+}
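The following dependency-free sketch restates the bookkeeping in `BlueGreenResourceLifecycleMetricTracker` to make it easier to follow. Each state keeps a (first entry, last seen) pair. A transition duration runs from the last time the previous active state was seen until the new active state is reached. All per-state times are flushed and cleared whenever an `ACTIVE_*` state is entered. The `State` enum (including `TRANSITIONING_TO_BLUE`), the transition names, and the list-based stand-ins for histograms are assumptions for illustration, not the operator's real constants.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.EnumMap;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified model of the tracker; state and transition names are illustrative assumptions.
class LifecycleTrackerSketch {

    // Trimmed, assumed stand-in for FlinkBlueGreenDeploymentState.
    enum State {
        INITIALIZING_BLUE,
        TRANSITIONING_TO_BLUE,
        ACTIVE_BLUE,
        TRANSITIONING_TO_GREEN,
        ACTIVE_GREEN
    }

    static final class Tracker {
        private State current;
        // state -> {firstEntryTime, lastSeenTime}
        private final Map<State, Instant[]> times = new EnumMap<>(State.class);
        // Hypothetical transition name -> recorded seconds (stands in for histograms).
        final Map<String, List<Long>> transitions = new HashMap<>();
        // State -> recorded time-in-state seconds.
        final Map<State, List<Long>> stateTimes = new EnumMap<>(State.class);

        Tracker(State initial, Instant now) {
            current = initial;
            times.put(initial, new Instant[] {now, now});
        }

        void onUpdate(State next, Instant now) {
            // Refresh the current state's last-seen time; on a change this is its exit time.
            Instant[] cur = times.get(current);
            if (cur != null) {
                cur[1] = now;
            }
            if (next == current) {
                return;
            }
            if (next == State.ACTIVE_BLUE && times.containsKey(State.INITIALIZING_BLUE)) {
                record("InitialDeployment", State.INITIALIZING_BLUE, now);
            }
            if (next == State.ACTIVE_GREEN && times.containsKey(State.ACTIVE_BLUE)) {
                record("BlueToGreen", State.ACTIVE_BLUE, now);
            }
            if (next == State.ACTIVE_BLUE
                    && current != State.INITIALIZING_BLUE
                    && times.containsKey(State.ACTIVE_GREEN)) {
                record("GreenToBlue", State.ACTIVE_GREEN, now);
            }
            if (next == State.ACTIVE_BLUE || next == State.ACTIVE_GREEN) {
                // Reaching a stable state closes the cycle: flush time-in-state, then reset.
                times.forEach(
                        (s, t) ->
                                stateTimes
                                        .computeIfAbsent(s, k -> new ArrayList<>())
                                        .add(Duration.between(t[0], t[1]).toSeconds()));
                times.clear();
            }
            times.put(next, new Instant[] {now, now});
            current = next;
        }

        private void record(String name, State from, Instant now) {
            // Measured from the last time the source state was seen, truncated to whole seconds.
            long seconds = Duration.between(times.get(from)[1], now).toSeconds();
            transitions.computeIfAbsent(name, k -> new ArrayList<>()).add(seconds);
        }
    }
}
```

The previous state's last-seen time is refreshed before the transition is recorded. As a result, a blue-to-green duration covers the span from the reconciliation that observed the old active state being left to the one that observed the new active state. `Duration.toSeconds()` truncates, so sub-second phases are recorded as 0.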
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/StatusRecorder.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/StatusRecorder.java
index e06aa111..c1328042 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/StatusRecorder.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/StatusRecorder.java
@@ -20,12 +20,14 @@ package org.apache.flink.kubernetes.operator.utils;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
+import org.apache.flink.kubernetes.operator.api.FlinkBlueGreenDeployment;
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
import org.apache.flink.kubernetes.operator.api.FlinkSessionJob;
import org.apache.flink.kubernetes.operator.api.FlinkStateSnapshot;
import org.apache.flink.kubernetes.operator.api.lifecycle.ResourceLifecycleState;
import org.apache.flink.kubernetes.operator.api.listener.FlinkResourceListener;
import org.apache.flink.kubernetes.operator.api.status.CommonStatus;
+import org.apache.flink.kubernetes.operator.api.status.FlinkBlueGreenDeploymentStatus;
import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus;
import org.apache.flink.kubernetes.operator.api.status.FlinkSessionJobStatus;
import org.apache.flink.kubernetes.operator.api.status.FlinkStateSnapshotStatus;
@@ -131,6 +133,8 @@ public class StatusRecorder<CR extends CustomResource<?, STATUS>, STATUS> {
statusClass = FlinkSessionJobStatus.class;
} else if (resource instanceof FlinkStateSnapshot) {
statusClass = FlinkStateSnapshotStatus.class;
+ } else if (resource instanceof FlinkBlueGreenDeployment) {
+ statusClass = FlinkBlueGreenDeploymentStatus.class;
} else {
throw new RuntimeException(
String.format("Resource is unknown class: %s", resource.getClass()));
@@ -300,6 +304,25 @@ public class StatusRecorder<CR extends CustomResource<?, STATUS>, STATUS> {
return new StatusRecorder<>(metricManager, consumer);
}
+ public static StatusRecorder<FlinkBlueGreenDeployment, FlinkBlueGreenDeploymentStatus>
+ createForFlinkBlueGreenDeployment(
+ KubernetesClient kubernetesClient,
+ MetricManager<FlinkBlueGreenDeployment> metricManager,
+ Collection<FlinkResourceListener> listeners) {
+ BiConsumer<FlinkBlueGreenDeployment, FlinkBlueGreenDeploymentStatus> consumer =
+ (resource, previousStatus) -> {
+ listeners.forEach(
+ listener -> {
+ // FlinkResourceListener doesn't have a specific method for
+ // BlueGreen deployments yet, so we skip listener notifications
+ // for now. Metrics will still be tracked via MetricManager.
+ });
+ // No audit logging for BlueGreen deployments yet
+ };
+
+ return new StatusRecorder<>(metricManager, consumer);
+ }
+
public static StatusRecorder<FlinkStateSnapshot, FlinkStateSnapshotStatus>
createForFlinkStateSnapshot(
KubernetesClient kubernetesClient,
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkBlueGreenDeploymentControllerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkBlueGreenDeploymentControllerTest.java
index 8240b8c1..11c1b931 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkBlueGreenDeploymentControllerTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkBlueGreenDeploymentControllerTest.java
@@ -1015,6 +1015,9 @@ public class FlinkBlueGreenDeploymentControllerTest {
throws Exception {
Long minReconciliationTs = System.currentTimeMillis() - 1;
+ // Create the resource in the mock server before reconciling
+ kubernetesClient.resource(blueGreenDeployment).createOrReplace();
+
// 1a. Initializing deploymentStatus with this call
var rs = reconcile(blueGreenDeployment);
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkBlueGreenDeploymentController.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkBlueGreenDeploymentController.java
index 6e8e058c..5bddcf4d 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkBlueGreenDeploymentController.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkBlueGreenDeploymentController.java
@@ -24,7 +24,9 @@ import org.apache.flink.kubernetes.operator.TestingFlinkService;
import org.apache.flink.kubernetes.operator.api.FlinkBlueGreenDeployment;
import org.apache.flink.kubernetes.operator.api.status.FlinkBlueGreenDeploymentStatus;
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
+import org.apache.flink.kubernetes.operator.metrics.MetricManager;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
+import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
import io.javaoperatorsdk.operator.api.reconciler.Context;
import io.javaoperatorsdk.operator.api.reconciler.ErrorStatusUpdateControl;
@@ -50,7 +52,11 @@ public class TestingFlinkBlueGreenDeploymentController
flinkService,
null);
- flinkBlueGreenDeploymentController = new FlinkBlueGreenDeploymentController(contextFactory);
+ StatusRecorder<FlinkBlueGreenDeployment, FlinkBlueGreenDeploymentStatus> statusRecorder =
+ new StatusRecorder<>(new MetricManager<>(), (resource, status) -> {});
+
+ flinkBlueGreenDeploymentController =
+ new FlinkBlueGreenDeploymentController(contextFactory, statusRecorder);
}
@Override
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/FlinkBlueGreenDeploymentMetricsTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/FlinkBlueGreenDeploymentMetricsTest.java
new file mode 100644
index 00000000..10469677
--- /dev/null
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/FlinkBlueGreenDeploymentMetricsTest.java
@@ -0,0 +1,509 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.metrics;
+
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.api.FlinkBlueGreenDeployment;
+import org.apache.flink.kubernetes.operator.api.spec.ConfigObjectNode;
+import org.apache.flink.kubernetes.operator.api.spec.FlinkBlueGreenDeploymentSpec;
+import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec;
+import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentTemplateSpec;
+import org.apache.flink.kubernetes.operator.api.spec.JobSpec;
+import org.apache.flink.kubernetes.operator.api.spec.UpgradeMode;
+import org.apache.flink.kubernetes.operator.api.status.FlinkBlueGreenDeploymentState;
+import org.apache.flink.kubernetes.operator.api.status.FlinkBlueGreenDeploymentStatus;
+
+import io.fabric8.kubernetes.api.model.ObjectMetaBuilder;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.time.Instant;
+import java.util.HashMap;
+import java.util.UUID;
+
+import static org.apache.flink.kubernetes.operator.api.status.FlinkBlueGreenDeploymentState.ACTIVE_BLUE;
+import static org.apache.flink.kubernetes.operator.api.status.FlinkBlueGreenDeploymentState.ACTIVE_GREEN;
+import static org.apache.flink.kubernetes.operator.api.status.FlinkBlueGreenDeploymentState.INITIALIZING_BLUE;
+import static org.apache.flink.kubernetes.operator.api.status.FlinkBlueGreenDeploymentState.TRANSITIONING_TO_GREEN;
+import static org.apache.flink.kubernetes.operator.metrics.FlinkBlueGreenDeploymentMetrics.BG_STATE_GROUP_NAME;
+import static org.apache.flink.kubernetes.operator.metrics.FlinkBlueGreenDeploymentMetrics.COUNTER_NAME;
+import static org.apache.flink.kubernetes.operator.metrics.FlinkBlueGreenDeploymentMetrics.FAILURES_COUNTER_NAME;
+import static org.apache.flink.kubernetes.operator.metrics.FlinkBlueGreenDeploymentMetrics.JOB_STATUS_GROUP_NAME;
+import static org.apache.flink.kubernetes.operator.metrics.KubernetesOperatorMetricOptions.OPERATOR_RESOURCE_METRICS_ENABLED;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/** Tests for {@link FlinkBlueGreenDeploymentMetrics}. */
+public class FlinkBlueGreenDeploymentMetricsTest {
+
+ private static final String TEST_NAMESPACE = "test-namespace";
+
+ private final Configuration configuration = new Configuration();
+ private TestingMetricListener listener;
+ private MetricManager<FlinkBlueGreenDeployment> metricManager;
+
+ @BeforeEach
+ public void init() {
+ listener = new TestingMetricListener(configuration);
+ metricManager =
+ MetricManager.createFlinkBlueGreenDeploymentMetricManager(
+ configuration, listener.getMetricGroup());
+ }
+
+ @Test
+ public void testStateCountMetricsSameNamespace() {
+ var deployment1 = buildBlueGreenDeployment("deployment1", TEST_NAMESPACE);
+ var deployment2 = buildBlueGreenDeployment("deployment2", TEST_NAMESPACE);
+
+ var counterId =
+ listener.getNamespaceMetricId(
+ FlinkBlueGreenDeployment.class, TEST_NAMESPACE, COUNTER_NAME);
+ assertTrue(listener.getGauge(counterId).isEmpty());
+
+ // Both deployments start in INITIALIZING_BLUE
+ metricManager.onUpdate(deployment1);
+ metricManager.onUpdate(deployment2);
+
+ assertEquals(2, listener.getGauge(counterId).get().getValue());
+ assertStateCount(TEST_NAMESPACE, INITIALIZING_BLUE, 2);
+
+ // Move deployment1 to ACTIVE_BLUE
+ deployment1.getStatus().setBlueGreenState(ACTIVE_BLUE);
+ metricManager.onUpdate(deployment1);
+
+ assertStateCount(TEST_NAMESPACE, INITIALIZING_BLUE, 1);
+ assertStateCount(TEST_NAMESPACE, ACTIVE_BLUE, 1);
+
+ // Move deployment2 to ACTIVE_BLUE as well
+ deployment2.getStatus().setBlueGreenState(ACTIVE_BLUE);
+ metricManager.onUpdate(deployment2);
+
+ assertStateCount(TEST_NAMESPACE, INITIALIZING_BLUE, 0);
+ assertStateCount(TEST_NAMESPACE, ACTIVE_BLUE, 2);
+
+ // Remove deployments
+ metricManager.onRemove(deployment1);
+ assertEquals(1, listener.getGauge(counterId).get().getValue());
+ assertStateCount(TEST_NAMESPACE, ACTIVE_BLUE, 1);
+
+ metricManager.onRemove(deployment2);
+ assertEquals(0, listener.getGauge(counterId).get().getValue());
+ assertStateCount(TEST_NAMESPACE, ACTIVE_BLUE, 0);
+ }
+
+ @Test
+ public void testStateCountMetricsMultiNamespace() {
+ var namespace1 = "ns1";
+ var namespace2 = "ns2";
+ var deployment1 = buildBlueGreenDeployment("deployment", namespace1);
+ var deployment2 = buildBlueGreenDeployment("deployment", namespace2);
+
+ var counterId1 =
+ listener.getNamespaceMetricId(
+ FlinkBlueGreenDeployment.class, namespace1, COUNTER_NAME);
+ var counterId2 =
+ listener.getNamespaceMetricId(
+ FlinkBlueGreenDeployment.class, namespace2, COUNTER_NAME);
+
+ assertTrue(listener.getGauge(counterId1).isEmpty());
+ assertTrue(listener.getGauge(counterId2).isEmpty());
+
+ metricManager.onUpdate(deployment1);
+ metricManager.onUpdate(deployment2);
+
+ assertEquals(1, listener.getGauge(counterId1).get().getValue());
+ assertEquals(1, listener.getGauge(counterId2).get().getValue());
+ assertStateCount(namespace1, INITIALIZING_BLUE, 1);
+ assertStateCount(namespace2, INITIALIZING_BLUE, 1);
+
+ // Move deployment1 to a different state
+ deployment1.getStatus().setBlueGreenState(ACTIVE_BLUE);
+ metricManager.onUpdate(deployment1);
+
+ assertStateCount(namespace1, INITIALIZING_BLUE, 0);
+ assertStateCount(namespace1, ACTIVE_BLUE, 1);
+ // namespace2 should be unaffected
+ assertStateCount(namespace2, INITIALIZING_BLUE, 1);
+
+ metricManager.onRemove(deployment1);
+ metricManager.onRemove(deployment2);
+
+ assertEquals(0, listener.getGauge(counterId1).get().getValue());
+ assertEquals(0, listener.getGauge(counterId2).get().getValue());
+ }
+
+ @Test
+ public void testAllBlueGreenStatesHaveMetrics() {
+ var deployment = buildBlueGreenDeployment("test-deployment", TEST_NAMESPACE);
+ metricManager.onUpdate(deployment);
+
+ // Verify each state has a gauge registered
+ for (FlinkBlueGreenDeploymentState state : FlinkBlueGreenDeploymentState.values()) {
+ var stateId =
+ listener.getNamespaceMetricId(
+ FlinkBlueGreenDeployment.class,
+ TEST_NAMESPACE,
+ BG_STATE_GROUP_NAME,
+ state.name(),
+ COUNTER_NAME);
+ assertTrue(
+ listener.getGauge(stateId).isPresent(),
+ "Metric should exist for state: " + state);
+ }
+ }
+
+ @Test
+ public void testFullLifecycleStateCountUpdates() {
+ var deployment = buildBlueGreenDeployment("test", TEST_NAMESPACE);
+
+ // Start in INITIALIZING_BLUE
+ metricManager.onUpdate(deployment);
+ assertStateCount(TEST_NAMESPACE, INITIALIZING_BLUE, 1);
+ assertStateCount(TEST_NAMESPACE, ACTIVE_BLUE, 0);
+
+ // Transition to ACTIVE_BLUE
+ deployment.getStatus().setBlueGreenState(ACTIVE_BLUE);
+ metricManager.onUpdate(deployment);
+ assertStateCount(TEST_NAMESPACE, INITIALIZING_BLUE, 0);
+ assertStateCount(TEST_NAMESPACE, ACTIVE_BLUE, 1);
+
+ // Transition to TRANSITIONING_TO_GREEN
+ deployment.getStatus().setBlueGreenState(TRANSITIONING_TO_GREEN);
+ metricManager.onUpdate(deployment);
+ assertStateCount(TEST_NAMESPACE, ACTIVE_BLUE, 0);
+ assertStateCount(TEST_NAMESPACE, TRANSITIONING_TO_GREEN, 1);
+
+ // Transition to ACTIVE_GREEN
+ deployment.getStatus().setBlueGreenState(ACTIVE_GREEN);
+ metricManager.onUpdate(deployment);
+ assertStateCount(TEST_NAMESPACE, TRANSITIONING_TO_GREEN, 0);
+ assertStateCount(TEST_NAMESPACE, ACTIVE_GREEN, 1);
+ }
+
+ @Test
+ public void testMetricsDisabled() {
+ var conf = new Configuration();
+ conf.set(OPERATOR_RESOURCE_METRICS_ENABLED, false);
+ var disabledListener = new TestingMetricListener(conf);
+ var disabledMetricManager =
+ MetricManager.createFlinkBlueGreenDeploymentMetricManager(
+ conf, disabledListener.getMetricGroup());
+
+ var deployment = buildBlueGreenDeployment("test", TEST_NAMESPACE);
+
+ var counterId =
+ disabledListener.getNamespaceMetricId(
+ FlinkBlueGreenDeployment.class, TEST_NAMESPACE, COUNTER_NAME);
+
+ disabledMetricManager.onUpdate(deployment);
+ assertTrue(disabledListener.getGauge(counterId).isEmpty());
+
+ for (FlinkBlueGreenDeploymentState state : FlinkBlueGreenDeploymentState.values()) {
+ var statusId =
+ disabledListener.getNamespaceMetricId(
+ FlinkBlueGreenDeployment.class,
+ TEST_NAMESPACE,
+ BG_STATE_GROUP_NAME,
+ state.name(),
+ COUNTER_NAME);
+ assertTrue(disabledListener.getGauge(statusId).isEmpty());
+ }
+ }
+
+ @Test
+ public void testRepeatedUpdatesDoNotDuplicateCount() {
+ var deployment = buildBlueGreenDeployment("test", TEST_NAMESPACE);
+
+ // Multiple updates in same state should not increase count
+ metricManager.onUpdate(deployment);
+ metricManager.onUpdate(deployment);
+ metricManager.onUpdate(deployment);
+
+ var counterId =
+ listener.getNamespaceMetricId(
+ FlinkBlueGreenDeployment.class, TEST_NAMESPACE, COUNTER_NAME);
+ assertEquals(1, listener.getGauge(counterId).get().getValue());
+ assertStateCount(TEST_NAMESPACE, INITIALIZING_BLUE, 1);
+ }
+
+ @Test
+ public void testJobStatusGaugeTracking() {
+ var deployment1 = buildBlueGreenDeployment("deployment1", TEST_NAMESPACE);
+ var deployment2 = buildBlueGreenDeployment("deployment2", TEST_NAMESPACE);
+
+ var runningId =
+ listener.getNamespaceMetricId(
+ FlinkBlueGreenDeployment.class,
+ TEST_NAMESPACE,
+ JOB_STATUS_GROUP_NAME,
+ JobStatus.RUNNING.name(),
+ COUNTER_NAME);
+ var failingId =
+ listener.getNamespaceMetricId(
+ FlinkBlueGreenDeployment.class,
+ TEST_NAMESPACE,
+ JOB_STATUS_GROUP_NAME,
+ JobStatus.FAILING.name(),
+ COUNTER_NAME);
+
+ assertTrue(listener.getGauge(runningId).isEmpty());
+ assertTrue(listener.getGauge(failingId).isEmpty());
+
+ // Both start with RUNNING
+ deployment1.getStatus().getJobStatus().setState(JobStatus.RUNNING);
+ deployment2.getStatus().getJobStatus().setState(JobStatus.RUNNING);
+ metricManager.onUpdate(deployment1);
+ metricManager.onUpdate(deployment2);
+ assertJobStatusCount(TEST_NAMESPACE, JobStatus.RUNNING, 2);
+ assertJobStatusCount(TEST_NAMESPACE, JobStatus.FAILING, 0);
+
+ // deployment1 transitions to FAILING
+ deployment1.getStatus().getJobStatus().setState(JobStatus.FAILING);
+ metricManager.onUpdate(deployment1);
+ assertJobStatusCount(TEST_NAMESPACE, JobStatus.RUNNING, 1);
+ assertJobStatusCount(TEST_NAMESPACE, JobStatus.FAILING, 1);
+
+ // deployment2 also transitions to FAILING
+ deployment2.getStatus().getJobStatus().setState(JobStatus.FAILING);
+ metricManager.onUpdate(deployment2);
+ assertJobStatusCount(TEST_NAMESPACE, JobStatus.RUNNING, 0);
+ assertJobStatusCount(TEST_NAMESPACE, JobStatus.FAILING, 2);
+
+ // Remove deployment1
+ metricManager.onRemove(deployment1);
+ assertJobStatusCount(TEST_NAMESPACE, JobStatus.FAILING, 1);
+
+ // Remove deployment2
+ metricManager.onRemove(deployment2);
+ assertJobStatusCount(TEST_NAMESPACE, JobStatus.FAILING, 0);
+ }
+
+ @Test
+ public void testFailuresCounterIncrementsOnTransitionToFailing() {
+ var deployment = buildBlueGreenDeployment("test", TEST_NAMESPACE);
+
+ var failuresId =
+ listener.getNamespaceMetricId(
+ FlinkBlueGreenDeployment.class, TEST_NAMESPACE, FAILURES_COUNTER_NAME);
+
+ assertTrue(listener.getCounter(failuresId).isEmpty());
+
+ // Start with RUNNING
+ deployment.getStatus().getJobStatus().setState(JobStatus.RUNNING);
+ metricManager.onUpdate(deployment);
+ assertEquals(0L, listener.getCounter(failuresId).get().getCount());
+
+ // First transition to FAILING - counter increments
+ deployment.getStatus().getJobStatus().setState(JobStatus.FAILING);
+ metricManager.onUpdate(deployment);
+ assertEquals(1L, listener.getCounter(failuresId).get().getCount());
+
+ // Stay in FAILING - counter does NOT increment
+ metricManager.onUpdate(deployment);
+ assertEquals(1L, listener.getCounter(failuresId).get().getCount());
+
+ // Recover to RUNNING - counter stays the same (never decrements)
+ deployment.getStatus().getJobStatus().setState(JobStatus.RUNNING);
+ metricManager.onUpdate(deployment);
+ assertEquals(1L, listener.getCounter(failuresId).get().getCount());
+
+ // Second transition to FAILING - counter increments again
+ deployment.getStatus().getJobStatus().setState(JobStatus.FAILING);
+ metricManager.onUpdate(deployment);
+ assertEquals(2L, listener.getCounter(failuresId).get().getCount());
+ }
+
+ @Test
+ public void testFailuresCounterMultipleDeployments() {
+ var deployment1 = buildBlueGreenDeployment("deployment1", TEST_NAMESPACE);
+ var deployment2 = buildBlueGreenDeployment("deployment2", TEST_NAMESPACE);
+
+ var failuresId =
+ listener.getNamespaceMetricId(
+ FlinkBlueGreenDeployment.class, TEST_NAMESPACE, FAILURES_COUNTER_NAME);
+
+ assertTrue(listener.getCounter(failuresId).isEmpty());
+
+ // Both start RUNNING
+ deployment1.getStatus().getJobStatus().setState(JobStatus.RUNNING);
+ deployment2.getStatus().getJobStatus().setState(JobStatus.RUNNING);
+ metricManager.onUpdate(deployment1);
+ metricManager.onUpdate(deployment2);
+ assertEquals(0L, listener.getCounter(failuresId).get().getCount());
+
+ // deployment1 fails
+ deployment1.getStatus().getJobStatus().setState(JobStatus.FAILING);
+ metricManager.onUpdate(deployment1);
+ assertEquals(1L, listener.getCounter(failuresId).get().getCount());
+
+ // deployment2 fails
+ deployment2.getStatus().getJobStatus().setState(JobStatus.FAILING);
+ metricManager.onUpdate(deployment2);
+ assertEquals(2L, listener.getCounter(failuresId).get().getCount());
+
+ // deployment1 recovers - counter stays 2
+ deployment1.getStatus().getJobStatus().setState(JobStatus.RUNNING);
+ metricManager.onUpdate(deployment1);
+ assertEquals(2L, listener.getCounter(failuresId).get().getCount());
+
+ // Remove deployments - counter stays 2 (historical, never decrements)
+ metricManager.onRemove(deployment1);
+ assertEquals(2L, listener.getCounter(failuresId).get().getCount());
+ metricManager.onRemove(deployment2);
+ assertEquals(2L, listener.getCounter(failuresId).get().getCount());
+ }
+
+ @Test
+ public void testFailuresCounterIsolatedByNamespace() {
+ var namespace1 = "ns1";
+ var namespace2 = "ns2";
+ var deployment1 = buildBlueGreenDeployment("deployment", namespace1);
+ var deployment2 = buildBlueGreenDeployment("deployment", namespace2);
+
+ var failuresId1 =
+ listener.getNamespaceMetricId(
+ FlinkBlueGreenDeployment.class, namespace1, FAILURES_COUNTER_NAME);
+ var failuresId2 =
+ listener.getNamespaceMetricId(
+ FlinkBlueGreenDeployment.class, namespace2, FAILURES_COUNTER_NAME);
+
+ assertTrue(listener.getCounter(failuresId1).isEmpty());
+ assertTrue(listener.getCounter(failuresId2).isEmpty());
+
+ // Initialize both namespaces first with RUNNING deployments
+ deployment1.getStatus().getJobStatus().setState(JobStatus.RUNNING);
+ deployment2.getStatus().getJobStatus().setState(JobStatus.RUNNING);
+ metricManager.onUpdate(deployment1);
+ metricManager.onUpdate(deployment2);
+ assertEquals(0L, listener.getCounter(failuresId1).get().getCount());
+ assertEquals(0L, listener.getCounter(failuresId2).get().getCount());
+
+ // deployment1 in ns1 fails
+ deployment1.getStatus().getJobStatus().setState(JobStatus.FAILING);
+ metricManager.onUpdate(deployment1);
+
+ // Only ns1 counter increments
+ assertEquals(1L, listener.getCounter(failuresId1).get().getCount());
+ assertEquals(0L, listener.getCounter(failuresId2).get().getCount());
+
+ // deployment2 in ns2 fails
+ deployment2.getStatus().getJobStatus().setState(JobStatus.FAILING);
+ metricManager.onUpdate(deployment2);
+
+ // Counters are isolated
+ assertEquals(1L, listener.getCounter(failuresId1).get().getCount());
+ assertEquals(1L, listener.getCounter(failuresId2).get().getCount());
+ }
+
+ @Test
+ public void testAllJobStatusesHaveMetrics() {
+ var deployment = buildBlueGreenDeployment("test-deployment", TEST_NAMESPACE);
+ deployment.getStatus().getJobStatus().setState(JobStatus.RUNNING);
+ metricManager.onUpdate(deployment);
+
+ // Verify each JobStatus has a gauge registered
+ for (JobStatus status : JobStatus.values()) {
+ var statusId =
+ listener.getNamespaceMetricId(
+ FlinkBlueGreenDeployment.class,
+ TEST_NAMESPACE,
+ JOB_STATUS_GROUP_NAME,
+ status.name(),
+ COUNTER_NAME);
+ assertTrue(
+ listener.getGauge(statusId).isPresent(),
+ "Metric should exist for JobStatus: " + status);
+ }
+ }
+
+ @Test
+ public void testJobStatusDoesNotDoubleCount() {
+ var deployment = buildBlueGreenDeployment("test", TEST_NAMESPACE);
+
+ // Start with RUNNING
+ deployment.getStatus().getJobStatus().setState(JobStatus.RUNNING);
+ metricManager.onUpdate(deployment);
+ assertJobStatusCount(TEST_NAMESPACE, JobStatus.RUNNING, 1);
+
+ // Multiple updates in same JobStatus should not duplicate
+ metricManager.onUpdate(deployment);
+ metricManager.onUpdate(deployment);
+ assertJobStatusCount(TEST_NAMESPACE, JobStatus.RUNNING, 1);
+ }
+
+ private FlinkBlueGreenDeployment buildBlueGreenDeployment(String name, String namespace) {
+ var deployment = new FlinkBlueGreenDeployment();
+ deployment.setMetadata(
+ new ObjectMetaBuilder()
+ .withName(name)
+ .withNamespace(namespace)
+ .withUid(UUID.randomUUID().toString())
+ .withCreationTimestamp(Instant.now().toString())
+ .build());
+
+ var flinkDeploymentSpec =
+ FlinkDeploymentSpec.builder()
+ .flinkConfiguration(new ConfigObjectNode())
+ .job(JobSpec.builder().upgradeMode(UpgradeMode.STATELESS).build())
+ .build();
+
+ var bgDeploymentSpec =
+ new FlinkBlueGreenDeploymentSpec(
+ new HashMap<>(),
+ FlinkDeploymentTemplateSpec.builder().spec(flinkDeploymentSpec).build());
+
+ deployment.setSpec(bgDeploymentSpec);
+
+ var status = new FlinkBlueGreenDeploymentStatus();
+ status.setBlueGreenState(INITIALIZING_BLUE);
+ status.setJobStatus(new org.apache.flink.kubernetes.operator.api.status.JobStatus());
+ deployment.setStatus(status);
+
+ return deployment;
+ }
+
+ private void assertStateCount(
+ String namespace, FlinkBlueGreenDeploymentState state, int expectedCount) {
+ var stateId =
+ listener.getNamespaceMetricId(
+ FlinkBlueGreenDeployment.class,
+ namespace,
+ BG_STATE_GROUP_NAME,
+ state.name(),
+ COUNTER_NAME);
+ assertEquals(
+ expectedCount,
+ listener.getGauge(stateId).get().getValue(),
+ "State count mismatch for " + state);
+ }
+
+ private void assertJobStatusCount(String namespace, JobStatus status, int expectedCount) {
+ var statusId =
+ listener.getNamespaceMetricId(
+ FlinkBlueGreenDeployment.class,
+ namespace,
+ JOB_STATUS_GROUP_NAME,
+ status.name(),
+ COUNTER_NAME);
+ assertEquals(
+ expectedCount,
+ listener.getGauge(statusId).get().getValue(),
+ "JobStatus count mismatch for " + status);
+ }
+}
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/BlueGreenLifecycleMetricsTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/BlueGreenLifecycleMetricsTest.java
new file mode 100644
index 00000000..ce0af71c
--- /dev/null
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/BlueGreenLifecycleMetricsTest.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.metrics.lifecycle;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.api.FlinkBlueGreenDeployment;
+import org.apache.flink.kubernetes.operator.api.spec.ConfigObjectNode;
+import org.apache.flink.kubernetes.operator.api.spec.FlinkBlueGreenDeploymentSpec;
+import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec;
+import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentTemplateSpec;
+import org.apache.flink.kubernetes.operator.api.spec.JobSpec;
+import org.apache.flink.kubernetes.operator.api.spec.UpgradeMode;
+import org.apache.flink.kubernetes.operator.api.status.FlinkBlueGreenDeploymentState;
+import org.apache.flink.kubernetes.operator.api.status.FlinkBlueGreenDeploymentStatus;
+import org.apache.flink.kubernetes.operator.metrics.CustomResourceMetrics;
+import org.apache.flink.kubernetes.operator.metrics.MetricManager;
+import org.apache.flink.kubernetes.operator.metrics.TestingMetricListener;
+
+import io.fabric8.kubernetes.api.model.ObjectMetaBuilder;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.time.Instant;
+import java.util.HashMap;
+import java.util.UUID;
+
+import static org.apache.flink.kubernetes.operator.api.status.FlinkBlueGreenDeploymentState.INITIALIZING_BLUE;
+import static org.apache.flink.kubernetes.operator.metrics.KubernetesOperatorMetricOptions.OPERATOR_LIFECYCLE_METRICS_ENABLED;
+import static org.apache.flink.kubernetes.operator.metrics.lifecycle.BlueGreenLifecycleMetrics.LIFECYCLE_GROUP_NAME;
+import static org.apache.flink.kubernetes.operator.metrics.lifecycle.BlueGreenLifecycleMetrics.STATE_GROUP_NAME;
+import static org.apache.flink.kubernetes.operator.metrics.lifecycle.BlueGreenLifecycleMetrics.TIME_SECONDS_NAME;
+import static org.apache.flink.kubernetes.operator.metrics.lifecycle.BlueGreenLifecycleMetrics.TRANSITION_BLUE_TO_GREEN;
+import static org.apache.flink.kubernetes.operator.metrics.lifecycle.BlueGreenLifecycleMetrics.TRANSITION_GREEN_TO_BLUE;
+import static org.apache.flink.kubernetes.operator.metrics.lifecycle.BlueGreenLifecycleMetrics.TRANSITION_GROUP_NAME;
+import static org.apache.flink.kubernetes.operator.metrics.lifecycle.BlueGreenLifecycleMetrics.TRANSITION_INITIAL_DEPLOYMENT;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/** Tests for {@link BlueGreenLifecycleMetrics}. */
+public class BlueGreenLifecycleMetricsTest {
+
+ private static final String TEST_NAMESPACE = "test-namespace";
+ private static final String[] TRANSITIONS = {
+ TRANSITION_INITIAL_DEPLOYMENT, TRANSITION_BLUE_TO_GREEN, TRANSITION_GREEN_TO_BLUE
+ };
+
+ private final Configuration configuration = new Configuration();
+ private TestingMetricListener listener;
+ private MetricManager<FlinkBlueGreenDeployment> metricManager;
+
+ @BeforeEach
+ public void init() {
+ listener = new TestingMetricListener(configuration);
+ metricManager =
+ MetricManager.createFlinkBlueGreenDeploymentMetricManager(
+ configuration, listener.getMetricGroup());
+ }
+
+ @Test
+ public void testNamespaceHistogramMetricsExist() {
+ var deployment = buildBlueGreenDeployment("test-deployment", TEST_NAMESPACE);
+ metricManager.onUpdate(deployment);
+
+ for (String transition : TRANSITIONS) {
+ assertTrue(
+ listener.getHistogram(
+ getNamespaceHistogramId(
+ TEST_NAMESPACE, TRANSITION_GROUP_NAME, transition))
+ .isPresent(),
+ "Transition histogram should exist for: " + transition);
+ }
+
+ for (FlinkBlueGreenDeploymentState state : FlinkBlueGreenDeploymentState.values()) {
+ assertTrue(
+ listener.getHistogram(
+ getNamespaceHistogramId(
+ TEST_NAMESPACE, STATE_GROUP_NAME, state.name()))
+ .isPresent(),
+ "State time histogram should exist for: " + state);
+ }
+ }
+
+ @Test
+ public void testSystemLevelHistogramsExist() {
+ var deployment = buildBlueGreenDeployment("test-deployment", TEST_NAMESPACE);
+ metricManager.onUpdate(deployment);
+
+ for (String transition : TRANSITIONS) {
+ assertTrue(
+ listener.getHistogram(
+ getSystemLevelHistogramId(
+ listener, TRANSITION_GROUP_NAME, transition))
+ .isPresent(),
+ "System-level transition histogram should exist for: " + transition);
+ }
+
+ for (FlinkBlueGreenDeploymentState state : FlinkBlueGreenDeploymentState.values()) {
+ assertTrue(
+ listener.getHistogram(
+ getSystemLevelHistogramId(
+ listener, STATE_GROUP_NAME, state.name()))
+ .isPresent(),
+ "System-level state time histogram should exist for: " + state);
+ }
+ }
+
+ @Test
+ public void testMultiNamespaceHistogramIsolation() {
+ var namespace1 = "ns1";
+ var namespace2 = "ns2";
+ var ns1HistoId =
+ getNamespaceHistogramId(
+ namespace1, TRANSITION_GROUP_NAME, TRANSITION_INITIAL_DEPLOYMENT);
+
+ var dep1 = buildBlueGreenDeployment("dep1", namespace1);
+ metricManager.onUpdate(dep1);
+ var ns1Histo = listener.getHistogram(ns1HistoId).get();
+
+ var dep2 = buildBlueGreenDeployment("dep2", namespace1);
+ metricManager.onUpdate(dep2);
+ assertTrue(
+ ns1Histo == listener.getHistogram(ns1HistoId).get(),
+ "Deployments in same namespace should share histogram instance");
+
+ var dep3 = buildBlueGreenDeployment("dep3", namespace2);
+ metricManager.onUpdate(dep3);
+ var ns2HistoId =
+ getNamespaceHistogramId(
+ namespace2, TRANSITION_GROUP_NAME, TRANSITION_INITIAL_DEPLOYMENT);
+ assertTrue(
+ ns1Histo != listener.getHistogram(ns2HistoId).get(),
+ "Different namespaces should have different histogram instances");
+ }
+
+ @Test
+ public void testLifecycleMetricsDisabled() {
+ var conf = new Configuration();
+ conf.set(OPERATOR_LIFECYCLE_METRICS_ENABLED, false);
+ var disabledMetricManager =
+ MetricManager.createFlinkBlueGreenDeploymentMetricManager(
+ conf, new TestingMetricListener(conf).getMetricGroup());
+
+ assertNull(
+ getBlueGreenLifecycleMetrics(disabledMetricManager),
+ "BlueGreenLifecycleMetrics should not be registered when disabled");
+ }
+
+ private FlinkBlueGreenDeployment buildBlueGreenDeployment(String name, String namespace) {
+ var deployment = new FlinkBlueGreenDeployment();
+ deployment.setMetadata(
+ new ObjectMetaBuilder()
+ .withName(name)
+ .withNamespace(namespace)
+ .withUid(UUID.randomUUID().toString())
+ .withCreationTimestamp(Instant.now().toString())
+ .build());
+
+ var flinkDeploymentSpec =
+ FlinkDeploymentSpec.builder()
+ .flinkConfiguration(new ConfigObjectNode())
+ .job(JobSpec.builder().upgradeMode(UpgradeMode.STATELESS).build())
+ .build();
+
+ var bgDeploymentSpec =
+ new FlinkBlueGreenDeploymentSpec(
+ new HashMap<>(),
+ FlinkDeploymentTemplateSpec.builder().spec(flinkDeploymentSpec).build());
+
+ deployment.setSpec(bgDeploymentSpec);
+
+ var status = new FlinkBlueGreenDeploymentStatus();
+ status.setBlueGreenState(INITIALIZING_BLUE);
+ deployment.setStatus(status);
+
+ return deployment;
+ }
+
+ private String getNamespaceHistogramId(String namespace, String groupName, String metricName) {
+ return listener.getNamespaceMetricId(
+ FlinkBlueGreenDeployment.class,
+ namespace,
+ LIFECYCLE_GROUP_NAME,
+ groupName,
+ metricName,
+ TIME_SECONDS_NAME);
+ }
+
+ private String getSystemLevelHistogramId(
+ TestingMetricListener metricListener, String groupName, String metricName) {
+ return metricListener.getMetricId(
+ String.format(
+ "%s.%s.%s.%s.%s",
+ FlinkBlueGreenDeployment.class.getSimpleName(),
+ LIFECYCLE_GROUP_NAME,
+ groupName,
+ metricName,
+ TIME_SECONDS_NAME));
+ }
+
+ private static BlueGreenLifecycleMetrics getBlueGreenLifecycleMetrics(
+ MetricManager<FlinkBlueGreenDeployment> metricManager) {
+ for (CustomResourceMetrics<?> metrics : metricManager.getRegisteredMetrics()) {
+ if (metrics instanceof BlueGreenLifecycleMetrics) {
+ return (BlueGreenLifecycleMetrics) metrics;
+ }
+ }
+ return null;
+ }
+}
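The histogram tests above check two properties: every transition and state histogram exists at both namespace and system level, and deployments in the same namespace share one histogram instance while different namespaces get separate ones. A minimal sketch of a registry with those properties follows. It assumes, without confirmation from this diff, that the `List<Histogram>` values later handed to `BlueGreenResourceLifecycleMetricTracker` hold one histogram per scope. `SampleHistogram` and `ScopedHistogramRegistry` are hypothetical stand-ins, not operator or Flink classes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical stand-in for a Flink Histogram that simply keeps its samples. */
class SampleHistogram {
    private final List<Long> samples = new ArrayList<>();

    synchronized void update(long value) {
        samples.add(value);
    }

    synchronized int count() {
        return samples.size();
    }
}

/** Hypothetical registry: one shared system-level histogram per metric plus one per namespace. */
class ScopedHistogramRegistry {
    private final Map<String, SampleHistogram> systemLevel = new ConcurrentHashMap<>();
    private final Map<String, Map<String, SampleHistogram>> byNamespace =
            new ConcurrentHashMap<>();

    /** Returns [system-level, namespace-level] histograms for one metric in one namespace. */
    List<SampleHistogram> histogramsFor(String namespace, String metricName) {
        SampleHistogram system =
                systemLevel.computeIfAbsent(metricName, k -> new SampleHistogram());
        SampleHistogram perNamespace =
                byNamespace
                        .computeIfAbsent(namespace, k -> new ConcurrentHashMap<>())
                        .computeIfAbsent(metricName, k -> new SampleHistogram());
        return List.of(system, perNamespace);
    }

    /** Writes one sample into every histogram of the list, i.e. into each scope. */
    static void record(List<SampleHistogram> histograms, long seconds) {
        for (SampleHistogram histogram : histograms) {
            histogram.update(seconds);
        }
    }
}
```

Using `computeIfAbsent` on a `ConcurrentHashMap` is what makes repeated lookups for the same namespace return the identical instance, which is the property `testMultiNamespaceHistogramIsolation` checks with `==`.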
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/BlueGreenResourceLifecycleMetricTrackerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/BlueGreenResourceLifecycleMetricTrackerTest.java
new file mode 100644
index 00000000..cbc80eff
--- /dev/null
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/BlueGreenResourceLifecycleMetricTrackerTest.java
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.metrics.lifecycle;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.api.status.FlinkBlueGreenDeploymentState;
+import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
+import org.apache.flink.kubernetes.operator.metrics.OperatorMetricUtils;
+import org.apache.flink.metrics.Histogram;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.time.Instant;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static org.apache.flink.kubernetes.operator.api.status.FlinkBlueGreenDeploymentState.ACTIVE_BLUE;
+import static org.apache.flink.kubernetes.operator.api.status.FlinkBlueGreenDeploymentState.ACTIVE_GREEN;
+import static org.apache.flink.kubernetes.operator.api.status.FlinkBlueGreenDeploymentState.INITIALIZING_BLUE;
+import static org.apache.flink.kubernetes.operator.api.status.FlinkBlueGreenDeploymentState.SAVEPOINTING_BLUE;
+import static org.apache.flink.kubernetes.operator.api.status.FlinkBlueGreenDeploymentState.SAVEPOINTING_GREEN;
+import static org.apache.flink.kubernetes.operator.api.status.FlinkBlueGreenDeploymentState.TRANSITIONING_TO_BLUE;
+import static org.apache.flink.kubernetes.operator.api.status.FlinkBlueGreenDeploymentState.TRANSITIONING_TO_GREEN;
+import static org.apache.flink.kubernetes.operator.metrics.lifecycle.BlueGreenLifecycleMetrics.TRANSITION_BLUE_TO_GREEN;
+import static org.apache.flink.kubernetes.operator.metrics.lifecycle.BlueGreenLifecycleMetrics.TRANSITION_GREEN_TO_BLUE;
+import static org.apache.flink.kubernetes.operator.metrics.lifecycle.BlueGreenLifecycleMetrics.TRANSITION_INITIAL_DEPLOYMENT;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/** Tests for {@link BlueGreenResourceLifecycleMetricTracker}. */
+public class BlueGreenResourceLifecycleMetricTrackerTest {
+
+ private Map<String, List<Histogram>> transitionHistos;
+ private Map<FlinkBlueGreenDeploymentState, List<Histogram>> stateTimeHistos;
+
+ @BeforeEach
+ void setUp() {
+ transitionHistos = new ConcurrentHashMap<>();
+ transitionHistos.put(TRANSITION_INITIAL_DEPLOYMENT, List.of(createHistogram()));
+ transitionHistos.put(TRANSITION_BLUE_TO_GREEN, List.of(createHistogram()));
+ transitionHistos.put(TRANSITION_GREEN_TO_BLUE, List.of(createHistogram()));
+
+ stateTimeHistos = new ConcurrentHashMap<>();
+ for (FlinkBlueGreenDeploymentState state : FlinkBlueGreenDeploymentState.values()) {
+ stateTimeHistos.put(state, List.of(createHistogram()));
+ }
+ }
+
+ @Test
+ void testInitialDeploymentRecordsTransitionTime() {
+ long ts = 0;
+ var tracker = createTracker(INITIALIZING_BLUE, Instant.ofEpochSecond(ts));
+ tracker.onUpdate(TRANSITIONING_TO_BLUE, Instant.ofEpochSecond(ts += 5));
+ tracker.onUpdate(ACTIVE_BLUE, Instant.ofEpochSecond(ts += 5));
+
+ assertTransitionRecorded(TRANSITION_INITIAL_DEPLOYMENT, 5);
+ }
+
+ @Test
+ void testInitialDeploymentRecordsStateTime() {
+ long ts = 0;
+ var tracker = createTracker(INITIALIZING_BLUE, Instant.ofEpochSecond(ts));
+ tracker.onUpdate(TRANSITIONING_TO_BLUE, Instant.ofEpochSecond(ts += 2));
+ tracker.onUpdate(ACTIVE_BLUE, Instant.ofEpochSecond(ts += 5));
+
+ assertStateTimeRecorded(INITIALIZING_BLUE, 2);
+ assertStateTimeRecorded(TRANSITIONING_TO_BLUE, 5);
+ }
+
+ @Test
+ void testBlueToGreenRecordsTransitionTime() {
+ long ts = 0;
+ var tracker = createTracker(ACTIVE_BLUE, Instant.ofEpochSecond(ts));
+ tracker.onUpdate(SAVEPOINTING_BLUE, Instant.ofEpochSecond(ts += 5));
+ tracker.onUpdate(TRANSITIONING_TO_GREEN, Instant.ofEpochSecond(ts += 10));
+ tracker.onUpdate(ACTIVE_GREEN, Instant.ofEpochSecond(ts += 5));
+
+ assertTransitionRecorded(TRANSITION_BLUE_TO_GREEN, 15);
+ }
+
+ @Test
+ void testBlueToGreenRecordsAllIntermediateStateTimes() {
+ long ts = 0;
+ var tracker = createTracker(ACTIVE_BLUE, Instant.ofEpochSecond(ts));
+ tracker.onUpdate(SAVEPOINTING_BLUE, Instant.ofEpochSecond(ts += 5));
+ tracker.onUpdate(TRANSITIONING_TO_GREEN, Instant.ofEpochSecond(ts += 10));
+ tracker.onUpdate(ACTIVE_GREEN, Instant.ofEpochSecond(ts += 3));
+
+ assertStateTimeRecorded(ACTIVE_BLUE, 5);
+ assertStateTimeRecorded(SAVEPOINTING_BLUE, 10);
+ assertStateTimeRecorded(TRANSITIONING_TO_GREEN, 3);
+ }
+
+ @Test
+ void testGreenToBlueRecordsTransitionTime() {
+ long ts = 0;
+ var tracker = createTracker(ACTIVE_GREEN, Instant.ofEpochSecond(ts));
+ tracker.onUpdate(SAVEPOINTING_GREEN, Instant.ofEpochSecond(ts += 5));
+ tracker.onUpdate(TRANSITIONING_TO_BLUE, Instant.ofEpochSecond(ts += 8));
+ tracker.onUpdate(ACTIVE_BLUE, Instant.ofEpochSecond(ts += 2));
+
+ assertTransitionRecorded(TRANSITION_GREEN_TO_BLUE, 10);
+ }
+
+ @Test
+ void testGreenToBlueRecordsAllIntermediateStateTimes() {
+ long ts = 0;
+ var tracker = createTracker(ACTIVE_GREEN, Instant.ofEpochSecond(ts));
+ tracker.onUpdate(SAVEPOINTING_GREEN, Instant.ofEpochSecond(ts += 5));
+ tracker.onUpdate(TRANSITIONING_TO_BLUE, Instant.ofEpochSecond(ts += 8));
+ tracker.onUpdate(ACTIVE_BLUE, Instant.ofEpochSecond(ts += 2));
+
+ assertStateTimeRecorded(ACTIVE_GREEN, 5);
+ assertStateTimeRecorded(SAVEPOINTING_GREEN, 8);
+ assertStateTimeRecorded(TRANSITIONING_TO_BLUE, 2);
+ }
+
+ @Test
+ void testSameStateUpdatesOnlyUpdateTimestamp() {
+ long ts = 0;
+ var tracker = createTracker(ACTIVE_BLUE, Instant.ofEpochSecond(ts));
+ tracker.onUpdate(ACTIVE_BLUE, Instant.ofEpochSecond(ts += 1));
+ tracker.onUpdate(ACTIVE_BLUE, Instant.ofEpochSecond(ts += 1));
+ tracker.onUpdate(ACTIVE_BLUE, Instant.ofEpochSecond(ts += 1));
+
+ assertNoTransitionRecorded(TRANSITION_BLUE_TO_GREEN);
+ assertNoTransitionRecorded(TRANSITION_GREEN_TO_BLUE);
+ assertNoStateTimeRecorded(ACTIVE_BLUE);
+ }
+
+ @Test
+ void testIntermediateStateMetricsOnlyRecordedAtStableState() {
+ long ts = 0;
+ var tracker = createTracker(ACTIVE_BLUE, Instant.ofEpochSecond(ts));
+ tracker.onUpdate(SAVEPOINTING_BLUE, Instant.ofEpochSecond(ts += 5));
+ tracker.onUpdate(TRANSITIONING_TO_GREEN, Instant.ofEpochSecond(ts += 5));
+
+ assertNoStateTimeRecorded(ACTIVE_BLUE);
+ assertNoStateTimeRecorded(SAVEPOINTING_BLUE);
+
+ tracker.onUpdate(ACTIVE_GREEN, Instant.ofEpochSecond(ts += 5));
+
+ assertStateTimeRecorded(ACTIVE_BLUE, 5);
+ assertStateTimeRecorded(SAVEPOINTING_BLUE, 5);
+ assertStateTimeRecorded(TRANSITIONING_TO_GREEN, 5);
+ }
+
+ @Test
+ void testRecoveryFromActiveStateTracksNextTransition() {
+ long ts = 0;
+ var tracker = createTracker(ACTIVE_BLUE, Instant.ofEpochSecond(ts));
+ tracker.onUpdate(TRANSITIONING_TO_GREEN, Instant.ofEpochSecond(ts += 10));
+ tracker.onUpdate(ACTIVE_GREEN, Instant.ofEpochSecond(ts += 5));
+
+ assertTransitionRecorded(TRANSITION_BLUE_TO_GREEN, 5);
+ }
+
+ @Test
+ void testConsecutiveTransitionsEachTrackedIndependently() {
+ long ts = 0;
+ var tracker = createTracker(ACTIVE_BLUE, Instant.ofEpochSecond(ts));
+
+ tracker.onUpdate(TRANSITIONING_TO_GREEN, Instant.ofEpochSecond(ts += 10));
+ tracker.onUpdate(ACTIVE_GREEN, Instant.ofEpochSecond(ts += 5));
+
+ assertTransitionRecorded(TRANSITION_BLUE_TO_GREEN, 5);
+
+ tracker.onUpdate(TRANSITIONING_TO_BLUE, Instant.ofEpochSecond(ts += 8));
+ tracker.onUpdate(ACTIVE_BLUE, Instant.ofEpochSecond(ts += 2));
+
+ assertTransitionRecorded(TRANSITION_GREEN_TO_BLUE, 2);
+ }
+
+ @Test
+ void testFailedBlueToGreenRollbackRecordsStateTimes() {
+ long ts = 0;
+ var tracker = createTracker(ACTIVE_BLUE, Instant.ofEpochSecond(ts));
+
+ tracker.onUpdate(SAVEPOINTING_BLUE, Instant.ofEpochSecond(ts += 10));
+ tracker.onUpdate(TRANSITIONING_TO_GREEN, Instant.ofEpochSecond(ts += 15));
+ tracker.onUpdate(ACTIVE_BLUE, Instant.ofEpochSecond(ts += 5));
+
+ assertStateTimeRecorded(ACTIVE_BLUE, 10);
+ assertStateTimeRecorded(SAVEPOINTING_BLUE, 15);
+ assertStateTimeRecorded(TRANSITIONING_TO_GREEN, 5);
+ }
+
+ @Test
+ void testFailedGreenToBlueRollbackRecordsStateTimes() {
+ long ts = 0;
+ var tracker = createTracker(ACTIVE_GREEN, Instant.ofEpochSecond(ts));
+
+ tracker.onUpdate(SAVEPOINTING_GREEN, Instant.ofEpochSecond(ts += 8));
+ tracker.onUpdate(TRANSITIONING_TO_BLUE, Instant.ofEpochSecond(ts += 12));
+ tracker.onUpdate(ACTIVE_GREEN, Instant.ofEpochSecond(ts += 3));
+
+ assertStateTimeRecorded(ACTIVE_GREEN, 8);
+ assertStateTimeRecorded(SAVEPOINTING_GREEN, 12);
+ assertStateTimeRecorded(TRANSITIONING_TO_BLUE, 3);
+ }
+
+ private BlueGreenResourceLifecycleMetricTracker createTracker(
+ FlinkBlueGreenDeploymentState initialState, Instant initialTime) {
+ return new BlueGreenResourceLifecycleMetricTracker(
+ initialState, initialTime, transitionHistos, stateTimeHistos);
+ }
+
+ private Histogram createHistogram() {
+ return OperatorMetricUtils.createHistogram(
+ FlinkOperatorConfiguration.fromConfiguration(new Configuration()));
+ }
+
+ private void assertTransitionRecorded(String transitionName, long expectedSeconds) {
+ var stats = transitionHistos.get(transitionName).get(0).getStatistics();
+ assertEquals(1, stats.size(), transitionName + " should have 1 sample");
+ assertEquals(expectedSeconds, (long) stats.getMean(), transitionName + " duration");
+ }
+
+ private void assertNoTransitionRecorded(String transitionName) {
+ var stats = transitionHistos.get(transitionName).get(0).getStatistics();
+ assertEquals(0, stats.size(), transitionName + " should have no samples");
+ }
+
+ private void assertStateTimeRecorded(
+ FlinkBlueGreenDeploymentState state, long expectedSeconds) {
+ var stats = stateTimeHistos.get(state).get(0).getStatistics();
+ assertEquals(1, stats.size(), state + " should have 1 sample");
+ assertEquals(expectedSeconds, (long) stats.getMean(), state + " duration");
+ }
+
+ private void assertNoStateTimeRecorded(FlinkBlueGreenDeploymentState state) {
+ var stats = stateTimeHistos.get(state).get(0).getStatistics();
+ assertEquals(0, stats.size(), state + " should have no samples");
+ }
+}
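Taken together, the tracker tests pin down its timing rules. A transition is timed from the first update that leaves the previous `ACTIVE_*` (or initial) state until arrival at the next `ACTIVE_*` state. Per-state dwell times are buffered and only recorded once a stable state is reached. The sketch below reproduces those rules under stated assumptions and is not the actual `BlueGreenResourceLifecycleMetricTracker`. `BgState` mirrors only the enum constants used in the tests, samples go into plain lists of seconds instead of Flink histograms, and the transition name strings are hypothetical.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.EnumMap;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Local stand-in mirroring only the states used in the tests. */
enum BgState {
    INITIALIZING_BLUE,
    SAVEPOINTING_BLUE,
    SAVEPOINTING_GREEN,
    TRANSITIONING_TO_BLUE,
    TRANSITIONING_TO_GREEN,
    ACTIVE_BLUE,
    ACTIVE_GREEN;

    /** In this sketch only the ACTIVE_* states count as stable. */
    boolean isStable() {
        return this == ACTIVE_BLUE || this == ACTIVE_GREEN;
    }
}

/** Hypothetical tracker sketch; samples are whole seconds kept in lists. */
class LifecycleTrackerSketch {
    // Hypothetical names; the real constant values are not shown in this diff.
    static final String INITIAL_DEPLOYMENT = "InitialDeployment";
    static final String BLUE_TO_GREEN = "BlueToGreen";
    static final String GREEN_TO_BLUE = "GreenToBlue";

    final Map<String, List<Long>> transitionSeconds = new HashMap<>();
    final Map<BgState, List<Long>> stateSeconds = new EnumMap<>(BgState.class);

    private BgState origin; // last stable state, or the initial state
    private BgState current; // state reported by the latest update
    private Instant currentEnteredAt; // when `current` was entered
    private Instant transitionStartedAt; // when `origin` was left; null while settled
    // Dwell times collected since leaving `origin`, recorded on reaching a stable state.
    private final List<Map.Entry<BgState, Long>> pending = new ArrayList<>();

    LifecycleTrackerSketch(BgState initialState, Instant initialTime) {
        origin = initialState;
        current = initialState;
        currentEnteredAt = initialTime;
    }

    void onUpdate(BgState newState, Instant time) {
        if (newState == current) {
            return; // unchanged state: nothing to record
        }
        if (transitionStartedAt == null) {
            transitionStartedAt = time; // first departure from `origin`
        }
        pending.add(Map.entry(current, seconds(currentEnteredAt, time)));
        current = newState;
        currentEnteredAt = time;

        if (newState.isStable()) {
            for (Map.Entry<BgState, Long> e : pending) {
                stateSeconds.computeIfAbsent(e.getKey(), k -> new ArrayList<>()).add(e.getValue());
            }
            pending.clear();
            String transition = transitionName(origin, newState);
            if (transition != null) {
                transitionSeconds
                        .computeIfAbsent(transition, k -> new ArrayList<>())
                        .add(seconds(transitionStartedAt, time));
            }
            origin = newState;
            transitionStartedAt = null;
        }
    }

    private static String transitionName(BgState from, BgState to) {
        if (from == BgState.INITIALIZING_BLUE && to == BgState.ACTIVE_BLUE) {
            return INITIAL_DEPLOYMENT;
        }
        if (from == BgState.ACTIVE_BLUE && to == BgState.ACTIVE_GREEN) {
            return BLUE_TO_GREEN;
        }
        if (from == BgState.ACTIVE_GREEN && to == BgState.ACTIVE_BLUE) {
            return GREEN_TO_BLUE;
        }
        return null; // e.g. a rollback that lands on the same active state
    }

    private static long seconds(Instant from, Instant to) {
        return Duration.between(from, to).getSeconds();
    }
}
```

In this sketch a rollback (for example `ACTIVE_BLUE` back to `ACTIVE_BLUE`) flushes the buffered state times but records no transition sample; the tests shown do not assert either way on that case. Repeated updates with an unchanged state are no-ops here, which is enough to satisfy the assertions in `testSameStateUpdatesOnlyUpdateTimestamp`.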