This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new 6b895e75 [FLINK-38787] Introduce FlinkBlueGreenDeployment Metrics
6b895e75 is described below

commit 6b895e754a872b86c3298fc6b61c88f996216b17
Author: James Kan <[email protected]>
AuthorDate: Sun Jan 18 11:22:51 2026 -0800

    [FLINK-38787] Introduce FlinkBlueGreenDeployment Metrics
---
 docs/content/docs/operations/metrics-logging.md    |  45 +-
 .../flink/kubernetes/operator/FlinkOperator.java   |   7 +-
 .../FlinkBlueGreenDeploymentController.java        |  22 +-
 .../metrics/FlinkBlueGreenDeploymentMetrics.java   | 188 ++++++++
 .../kubernetes/operator/metrics/MetricManager.java |  30 ++
 .../lifecycle/BlueGreenLifecycleMetrics.java       | 197 ++++++++
 .../BlueGreenResourceLifecycleMetricTracker.java   | 163 +++++++
 .../kubernetes/operator/utils/StatusRecorder.java  |  23 +
 .../FlinkBlueGreenDeploymentControllerTest.java    |   3 +
 .../TestingFlinkBlueGreenDeploymentController.java |   8 +-
 .../FlinkBlueGreenDeploymentMetricsTest.java       | 509 +++++++++++++++++++++
 .../lifecycle/BlueGreenLifecycleMetricsTest.java   | 224 +++++++++
 ...lueGreenResourceLifecycleMetricTrackerTest.java | 251 ++++++++++
 13 files changed, 1656 insertions(+), 14 deletions(-)

diff --git a/docs/content/docs/operations/metrics-logging.md 
b/docs/content/docs/operations/metrics-logging.md
index d1e194ef..ac21f564 100644
--- a/docs/content/docs/operations/metrics-logging.md
+++ b/docs/content/docs/operations/metrics-logging.md
@@ -35,15 +35,20 @@ Different operator metrics can be turned on/off 
individually using the configura
 ### Flink Resource Metrics
 The Operator gathers aggregates metrics about managed resources.
 
-| Scope              | Metrics                                                 
                                  | Description                                 
                                                                                
                                                                       | Type   
   |
-|--------------------|-------------------------------------------------------------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|-----------|
+| Scope              | Metrics                                                 
                                  | Description                                 
                                                                                
                                                                      | Type    
  |
+|--------------------|-------------------------------------------------------------------------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|-----------|
 | Namespace          | FlinkDeployment/FlinkSessionJob.Count                   
                                  | Number of managed resources per namespace   
                                                                                
                                            | Gauge     |
-| Namespace          | FlinkDeployment.ResourceUsage.Cpu/Memory                
                            | Total resources used per namespace                
                                                                                
                                                                | Gauge     |
-| Namespace          | FlinkDeployment.JmDeploymentStatus.&lt;Status&gt;.Count 
                                  | Number of managed FlinkDeployment resources 
per &lt;Status&gt; per namespace. &lt;Status&gt; can take values from: READY, 
DEPLOYED_NOT_READY, DEPLOYING, MISSING, ERROR                            | 
Gauge     |
-| Namespace          | FlinkDeployment.FlinkVersion.&lt;FlinkVersion&gt;.Count 
                              | Number of managed FlinkDeployment resources per 
&lt;FlinkVersion&gt; per namespace. &lt;FlinkVersion&gt; is retrieved via REST 
API from Flink JM.                                                  | Gauge     
|
+| Namespace          | FlinkDeployment.ResourceUsage.Cpu/Memory                
                            | Total resources used per namespace                
                                                                                
                                                               | Gauge     |
+| Namespace          | FlinkDeployment.JmDeploymentStatus.&lt;Status&gt;.Count 
                                  | Number of managed FlinkDeployment resources 
per &lt;Status&gt; per namespace. &lt;Status&gt; can take values from: READY, 
DEPLOYED_NOT_READY, DEPLOYING, MISSING, ERROR                           | Gauge 
    |
+| Namespace          | FlinkDeployment.FlinkVersion.&lt;FlinkVersion&gt;.Count 
                              | Number of managed FlinkDeployment resources per 
&lt;FlinkVersion&gt; per namespace. &lt;FlinkVersion&gt; is retrieved via REST 
API from Flink JM.                                                 | Gauge     |
 | Namespace          | 
FlinkDeployment/FlinkSessionJob.Lifecycle.State.&lt;State&gt;.Count             
          | Number of managed resources currently in state &lt;State&gt; per 
namespace. &lt;State&gt; can take values from: CREATED, SUSPENDED, UPGRADING, 
DEPLOYED, STABLE, ROLLING_BACK, ROLLED_BACK, FAILED | Gauge     |
-| System/Namespace   | 
FlinkDeployment/FlinkSessionJob.Lifecycle.State.&lt;State&gt;.TimeSeconds       
          | Time spent in state &lt;State&gt; for a given resource. 
&lt;State&gt; can take values from: CREATED, SUSPENDED, UPGRADING, DEPLOYED, 
STABLE, ROLLING_BACK, ROLLED_BACK, FAILED                      | Histogram |
-| System/Namespace   | 
FlinkDeployment/FlinkSessionJob.Lifecycle.Transition.&lt;Transition&gt;.TimeSeconds
       | Time statistics for selected lifecycle state transitions. 
&lt;Transition&gt; can take values from: Resume, Upgrade, Suspend, 
Stabilization, Rollback, Submission                                   | 
Histogram |
+| System/Namespace   | 
FlinkDeployment/FlinkSessionJob.Lifecycle.State.&lt;State&gt;.TimeSeconds       
          | Time spent in state &lt;State&gt; for a given resource. 
&lt;State&gt; can take values from: CREATED, SUSPENDED, UPGRADING, DEPLOYED, 
STABLE, ROLLING_BACK, ROLLED_BACK, FAILED                     | Histogram |
+| System/Namespace   | 
FlinkDeployment/FlinkSessionJob.Lifecycle.Transition.&lt;Transition&gt;.TimeSeconds
       | Time statistics for selected lifecycle state transitions. 
&lt;Transition&gt; can take values from: Resume, Upgrade, Suspend, 
Stabilization, Rollback, Submission                                  | 
Histogram |
+| Namespace          | 
FlinkBlueGreenDeployment.BlueGreenState.&lt;State&gt;.Count                     
         | Number of managed FlinkBlueGreenDeployment resources currently in 
state &lt;State&gt; per namespace. &lt;State&gt; can take values from: 
INITIALIZING_BLUE, ACTIVE_BLUE, SAVEPOINTING_BLUE, TRANSITIONING_TO_GREEN, 
ACTIVE_GREEN, SAVEPOINTING_GREEN, TRANSITIONING_TO_BLUE | Gauge     |
+| Namespace          | FlinkBlueGreenDeployment.JobStatus.&lt;Status&gt;.Count 
                                 | Number of managed FlinkBlueGreenDeployment 
resources currently in JobStatus &lt;Status&gt; per namespace. &lt;Status&gt; 
can be any Flink JobStatus value, e.g. RUNNING, FAILING, SUSPENDED, FAILED, RECONCILING | Gauge  
   |
+| Namespace          | FlinkBlueGreenDeployment.Failures                       
                                 | Historical count of failure events 
(transitions to FAILING state) for all FlinkBlueGreenDeployment resources in 
the namespace. Counter increments on each transition to FAILING and never 
decrements. | Counter   |
+| System/Namespace   | 
FlinkBlueGreenDeployment.Lifecycle.State.&lt;State&gt;.TimeSeconds              
          | Time spent in state &lt;State&gt; for a given 
FlinkBlueGreenDeployment resource. &lt;State&gt; takes the same values as for BlueGreenState.&lt;State&gt;.Count above. 
                                                                     | 
Histogram |
+| System/Namespace   | 
FlinkBlueGreenDeployment.Lifecycle.Transition.&lt;Transition&gt;.TimeSeconds    
          | Time statistics for blue-green lifecycle state transitions. 
&lt;Transition&gt; can take values from: InitialDeployment, BlueToGreen, 
GreenToBlue                                                  | Histogram |
 
 #### Lifecycle metrics
 
@@ -60,6 +65,32 @@ In addition to the simple counts we further track a few 
selected state transitio
  - Rollback : Time from deployed to rolled_back state if the resource was 
rolled back
  - Submission: Flink resource submission time
 
+#### FlinkBlueGreenDeployment Lifecycle metrics
+
+FlinkBlueGreenDeployment resources have their own lifecycle states that track 
the blue-green deployment process. The operator monitors the following 
transitions:
+
+ - InitialDeployment : Time from leaving INITIALIZING_BLUE to reaching 
ACTIVE_BLUE (first deployment)
+ - BlueToGreen : Time from leaving ACTIVE_BLUE to reaching ACTIVE_GREEN 
(actual transition duration)
+ - GreenToBlue : Time from leaving ACTIVE_GREEN to reaching ACTIVE_BLUE 
(actual transition duration)
+
+Transition metrics measure the actual transition time - from when the 
deployment leaves the source stable state until it reaches the target stable 
state. This excludes time spent running stably before the transition was 
initiated.
+
+State time metrics track how long a resource spends in each state 
(ACTIVE_BLUE, SAVEPOINTING_BLUE, TRANSITIONING_TO_GREEN, etc.), which helps 
identify bottlenecks in the deployment pipeline.
+
+#### FlinkBlueGreenDeployment JobStatus Tracking
+
+In addition to BlueGreenState tracking, FlinkBlueGreenDeployment resources 
also expose JobStatus metrics that track the Flink job state:
+
+**JobStatus Gauges**: Current count of deployments per JobStatus (RUNNING, 
FAILING, SUSPENDED, etc.) - these gauges go up and down as deployments 
transition between states.
+
+**Failures Counter**: Historical count that increments each time a deployment 
transitions TO the FAILING state. This counter:
+ - Never decrements (accumulates total failures since operator start)
+ - Increments on each new transition to FAILING (even if the same deployment 
fails multiple times)
+ - Persists across deployment recoveries (provides historical failure tracking)
+ - Useful for calculating failure rates and setting up alerts
+
+Example: A deployment goes RUNNING → FAILING → RUNNING → FAILING. For this 
deployment the FAILING gauge ends at 1 (gauges only reflect the current state), 
while the Failures counter shows 2 (one per transition into FAILING).
+
 ### Kubernetes Client Metrics
 
 The Operator gathers various metrics related to Kubernetes API server access.
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
index 96473ddb..fdc856f2 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
@@ -257,7 +257,12 @@ public class FlinkOperator {
 
     @VisibleForTesting
     void registerBlueGreenController() {
-        var controller = new FlinkBlueGreenDeploymentController(ctxFactory);
+        var metricManager =
+                
MetricManager.createFlinkBlueGreenDeploymentMetricManager(baseConfig, 
metricGroup);
+        var statusRecorder =
+                StatusRecorder.createForFlinkBlueGreenDeployment(client, 
metricManager, listeners);
+        var controller = new FlinkBlueGreenDeploymentController(ctxFactory, 
statusRecorder);
+
         registeredControllers.add(operator.register(controller, 
this::overrideControllerConfigs));
     }
 
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkBlueGreenDeploymentController.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkBlueGreenDeploymentController.java
index a35ccb2b..bac6f131 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkBlueGreenDeploymentController.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkBlueGreenDeploymentController.java
@@ -26,6 +26,7 @@ import 
org.apache.flink.kubernetes.operator.controller.bluegreen.BlueGreenDeploy
 import 
org.apache.flink.kubernetes.operator.controller.bluegreen.BlueGreenStateHandlerRegistry;
 import 
org.apache.flink.kubernetes.operator.controller.bluegreen.handlers.BlueGreenStateHandler;
 import 
org.apache.flink.kubernetes.operator.service.FlinkResourceContextFactory;
+import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
 
 import 
io.javaoperatorsdk.operator.api.config.informer.InformerEventSourceConfiguration;
 import io.javaoperatorsdk.operator.api.reconciler.Context;
@@ -70,10 +71,16 @@ public class FlinkBlueGreenDeploymentController implements 
Reconciler<FlinkBlueG
 
     private final FlinkResourceContextFactory ctxFactory;
     private final BlueGreenStateHandlerRegistry handlerRegistry;
+    private final StatusRecorder<FlinkBlueGreenDeployment, 
FlinkBlueGreenDeploymentStatus>
+            statusRecorder;
 
-    public FlinkBlueGreenDeploymentController(FlinkResourceContextFactory 
ctxFactory) {
+    public FlinkBlueGreenDeploymentController(
+            FlinkResourceContextFactory ctxFactory,
+            StatusRecorder<FlinkBlueGreenDeployment, 
FlinkBlueGreenDeploymentStatus>
+                    statusRecorder) {
         this.ctxFactory = ctxFactory;
         this.handlerRegistry = new BlueGreenStateHandlerRegistry();
+        this.statusRecorder = statusRecorder;
     }
 
     @Override
@@ -110,9 +117,12 @@ public class FlinkBlueGreenDeploymentController implements 
Reconciler<FlinkBlueG
                             josdkContext,
                             null,
                             ctxFactory);
-            return BlueGreenDeploymentService.patchStatusUpdateControl(
-                            context, INITIALIZING_BLUE, null, null)
-                    .rescheduleAfter(0);
+            UpdateControl<FlinkBlueGreenDeployment> updateControl =
+                    BlueGreenDeploymentService.patchStatusUpdateControl(
+                                    context, INITIALIZING_BLUE, null, null)
+                            .rescheduleAfter(0);
+            statusRecorder.patchAndCacheStatus(bgDeployment, 
josdkContext.getClient());
+            return updateControl;
         } else {
             FlinkBlueGreenDeploymentState currentState = 
deploymentStatus.getBlueGreenState();
             var context =
@@ -132,7 +142,9 @@ public class FlinkBlueGreenDeploymentController implements 
Reconciler<FlinkBlueG
                     context.getDeploymentName());
 
             BlueGreenStateHandler handler = 
handlerRegistry.getHandler(currentState);
-            return handler.handle(context);
+            UpdateControl<FlinkBlueGreenDeployment> updateControl = 
handler.handle(context);
+            statusRecorder.patchAndCacheStatus(bgDeployment, 
josdkContext.getClient());
+            return updateControl;
         }
     }
 
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/FlinkBlueGreenDeploymentMetrics.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/FlinkBlueGreenDeploymentMetrics.java
new file mode 100644
index 00000000..4862cb08
--- /dev/null
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/FlinkBlueGreenDeploymentMetrics.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.metrics;
+
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.api.FlinkBlueGreenDeployment;
+import 
org.apache.flink.kubernetes.operator.api.status.FlinkBlueGreenDeploymentState;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.MetricGroup;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+/** Metrics for FlinkBlueGreenDeployment resources. */
+public class FlinkBlueGreenDeploymentMetrics
+        implements CustomResourceMetrics<FlinkBlueGreenDeployment> {
+
+    public static final String BG_STATE_GROUP_NAME = "BlueGreenState";
+    public static final String JOB_STATUS_GROUP_NAME = "JobStatus";
+    public static final String COUNTER_NAME = "Count";
+    public static final String FAILURES_COUNTER_NAME = "Failures";
+
+    private final KubernetesOperatorMetricGroup parentMetricGroup;
+    private final Configuration configuration;
+
+    // Tracks which deployments are in which state per namespace (for gauge 
metrics)
+    // Map: namespace -> state -> set of deployment names
+    private final Map<String, Map<FlinkBlueGreenDeploymentState, Set<String>>> 
deploymentStatuses =
+            new ConcurrentHashMap<>();
+
+    // Tracks which deployments are in which JobStatus per namespace (for 
gauge metrics)
+    // Map: namespace -> JobStatus -> set of deployment names
+    private final Map<String, Map<JobStatus, Set<String>>> jobStatuses = new 
ConcurrentHashMap<>();
+
+    // Failure counters per namespace (historical count, never decrements)
+    // Map: namespace -> Counter
+    private final Map<String, Counter> failureCounters = new 
ConcurrentHashMap<>();
+
+    public FlinkBlueGreenDeploymentMetrics(
+            KubernetesOperatorMetricGroup parentMetricGroup, Configuration 
configuration) {
+        this.parentMetricGroup = parentMetricGroup;
+        this.configuration = configuration;
+    }
+
+    @Override
+    public void onUpdate(FlinkBlueGreenDeployment flinkBgDep) {
+        var namespace = flinkBgDep.getMetadata().getNamespace();
+        var deploymentName = flinkBgDep.getMetadata().getName();
+        var state = flinkBgDep.getStatus().getBlueGreenState();
+
+        // Get current JobStatus
+        var jobStatusObj = flinkBgDep.getStatus().getJobStatus();
+        var currentJobStatus = jobStatusObj != null ? jobStatusObj.getState() 
: null;
+
+        // Check whether the deployment was already tracked as FAILING before this update
+        boolean wasInFailing = false;
+        if (currentJobStatus != null) {
+            var namespaceJobStatuses = jobStatuses.get(namespace);
+            if (namespaceJobStatuses != null) {
+                var failingSet = namespaceJobStatuses.get(JobStatus.FAILING);
+                wasInFailing = failingSet != null && 
failingSet.contains(deploymentName);
+            }
+        }
+
+        // Clear from all tracking (BlueGreenState and JobStatus)
+        clearStateCount(flinkBgDep);
+
+        deploymentStatuses
+                .computeIfAbsent(namespace, this::initNamespaceMetrics)
+                .get(state)
+                .add(deploymentName);
+
+        // Track JobStatus in gauge
+        if (currentJobStatus != null) {
+            jobStatuses
+                    .computeIfAbsent(namespace, ns -> createJobStatusMap())
+                    .get(currentJobStatus)
+                    .add(deploymentName);
+
+            // Detect transition TO FAILING for counter
+            if (currentJobStatus == JobStatus.FAILING && !wasInFailing) {
+                var counter = failureCounters.get(namespace);
+                if (counter != null) {
+                    counter.inc();
+                }
+            }
+        }
+    }
+
+    @Override
+    public void onRemove(FlinkBlueGreenDeployment flinkBgDep) {
+        clearStateCount(flinkBgDep);
+    }
+
+    /** Clears the deployment from all BlueGreenState/JobStatus sets (on update 
and removal). */
+    private void clearStateCount(FlinkBlueGreenDeployment flinkBgDep) {
+        var namespace = flinkBgDep.getMetadata().getNamespace();
+        var deploymentName = flinkBgDep.getMetadata().getName();
+
+        var namespaceStatuses = deploymentStatuses.get(namespace);
+        if (namespaceStatuses != null) {
+            namespaceStatuses
+                    .values()
+                    .forEach(deploymentNames -> 
deploymentNames.remove(deploymentName));
+        }
+
+        // Clear from JobStatus tracking
+        var namespaceJobStatuses = jobStatuses.get(namespace);
+        if (namespaceJobStatuses != null) {
+            namespaceJobStatuses
+                    .values()
+                    .forEach(deploymentNames -> 
deploymentNames.remove(deploymentName));
+        }
+    }
+
+    private Map<FlinkBlueGreenDeploymentState, Set<String>> 
initNamespaceMetrics(String namespace) {
+        MetricGroup nsGroup =
+                parentMetricGroup.createResourceNamespaceGroup(
+                        configuration, FlinkBlueGreenDeployment.class, 
namespace);
+
+        // Total deployment count
+        nsGroup.gauge(
+                COUNTER_NAME,
+                () ->
+                        deploymentStatuses.get(namespace).values().stream()
+                                .mapToInt(Set::size)
+                                .sum());
+
+        // Historical failure counter (increments on each transition TO 
FAILING)
+        failureCounters.put(namespace, nsGroup.counter(FAILURES_COUNTER_NAME));
+
+        // Per-BlueGreenState counts
+        Map<FlinkBlueGreenDeploymentState, Set<String>> statuses = new 
ConcurrentHashMap<>();
+        for (FlinkBlueGreenDeploymentState state : 
FlinkBlueGreenDeploymentState.values()) {
+            statuses.put(state, ConcurrentHashMap.newKeySet());
+            nsGroup.addGroup(BG_STATE_GROUP_NAME)
+                    .addGroup(state.toString())
+                    .gauge(COUNTER_NAME, () -> 
deploymentStatuses.get(namespace).get(state).size());
+        }
+
+        // Per-JobStatus counts (gauges for current state)
+        initJobStatusMetrics(namespace, nsGroup);
+
+        return statuses;
+    }
+
+    private void initJobStatusMetrics(String namespace, MetricGroup nsGroup) {
+        for (JobStatus status : JobStatus.values()) {
+            nsGroup.addGroup(JOB_STATUS_GROUP_NAME)
+                    .addGroup(status.toString())
+                    .gauge(
+                            COUNTER_NAME,
+                            () -> {
+                                var nsJobStatuses = jobStatuses.get(namespace);
+                                if (nsJobStatuses == null) {
+                                    return 0;
+                                }
+                                var statusSet = nsJobStatuses.get(status);
+                                return statusSet != null ? statusSet.size() : 
0;
+                            });
+        }
+    }
+
+    private Map<JobStatus, Set<String>> createJobStatusMap() {
+        Map<JobStatus, Set<String>> statuses = new ConcurrentHashMap<>();
+        for (JobStatus status : JobStatus.values()) {
+            statuses.put(status, ConcurrentHashMap.newKeySet());
+        }
+        return statuses;
+    }
+}
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/MetricManager.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/MetricManager.java
index 02ee9a86..ca571188 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/MetricManager.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/MetricManager.java
@@ -20,9 +20,11 @@ package org.apache.flink.kubernetes.operator.metrics;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
+import org.apache.flink.kubernetes.operator.api.FlinkBlueGreenDeployment;
 import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.api.FlinkSessionJob;
 import org.apache.flink.kubernetes.operator.api.FlinkStateSnapshot;
+import 
org.apache.flink.kubernetes.operator.metrics.lifecycle.BlueGreenLifecycleMetrics;
 import org.apache.flink.kubernetes.operator.metrics.lifecycle.LifecycleMetrics;
 
 import io.fabric8.kubernetes.client.CustomResource;
@@ -69,6 +71,15 @@ public class MetricManager<CR extends CustomResource<?, ?>> {
         return metricManager;
     }
 
+    public static MetricManager<FlinkBlueGreenDeployment>
+            createFlinkBlueGreenDeploymentMetricManager(
+                    Configuration conf, KubernetesOperatorMetricGroup 
metricGroup) {
+        MetricManager<FlinkBlueGreenDeployment> metricManager = new 
MetricManager<>();
+        registerFlinkBlueGreenDeploymentMetrics(conf, metricGroup, 
metricManager);
+        registerBlueGreenLifecycleMetrics(conf, metricGroup, metricManager);
+        return metricManager;
+    }
+
     private static void registerFlinkDeploymentMetrics(
             Configuration conf,
             KubernetesOperatorMetricGroup metricGroup,
@@ -96,6 +107,15 @@ public class MetricManager<CR extends CustomResource<?, ?>> 
{
         }
     }
 
+    private static void registerFlinkBlueGreenDeploymentMetrics(
+            Configuration conf,
+            KubernetesOperatorMetricGroup metricGroup,
+            MetricManager<FlinkBlueGreenDeployment> metricManager) {
+        if 
(conf.get(KubernetesOperatorMetricOptions.OPERATOR_RESOURCE_METRICS_ENABLED)) {
+            metricManager.register(new 
FlinkBlueGreenDeploymentMetrics(metricGroup, conf));
+        }
+    }
+
     private static <CR extends AbstractFlinkResource<?, ?>> void 
registerLifecycleMetrics(
             Configuration conf,
             KubernetesOperatorMetricGroup metricGroup,
@@ -106,6 +126,16 @@ public class MetricManager<CR extends CustomResource<?, 
?>> {
         }
     }
 
+    private static void registerBlueGreenLifecycleMetrics(
+            Configuration conf,
+            KubernetesOperatorMetricGroup metricGroup,
+            MetricManager<FlinkBlueGreenDeployment> metricManager) {
+        if 
(conf.get(KubernetesOperatorMetricOptions.OPERATOR_RESOURCE_METRICS_ENABLED)
+                && 
conf.get(KubernetesOperatorMetricOptions.OPERATOR_LIFECYCLE_METRICS_ENABLED)) {
+            metricManager.register(new BlueGreenLifecycleMetrics(conf, 
metricGroup));
+        }
+    }
+
     @VisibleForTesting
     public List<CustomResourceMetrics<CR>> getRegisteredMetrics() {
         return registeredMetrics;
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/BlueGreenLifecycleMetrics.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/BlueGreenLifecycleMetrics.java
new file mode 100644
index 00000000..e8743528
--- /dev/null
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/BlueGreenLifecycleMetrics.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.metrics.lifecycle;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.api.FlinkBlueGreenDeployment;
+import 
org.apache.flink.kubernetes.operator.api.status.FlinkBlueGreenDeploymentState;
+import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
+import org.apache.flink.kubernetes.operator.metrics.CustomResourceMetrics;
+import 
org.apache.flink.kubernetes.operator.metrics.KubernetesOperatorMetricGroup;
+import 
org.apache.flink.kubernetes.operator.metrics.KubernetesOperatorMetricOptions;
+import org.apache.flink.kubernetes.operator.metrics.OperatorMetricUtils;
+import org.apache.flink.metrics.Histogram;
+
+import java.time.Clock;
+import java.time.Instant;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static 
org.apache.flink.kubernetes.operator.api.status.FlinkBlueGreenDeploymentState.INITIALIZING_BLUE;
+
+/** Manages lifecycle metrics for FlinkBlueGreenDeployment resources. */
+public class BlueGreenLifecycleMetrics implements 
CustomResourceMetrics<FlinkBlueGreenDeployment> {
+
+    // Metric group names and the histogram name used when registering lifecycle histograms.
+    public static final String LIFECYCLE_GROUP_NAME = "Lifecycle";
+    public static final String TRANSITION_GROUP_NAME = "Transition";
+    public static final String STATE_GROUP_NAME = "State";
+    public static final String TIME_SECONDS_NAME = "TimeSeconds";
+
+    // Names of the tracked transitions; each one is used as a histogram group name.
+    public static final String TRANSITION_INITIAL_DEPLOYMENT = 
"InitialDeployment";
+    public static final String TRANSITION_BLUE_TO_GREEN = "BlueToGreen";
+    public static final String TRANSITION_GREEN_TO_BLUE = "GreenToBlue";
+
+    private static final List<String> TRANSITIONS =
+            List.of(
+                    TRANSITION_INITIAL_DEPLOYMENT,
+                    TRANSITION_BLUE_TO_GREEN,
+                    TRANSITION_GREEN_TO_BLUE);
+
+    private final KubernetesOperatorMetricGroup parentMetricGroup;
+    private final Configuration configuration;
+    private final FlinkOperatorConfiguration operatorConfig;
+    private final Clock clock;
+    private final boolean lifecycleMetricsEnabled;
+
+    // namespace -> (deployment name -> tracker)
+    private final Map<String, Map<String, 
BlueGreenResourceLifecycleMetricTracker>>
+            lifecycleTrackers = new ConcurrentHashMap<>();
+    // transition name -> (namespace -> histogram)
+    private final Map<String, Map<String, Histogram>> 
namespaceTransitionHistograms =
+            new ConcurrentHashMap<>();
+    // state -> (namespace -> histogram)
+    private final Map<FlinkBlueGreenDeploymentState, Map<String, Histogram>>
+            namespaceStateTimeHistograms = new ConcurrentHashMap<>();
+
+    // Histograms registered without a namespace, keyed by transition name / by state.
+    private final Map<String, Histogram> systemTransitionHistograms = new 
ConcurrentHashMap<>();
+    private final Map<FlinkBlueGreenDeploymentState, Histogram> 
systemStateTimeHistograms =
+            new ConcurrentHashMap<>();
+
+    /**
+     * Creates the lifecycle metrics holder and pre-populates the per-namespace histogram maps
+     * with an empty entry for every known transition and every deployment state.
+     *
+     * @param configuration operator configuration the metric settings are read from
+     * @param parentMetricGroup metric group under which the histograms get registered
+     */
+    public BlueGreenLifecycleMetrics(
+            Configuration configuration, KubernetesOperatorMetricGroup parentMetricGroup) {
+        this.configuration = configuration;
+        this.parentMetricGroup = parentMetricGroup;
+        this.operatorConfig = FlinkOperatorConfiguration.fromConfiguration(configuration);
+        this.lifecycleMetricsEnabled =
+                configuration.get(
+                        KubernetesOperatorMetricOptions.OPERATOR_LIFECYCLE_METRICS_ENABLED);
+        this.clock = Clock.systemDefaultZone();
+
+        for (String transition : TRANSITIONS) {
+            namespaceTransitionHistograms.put(transition, new ConcurrentHashMap<>());
+        }
+        for (var state : FlinkBlueGreenDeploymentState.values()) {
+            namespaceStateTimeHistograms.put(state, new ConcurrentHashMap<>());
+        }
+    }
+
+    /**
+     * Forwards the current Blue/Green state of the resource to its lifecycle tracker, creating
+     * the tracker on first sight. Does nothing when lifecycle metrics are disabled.
+     */
+    @Override
+    public void onUpdate(FlinkBlueGreenDeployment flinkBgDep) {
+        if (lifecycleMetricsEnabled) {
+            var metadata = flinkBgDep.getMetadata();
+            var namespace = metadata.getNamespace();
+            var deploymentName = metadata.getName();
+            var currentState = flinkBgDep.getStatus().getBlueGreenState();
+
+            var tracker = getOrCreateTracker(namespace, deploymentName, flinkBgDep);
+            tracker.onUpdate(currentState, clock.instant());
+        }
+    }
+
+    /** Drops the lifecycle tracker kept for the removed resource, if one exists. */
+    @Override
+    public void onRemove(FlinkBlueGreenDeployment flinkBgDep) {
+        var metadata = flinkBgDep.getMetadata();
+        var trackersInNamespace = lifecycleTrackers.get(metadata.getNamespace());
+        if (trackersInNamespace == null) {
+            return;
+        }
+        trackersInNamespace.remove(metadata.getName());
+    }
+
+    /**
+     * Looks up the tracker of the given deployment, lazily creating the per-namespace map and
+     * the tracker itself when they are missing.
+     */
+    private BlueGreenResourceLifecycleMetricTracker getOrCreateTracker(
+            String namespace, String deploymentName, FlinkBlueGreenDeployment flinkBgDep) {
+        var trackersInNamespace =
+                lifecycleTrackers.computeIfAbsent(namespace, key -> new ConcurrentHashMap<>());
+        return trackersInNamespace.computeIfAbsent(
+                deploymentName, key -> createTracker(namespace, flinkBgDep));
+    }
+
+    /**
+     * Builds a tracker for a deployment that is seen for the first time.
+     *
+     * <p>A deployment that is still in {@code INITIALIZING_BLUE} is tracked from its creation
+     * timestamp; any other state is tracked from the current clock time. If the resource carries
+     * no creation timestamp, the current clock time is used as well, so that metric collection
+     * does not fail with a {@link NullPointerException} from {@link Instant#parse}.
+     *
+     * @param namespace namespace of the deployment, used to select the namespace histograms
+     * @param flinkBgDep the deployment to track
+     * @return a tracker initialised with the deployment's current state
+     */
+    private BlueGreenResourceLifecycleMetricTracker createTracker(
+            String namespace, FlinkBlueGreenDeployment flinkBgDep) {
+        var initialState = flinkBgDep.getStatus().getBlueGreenState();
+        var creationTimestamp = flinkBgDep.getMetadata().getCreationTimestamp();
+        // Only trust the creation timestamp when it is actually present.
+        var time =
+                initialState == INITIALIZING_BLUE && creationTimestamp != null
+                        ? Instant.parse(creationTimestamp)
+                        : clock.instant();
+
+        return new BlueGreenResourceLifecycleMetricTracker(
+                initialState,
+                time,
+                buildTransitionHistograms(namespace),
+                buildStateTimeHistograms(namespace));
+    }
+
+    /**
+     * Collects, for every tracked transition, the system-wide histogram and the histogram of the
+     * given namespace, creating and registering whichever of the two does not exist yet.
+     */
+    private Map<String, List<Histogram>> buildTransitionHistograms(String namespace) {
+        var result = new HashMap<String, List<Histogram>>();
+        for (String transition : TRANSITIONS) {
+            Histogram systemHistogram =
+                    systemTransitionHistograms.computeIfAbsent(
+                            transition,
+                            name -> createSystemHistogram(TRANSITION_GROUP_NAME, name));
+            Histogram namespaceHistogram =
+                    namespaceTransitionHistograms
+                            .get(transition)
+                            .computeIfAbsent(
+                                    namespace,
+                                    ns ->
+                                            createNamespaceHistogram(
+                                                    ns, TRANSITION_GROUP_NAME, transition));
+            result.put(transition, List.of(systemHistogram, namespaceHistogram));
+        }
+        return result;
+    }
+
+    /**
+     * Collects, for every deployment state, the system-wide histogram and the histogram of the
+     * given namespace, creating and registering whichever of the two does not exist yet.
+     */
+    private Map<FlinkBlueGreenDeploymentState, List<Histogram>> buildStateTimeHistograms(
+            String namespace) {
+        var result = new HashMap<FlinkBlueGreenDeploymentState, List<Histogram>>();
+        for (var state : FlinkBlueGreenDeploymentState.values()) {
+            Histogram systemHistogram =
+                    systemStateTimeHistograms.computeIfAbsent(
+                            state, key -> createSystemHistogram(STATE_GROUP_NAME, key.name()));
+            Histogram namespaceHistogram =
+                    namespaceStateTimeHistograms
+                            .get(state)
+                            .computeIfAbsent(
+                                    namespace,
+                                    ns ->
+                                            createNamespaceHistogram(
+                                                    ns, STATE_GROUP_NAME, state.name()));
+            result.put(state, List.of(systemHistogram, namespaceHistogram));
+        }
+        return result;
+    }
+
+    /**
+     * Registers a {@code TimeSeconds} histogram that is not scoped to a namespace, nested under
+     * the resource class name, the lifecycle group, {@code groupName} and {@code metricName}.
+     */
+    private Histogram createSystemHistogram(String groupName, String metricName) {
+        var metricGroup =
+                parentMetricGroup
+                        .addGroup(FlinkBlueGreenDeployment.class.getSimpleName())
+                        .addGroup(LIFECYCLE_GROUP_NAME)
+                        .addGroup(groupName)
+                        .addGroup(metricName);
+        var histogram = OperatorMetricUtils.createHistogram(operatorConfig);
+        return metricGroup.histogram(TIME_SECONDS_NAME, histogram);
+    }
+
+    /**
+     * Registers a {@code TimeSeconds} histogram in the resource namespace group of
+     * {@code namespace}, nested under the lifecycle group, {@code groupName} and
+     * {@code metricName}.
+     */
+    private Histogram createNamespaceHistogram(
+            String namespace, String groupName, String metricName) {
+        var metricGroup =
+                parentMetricGroup
+                        .createResourceNamespaceGroup(
+                                configuration, FlinkBlueGreenDeployment.class, namespace)
+                        .addGroup(LIFECYCLE_GROUP_NAME)
+                        .addGroup(groupName)
+                        .addGroup(metricName);
+        var histogram = OperatorMetricUtils.createHistogram(operatorConfig);
+        return metricGroup.histogram(TIME_SECONDS_NAME, histogram);
+    }
+}
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/BlueGreenResourceLifecycleMetricTracker.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/BlueGreenResourceLifecycleMetricTracker.java
new file mode 100644
index 00000000..53631278
--- /dev/null
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/BlueGreenResourceLifecycleMetricTracker.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.metrics.lifecycle;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import 
org.apache.flink.kubernetes.operator.api.status.FlinkBlueGreenDeploymentState;
+import org.apache.flink.metrics.Histogram;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static 
org.apache.flink.kubernetes.operator.api.status.FlinkBlueGreenDeploymentState.ACTIVE_BLUE;
+import static 
org.apache.flink.kubernetes.operator.api.status.FlinkBlueGreenDeploymentState.ACTIVE_GREEN;
+import static 
org.apache.flink.kubernetes.operator.api.status.FlinkBlueGreenDeploymentState.INITIALIZING_BLUE;
+import static 
org.apache.flink.kubernetes.operator.metrics.lifecycle.BlueGreenLifecycleMetrics.TRANSITION_BLUE_TO_GREEN;
+import static 
org.apache.flink.kubernetes.operator.metrics.lifecycle.BlueGreenLifecycleMetrics.TRANSITION_GREEN_TO_BLUE;
+import static 
org.apache.flink.kubernetes.operator.metrics.lifecycle.BlueGreenLifecycleMetrics.TRANSITION_INITIAL_DEPLOYMENT;
+
+/**
+ * Tracks state transitions and timing for a single FlinkBlueGreenDeployment 
resource. Records
+ * transition durations and time spent in each state.
+ */
+public class BlueGreenResourceLifecycleMetricTracker {
+
+    private static final Logger LOG =
+            
LoggerFactory.getLogger(BlueGreenResourceLifecycleMetricTracker.class);
+
+    // Most recently observed state of the tracked resource.
+    private FlinkBlueGreenDeploymentState currentState;
+
+    // map(state -> (firstEntryTime, lastUpdateTime))
+    // Tracks when we entered each state and when we last saw it
+    private final Map<FlinkBlueGreenDeploymentState, Tuple2<Instant, Instant>> 
stateTimeMap =
+            new HashMap<>();
+
+    // Histograms updated with transition durations, keyed by transition name.
+    private final Map<String, List<Histogram>> transitionHistos;
+    // Histograms updated with the time spent in a state, keyed by state.
+    private final Map<FlinkBlueGreenDeploymentState, List<Histogram>> 
stateTimeHistos;
+
+    /**
+     * Creates a tracker that starts out in {@code initialState}, with both the entry time and
+     * the last-seen time of that state set to {@code time}.
+     *
+     * @param initialState state the resource is in when tracking starts
+     * @param time timestamp recorded as entry and last-seen time of the initial state
+     * @param transitionHistos histograms to update per transition name
+     * @param stateTimeHistos histograms to update per state
+     */
+    public BlueGreenResourceLifecycleMetricTracker(
+            FlinkBlueGreenDeploymentState initialState,
+            Instant time,
+            Map<String, List<Histogram>> transitionHistos,
+            Map<FlinkBlueGreenDeploymentState, List<Histogram>> stateTimeHistos) {
+        this.transitionHistos = transitionHistos;
+        this.stateTimeHistos = stateTimeHistos;
+        this.currentState = initialState;
+        this.stateTimeMap.put(initialState, Tuple2.of(time, time));
+    }
+
+    /**
+     * Feeds the latest observed state into the tracker.
+     *
+     * <p>While the state is unchanged, only its last-seen timestamp is refreshed. On a state
+     * change the previous state's last-seen timestamp is set to {@code time}, transition
+     * durations are recorded and, when the new state is {@code ACTIVE_BLUE} or
+     * {@code ACTIVE_GREEN}, the time spent in every tracked state is recorded and the tracked
+     * states are reset.
+     *
+     * @param newState the current state from the resource status
+     * @param time the current timestamp
+     */
+    public void onUpdate(FlinkBlueGreenDeploymentState newState, Instant time) {
+        // The previous state is stamped in both cases: either it is still the current state, or
+        // it is being left right now and `time` is its exit time (which also covers states that
+        // transition faster than the heartbeat interval).
+        updateLastUpdateTime(currentState, time);
+        if (newState == currentState) {
+            return;
+        }
+
+        recordTransitionMetrics(currentState, newState, time);
+
+        boolean reachedActiveState = newState == ACTIVE_BLUE || newState == ACTIVE_GREEN;
+        if (reachedActiveState) {
+            LOG.debug(
+                    "Transitioned from {} to {}, recording state times for {} and clearing",
+                    currentState,
+                    newState,
+                    stateTimeMap.keySet());
+
+            // State times must be flushed before the map is wiped.
+            recordStateTimeMetrics();
+            clearTrackedStates();
+        }
+
+        currentState = newState;
+        stateTimeMap.put(newState, Tuple2.of(time, time));
+    }
+
+    /** Sets the last-seen timestamp of {@code state}; states that are not tracked are ignored. */
+    private void updateLastUpdateTime(FlinkBlueGreenDeploymentState state, Instant time) {
+        var entryAndLastSeen = stateTimeMap.get(state);
+        if (entryAndLastSeen == null) {
+            return;
+        }
+        entryAndLastSeen.f1 = time;
+    }
+
+    /**
+     * Records the transition durations that apply to a change from {@code fromState} to
+     * {@code toState}. Only changes into {@code ACTIVE_GREEN} or {@code ACTIVE_BLUE} record
+     * anything, and only if the relevant origin state is currently tracked.
+     */
+    private void recordTransitionMetrics(
+            FlinkBlueGreenDeploymentState fromState,
+            FlinkBlueGreenDeploymentState toState,
+            Instant time) {
+
+        if (toState == ACTIVE_GREEN) {
+            if (stateTimeMap.containsKey(ACTIVE_BLUE)) {
+                recordTransition(TRANSITION_BLUE_TO_GREEN, ACTIVE_BLUE, time);
+            }
+            return;
+        }
+
+        if (toState != ACTIVE_BLUE) {
+            return;
+        }
+
+        if (stateTimeMap.containsKey(INITIALIZING_BLUE)) {
+            recordTransition(TRANSITION_INITIAL_DEPLOYMENT, INITIALIZING_BLUE, time);
+        }
+
+        boolean cameFromInitialization = fromState == INITIALIZING_BLUE;
+        if (!cameFromInitialization && stateTimeMap.containsKey(ACTIVE_GREEN)) {
+            recordTransition(TRANSITION_GREEN_TO_BLUE, ACTIVE_GREEN, time);
+        }
+    }
+
+    /**
+     * Updates the histograms of {@code transitionName} with the whole seconds elapsed between
+     * the last-seen time of {@code fromState} and {@code time}. Does nothing when
+     * {@code fromState} is not tracked.
+     */
+    private void recordTransition(
+            String transitionName, FlinkBlueGreenDeploymentState fromState, Instant time) {
+        var originTimes = stateTimeMap.get(fromState);
+        if (originTimes != null) {
+            long elapsedSeconds = Duration.between(originTimes.f1, time).toSeconds();
+
+            LOG.debug(
+                    "Recording transition time {}s for {} (from {})",
+                    elapsedSeconds,
+                    transitionName,
+                    fromState);
+
+            var targets = transitionHistos.get(transitionName);
+            if (targets != null) {
+                for (Histogram histogram : targets) {
+                    histogram.update(elapsedSeconds);
+                }
+            }
+        }
+    }
+
+    /**
+     * Updates the state-time histograms of every tracked state with the whole seconds between
+     * that state's entry time and its last-seen time.
+     */
+    private void recordStateTimeMetrics() {
+        for (var tracked : stateTimeMap.entrySet()) {
+            var times = tracked.getValue();
+            long secondsInState = Duration.between(times.f0, times.f1).toSeconds();
+
+            var targets = stateTimeHistos.get(tracked.getKey());
+            if (targets == null) {
+                continue;
+            }
+            for (Histogram histogram : targets) {
+                histogram.update(secondsInState);
+            }
+        }
+    }
+
+    /** Forgets all tracked states and their timestamps. */
+    private void clearTrackedStates() {
+        stateTimeMap.clear();
+    }
+}
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/StatusRecorder.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/StatusRecorder.java
index e06aa111..c1328042 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/StatusRecorder.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/StatusRecorder.java
@@ -20,12 +20,14 @@ package org.apache.flink.kubernetes.operator.utils;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
+import org.apache.flink.kubernetes.operator.api.FlinkBlueGreenDeployment;
 import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.api.FlinkSessionJob;
 import org.apache.flink.kubernetes.operator.api.FlinkStateSnapshot;
 import 
org.apache.flink.kubernetes.operator.api.lifecycle.ResourceLifecycleState;
 import org.apache.flink.kubernetes.operator.api.listener.FlinkResourceListener;
 import org.apache.flink.kubernetes.operator.api.status.CommonStatus;
+import 
org.apache.flink.kubernetes.operator.api.status.FlinkBlueGreenDeploymentStatus;
 import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus;
 import org.apache.flink.kubernetes.operator.api.status.FlinkSessionJobStatus;
 import 
org.apache.flink.kubernetes.operator.api.status.FlinkStateSnapshotStatus;
@@ -131,6 +133,8 @@ public class StatusRecorder<CR extends CustomResource<?, 
STATUS>, STATUS> {
             statusClass = FlinkSessionJobStatus.class;
         } else if (resource instanceof FlinkStateSnapshot) {
             statusClass = FlinkStateSnapshotStatus.class;
+        } else if (resource instanceof FlinkBlueGreenDeployment) {
+            statusClass = FlinkBlueGreenDeploymentStatus.class;
         } else {
             throw new RuntimeException(
                     String.format("Resource is unknown class: %s", 
resource.getClass()));
@@ -300,6 +304,25 @@ public class StatusRecorder<CR extends CustomResource<?, 
STATUS>, STATUS> {
         return new StatusRecorder<>(metricManager, consumer);
     }
 
+    /**
+     * Creates the status recorder for {@link FlinkBlueGreenDeployment} resources.
+     *
+     * <p>The status-change callback currently performs no work per listener; see the comment in
+     * the method body.
+     */
+    public static StatusRecorder<FlinkBlueGreenDeployment, FlinkBlueGreenDeploymentStatus>
+            createForFlinkBlueGreenDeployment(
+                    KubernetesClient kubernetesClient,
+                    MetricManager<FlinkBlueGreenDeployment> metricManager,
+                    Collection<FlinkResourceListener> listeners) {
+        // FlinkResourceListener doesn't have a specific method for BlueGreen deployments yet and
+        // there is no audit logging for them yet, so the per-listener action is intentionally
+        // empty. Metrics will still be tracked via MetricManager.
+        BiConsumer<FlinkBlueGreenDeployment, FlinkBlueGreenDeploymentStatus>
+                statusChangeHandler =
+                        (resource, previousStatus) -> listeners.forEach(listener -> {});
+
+        return new StatusRecorder<>(metricManager, statusChangeHandler);
+    }
+
     public static StatusRecorder<FlinkStateSnapshot, FlinkStateSnapshotStatus>
             createForFlinkStateSnapshot(
                     KubernetesClient kubernetesClient,
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkBlueGreenDeploymentControllerTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkBlueGreenDeploymentControllerTest.java
index 8240b8c1..11c1b931 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkBlueGreenDeploymentControllerTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkBlueGreenDeploymentControllerTest.java
@@ -1015,6 +1015,9 @@ public class FlinkBlueGreenDeploymentControllerTest {
                     throws Exception {
         Long minReconciliationTs = System.currentTimeMillis() - 1;
 
+        // Create the resource in the mock server before reconciling
+        kubernetesClient.resource(blueGreenDeployment).createOrReplace();
+
         // 1a. Initializing deploymentStatus with this call
         var rs = reconcile(blueGreenDeployment);
 
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkBlueGreenDeploymentController.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkBlueGreenDeploymentController.java
index 6e8e058c..5bddcf4d 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkBlueGreenDeploymentController.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkBlueGreenDeploymentController.java
@@ -24,7 +24,9 @@ import 
org.apache.flink.kubernetes.operator.TestingFlinkService;
 import org.apache.flink.kubernetes.operator.api.FlinkBlueGreenDeployment;
 import 
org.apache.flink.kubernetes.operator.api.status.FlinkBlueGreenDeploymentStatus;
 import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
+import org.apache.flink.kubernetes.operator.metrics.MetricManager;
 import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
+import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
 
 import io.javaoperatorsdk.operator.api.reconciler.Context;
 import io.javaoperatorsdk.operator.api.reconciler.ErrorStatusUpdateControl;
@@ -50,7 +52,11 @@ public class TestingFlinkBlueGreenDeploymentController
                         flinkService,
                         null);
 
-        flinkBlueGreenDeploymentController = new 
FlinkBlueGreenDeploymentController(contextFactory);
+        StatusRecorder<FlinkBlueGreenDeployment, 
FlinkBlueGreenDeploymentStatus> statusRecorder =
+                new StatusRecorder<>(new MetricManager<>(), (resource, status) 
-> {});
+
+        flinkBlueGreenDeploymentController =
+                new FlinkBlueGreenDeploymentController(contextFactory, 
statusRecorder);
     }
 
     @Override
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/FlinkBlueGreenDeploymentMetricsTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/FlinkBlueGreenDeploymentMetricsTest.java
new file mode 100644
index 00000000..10469677
--- /dev/null
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/FlinkBlueGreenDeploymentMetricsTest.java
@@ -0,0 +1,509 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.metrics;
+
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.api.FlinkBlueGreenDeployment;
+import org.apache.flink.kubernetes.operator.api.spec.ConfigObjectNode;
+import 
org.apache.flink.kubernetes.operator.api.spec.FlinkBlueGreenDeploymentSpec;
+import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec;
+import 
org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentTemplateSpec;
+import org.apache.flink.kubernetes.operator.api.spec.JobSpec;
+import org.apache.flink.kubernetes.operator.api.spec.UpgradeMode;
+import 
org.apache.flink.kubernetes.operator.api.status.FlinkBlueGreenDeploymentState;
+import 
org.apache.flink.kubernetes.operator.api.status.FlinkBlueGreenDeploymentStatus;
+
+import io.fabric8.kubernetes.api.model.ObjectMetaBuilder;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.time.Instant;
+import java.util.HashMap;
+import java.util.UUID;
+
+import static 
org.apache.flink.kubernetes.operator.api.status.FlinkBlueGreenDeploymentState.ACTIVE_BLUE;
+import static 
org.apache.flink.kubernetes.operator.api.status.FlinkBlueGreenDeploymentState.ACTIVE_GREEN;
+import static 
org.apache.flink.kubernetes.operator.api.status.FlinkBlueGreenDeploymentState.INITIALIZING_BLUE;
+import static 
org.apache.flink.kubernetes.operator.api.status.FlinkBlueGreenDeploymentState.TRANSITIONING_TO_GREEN;
+import static 
org.apache.flink.kubernetes.operator.metrics.FlinkBlueGreenDeploymentMetrics.BG_STATE_GROUP_NAME;
+import static 
org.apache.flink.kubernetes.operator.metrics.FlinkBlueGreenDeploymentMetrics.COUNTER_NAME;
+import static 
org.apache.flink.kubernetes.operator.metrics.FlinkBlueGreenDeploymentMetrics.FAILURES_COUNTER_NAME;
+import static 
org.apache.flink.kubernetes.operator.metrics.FlinkBlueGreenDeploymentMetrics.JOB_STATUS_GROUP_NAME;
+import static 
org.apache.flink.kubernetes.operator.metrics.KubernetesOperatorMetricOptions.OPERATOR_RESOURCE_METRICS_ENABLED;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/** Tests for {@link FlinkBlueGreenDeploymentMetrics}. */
+public class FlinkBlueGreenDeploymentMetricsTest {
+
+    private static final String TEST_NAMESPACE = "test-namespace";
+
+    private final Configuration configuration = new Configuration();
+    private TestingMetricListener listener;
+    private MetricManager<FlinkBlueGreenDeployment> metricManager;
+
+    /** Builds a fresh metric listener and Blue/Green metric manager before every test. */
+    @BeforeEach
+    public void init() {
+        listener = new TestingMetricListener(configuration);
+        var metricGroup = listener.getMetricGroup();
+        metricManager =
+                MetricManager.createFlinkBlueGreenDeploymentMetricManager(
+                        configuration, metricGroup);
+    }
+
+    /** Two deployments in one namespace share the namespace counter and the per-state gauges. */
+    @Test
+    public void testStateCountMetricsSameNamespace() {
+        var first = buildBlueGreenDeployment("deployment1", TEST_NAMESPACE);
+        var second = buildBlueGreenDeployment("deployment2", TEST_NAMESPACE);
+        var namespaceCountId =
+                listener.getNamespaceMetricId(
+                        FlinkBlueGreenDeployment.class, TEST_NAMESPACE, COUNTER_NAME);
+
+        // No gauge exists before the first update.
+        assertTrue(listener.getGauge(namespaceCountId).isEmpty());
+
+        // Both deployments start in INITIALIZING_BLUE.
+        metricManager.onUpdate(first);
+        metricManager.onUpdate(second);
+        assertEquals(2, listener.getGauge(namespaceCountId).get().getValue());
+        assertStateCount(TEST_NAMESPACE, INITIALIZING_BLUE, 2);
+
+        // First deployment becomes ACTIVE_BLUE.
+        first.getStatus().setBlueGreenState(ACTIVE_BLUE);
+        metricManager.onUpdate(first);
+        assertStateCount(TEST_NAMESPACE, INITIALIZING_BLUE, 1);
+        assertStateCount(TEST_NAMESPACE, ACTIVE_BLUE, 1);
+
+        // Second deployment follows.
+        second.getStatus().setBlueGreenState(ACTIVE_BLUE);
+        metricManager.onUpdate(second);
+        assertStateCount(TEST_NAMESPACE, INITIALIZING_BLUE, 0);
+        assertStateCount(TEST_NAMESPACE, ACTIVE_BLUE, 2);
+
+        // Removing the deployments one by one drains the counters.
+        metricManager.onRemove(first);
+        assertEquals(1, listener.getGauge(namespaceCountId).get().getValue());
+        assertStateCount(TEST_NAMESPACE, ACTIVE_BLUE, 1);
+
+        metricManager.onRemove(second);
+        assertEquals(0, listener.getGauge(namespaceCountId).get().getValue());
+        assertStateCount(TEST_NAMESPACE, ACTIVE_BLUE, 0);
+    }
+
+    /** Deployments in different namespaces are counted independently of each other. */
+    @Test
+    public void testStateCountMetricsMultiNamespace() {
+        var firstNamespace = "ns1";
+        var secondNamespace = "ns2";
+        var inFirstNamespace = buildBlueGreenDeployment("deployment", firstNamespace);
+        var inSecondNamespace = buildBlueGreenDeployment("deployment", secondNamespace);
+
+        var firstCountId =
+                listener.getNamespaceMetricId(
+                        FlinkBlueGreenDeployment.class, firstNamespace, COUNTER_NAME);
+        var secondCountId =
+                listener.getNamespaceMetricId(
+                        FlinkBlueGreenDeployment.class, secondNamespace, COUNTER_NAME);
+
+        // No gauge exists in either namespace before the first update.
+        assertTrue(listener.getGauge(firstCountId).isEmpty());
+        assertTrue(listener.getGauge(secondCountId).isEmpty());
+
+        metricManager.onUpdate(inFirstNamespace);
+        metricManager.onUpdate(inSecondNamespace);
+
+        assertEquals(1, listener.getGauge(firstCountId).get().getValue());
+        assertEquals(1, listener.getGauge(secondCountId).get().getValue());
+        assertStateCount(firstNamespace, INITIALIZING_BLUE, 1);
+        assertStateCount(secondNamespace, INITIALIZING_BLUE, 1);
+
+        // Changing the state in the first namespace must not touch the second one.
+        inFirstNamespace.getStatus().setBlueGreenState(ACTIVE_BLUE);
+        metricManager.onUpdate(inFirstNamespace);
+
+        assertStateCount(firstNamespace, INITIALIZING_BLUE, 0);
+        assertStateCount(firstNamespace, ACTIVE_BLUE, 1);
+        assertStateCount(secondNamespace, INITIALIZING_BLUE, 1);
+
+        metricManager.onRemove(inFirstNamespace);
+        metricManager.onRemove(inSecondNamespace);
+
+        assertEquals(0, listener.getGauge(firstCountId).get().getValue());
+        assertEquals(0, listener.getGauge(secondCountId).get().getValue());
+    }
+
+    /** After one update, a count gauge is registered for every Blue/Green state. */
+    @Test
+    public void testAllBlueGreenStatesHaveMetrics() {
+        metricManager.onUpdate(buildBlueGreenDeployment("test-deployment", TEST_NAMESPACE));
+
+        for (var state : FlinkBlueGreenDeploymentState.values()) {
+            var gaugeId =
+                    listener.getNamespaceMetricId(
+                            FlinkBlueGreenDeployment.class,
+                            TEST_NAMESPACE,
+                            BG_STATE_GROUP_NAME,
+                            state.name(),
+                            COUNTER_NAME);
+            boolean registered = listener.getGauge(gaugeId).isPresent();
+            assertTrue(registered, "Metric should exist for state: " + state);
+        }
+    }
+
+    /** Walks one deployment from INITIALIZING_BLUE to ACTIVE_GREEN and checks the state counts. */
+    @Test
+    public void testFullLifecycleStateCountUpdates() {
+        var tracked = buildBlueGreenDeployment("test", TEST_NAMESPACE);
+        var status = tracked.getStatus();
+
+        // Initial state: INITIALIZING_BLUE.
+        metricManager.onUpdate(tracked);
+        assertStateCount(TEST_NAMESPACE, INITIALIZING_BLUE, 1);
+        assertStateCount(TEST_NAMESPACE, ACTIVE_BLUE, 0);
+
+        // INITIALIZING_BLUE -> ACTIVE_BLUE.
+        status.setBlueGreenState(ACTIVE_BLUE);
+        metricManager.onUpdate(tracked);
+        assertStateCount(TEST_NAMESPACE, INITIALIZING_BLUE, 0);
+        assertStateCount(TEST_NAMESPACE, ACTIVE_BLUE, 1);
+
+        // ACTIVE_BLUE -> TRANSITIONING_TO_GREEN.
+        status.setBlueGreenState(TRANSITIONING_TO_GREEN);
+        metricManager.onUpdate(tracked);
+        assertStateCount(TEST_NAMESPACE, ACTIVE_BLUE, 0);
+        assertStateCount(TEST_NAMESPACE, TRANSITIONING_TO_GREEN, 1);
+
+        // TRANSITIONING_TO_GREEN -> ACTIVE_GREEN.
+        status.setBlueGreenState(ACTIVE_GREEN);
+        metricManager.onUpdate(tracked);
+        assertStateCount(TEST_NAMESPACE, TRANSITIONING_TO_GREEN, 0);
+        assertStateCount(TEST_NAMESPACE, ACTIVE_GREEN, 1);
+    }
+
+    /**
+     * Verifies that a metric manager created with {@code OPERATOR_RESOURCE_METRICS_ENABLED} set
+     * to {@code false} registers neither the namespace-level count gauge nor any per-state
+     * gauge when a deployment is reported through {@code onUpdate}.
+     */
+    @Test
+    public void testMetricsDisabled() {
+        // Dedicated listener/manager pair built from the configuration with resource metrics off;
+        // all lookups below go through this listener.
+        var conf = new Configuration();
+        conf.set(OPERATOR_RESOURCE_METRICS_ENABLED, false);
+        var disabledListener = new TestingMetricListener(conf);
+        var disabledMetricManager =
+                MetricManager.createFlinkBlueGreenDeploymentMetricManager(
+                        conf, disabledListener.getMetricGroup());
+
+        var deployment = buildBlueGreenDeployment("test", TEST_NAMESPACE);
+
+        var counterId =
+                disabledListener.getNamespaceMetricId(
+                        FlinkBlueGreenDeployment.class, TEST_NAMESPACE, 
COUNTER_NAME);
+
+        disabledMetricManager.onUpdate(deployment);
+        assertTrue(disabledListener.getGauge(counterId).isEmpty());
+
+        // No per-state gauge may exist either, for any state value.
+        for (FlinkBlueGreenDeploymentState state : 
FlinkBlueGreenDeploymentState.values()) {
+            var statusId =
+                    disabledListener.getNamespaceMetricId(
+                            FlinkBlueGreenDeployment.class,
+                            TEST_NAMESPACE,
+                            BG_STATE_GROUP_NAME,
+                            state.name(),
+                            COUNTER_NAME);
+            assertTrue(disabledListener.getGauge(statusId).isEmpty());
+        }
+    }
+
+    /**
+     * Verifies that reporting the same, unchanged deployment three times counts it once: both
+     * the namespace-level count gauge and the INITIALIZING_BLUE state gauge must read 1.
+     */
+    @Test
+    public void testRepeatedUpdatesDoNotDuplicateCount() {
+        var deployment = buildBlueGreenDeployment("test", TEST_NAMESPACE);
+
+        // Multiple updates in same state should not increase count
+        metricManager.onUpdate(deployment);
+        metricManager.onUpdate(deployment);
+        metricManager.onUpdate(deployment);
+
+        var counterId =
+                listener.getNamespaceMetricId(
+                        FlinkBlueGreenDeployment.class, TEST_NAMESPACE, 
COUNTER_NAME);
+        assertEquals(1, listener.getGauge(counterId).get().getValue());
+        assertStateCount(TEST_NAMESPACE, INITIALIZING_BLUE, 1);
+    }
+
+    /**
+     * Verifies the per-job-status count gauges for two deployments in the same namespace.
+     *
+     * <p>The RUNNING and FAILING gauges must not exist before the first update. Afterwards the
+     * counts must follow each deployment as it moves from RUNNING to FAILING, and drop again as
+     * the deployments are removed.
+     */
+    @Test
+    public void testJobStatusGaugeTracking() {
+        var deployment1 = buildBlueGreenDeployment("deployment1", 
TEST_NAMESPACE);
+        var deployment2 = buildBlueGreenDeployment("deployment2", 
TEST_NAMESPACE);
+
+        var runningId =
+                listener.getNamespaceMetricId(
+                        FlinkBlueGreenDeployment.class,
+                        TEST_NAMESPACE,
+                        JOB_STATUS_GROUP_NAME,
+                        JobStatus.RUNNING.name(),
+                        COUNTER_NAME);
+        var failingId =
+                listener.getNamespaceMetricId(
+                        FlinkBlueGreenDeployment.class,
+                        TEST_NAMESPACE,
+                        JOB_STATUS_GROUP_NAME,
+                        JobStatus.FAILING.name(),
+                        COUNTER_NAME);
+
+        // Nothing has been reported yet, so the gauges are expected to be absent.
+        assertTrue(listener.getGauge(runningId).isEmpty());
+        assertTrue(listener.getGauge(failingId).isEmpty());
+
+        // Both start with RUNNING
+        deployment1.getStatus().getJobStatus().setState(JobStatus.RUNNING);
+        deployment2.getStatus().getJobStatus().setState(JobStatus.RUNNING);
+        metricManager.onUpdate(deployment1);
+        metricManager.onUpdate(deployment2);
+        assertJobStatusCount(TEST_NAMESPACE, JobStatus.RUNNING, 2);
+        assertJobStatusCount(TEST_NAMESPACE, JobStatus.FAILING, 0);
+
+        // deployment1 transitions to FAILING
+        deployment1.getStatus().getJobStatus().setState(JobStatus.FAILING);
+        metricManager.onUpdate(deployment1);
+        assertJobStatusCount(TEST_NAMESPACE, JobStatus.RUNNING, 1);
+        assertJobStatusCount(TEST_NAMESPACE, JobStatus.FAILING, 1);
+
+        // deployment2 also transitions to FAILING
+        deployment2.getStatus().getJobStatus().setState(JobStatus.FAILING);
+        metricManager.onUpdate(deployment2);
+        assertJobStatusCount(TEST_NAMESPACE, JobStatus.RUNNING, 0);
+        assertJobStatusCount(TEST_NAMESPACE, JobStatus.FAILING, 2);
+
+        // Remove deployment1
+        metricManager.onRemove(deployment1);
+        assertJobStatusCount(TEST_NAMESPACE, JobStatus.FAILING, 1);
+
+        // Remove deployment2
+        metricManager.onRemove(deployment2);
+        assertJobStatusCount(TEST_NAMESPACE, JobStatus.FAILING, 0);
+    }
+
+    /**
+     * Verifies that the namespace failures counter counts transitions into FAILING rather than
+     * updates observed while FAILING.
+     *
+     * <p>Expected sequence for a single deployment: counter absent before the first update, 0
+     * after a RUNNING update, 1 after RUNNING to FAILING, still 1 after a repeated FAILING
+     * update and after recovering to RUNNING, and 2 after a second transition to FAILING.
+     */
+    @Test
+    public void testFailuresCounterIncrementsOnTransitionToFailing() {
+        var deployment = buildBlueGreenDeployment("test", TEST_NAMESPACE);
+
+        var failuresId =
+                listener.getNamespaceMetricId(
+                        FlinkBlueGreenDeployment.class, TEST_NAMESPACE, 
FAILURES_COUNTER_NAME);
+
+        assertTrue(listener.getCounter(failuresId).isEmpty());
+
+        // Start with RUNNING
+        deployment.getStatus().getJobStatus().setState(JobStatus.RUNNING);
+        metricManager.onUpdate(deployment);
+        assertEquals(0L, listener.getCounter(failuresId).get().getCount());
+
+        // First transition to FAILING - counter increments
+        deployment.getStatus().getJobStatus().setState(JobStatus.FAILING);
+        metricManager.onUpdate(deployment);
+        assertEquals(1L, listener.getCounter(failuresId).get().getCount());
+
+        // Stay in FAILING - counter does NOT increment
+        metricManager.onUpdate(deployment);
+        assertEquals(1L, listener.getCounter(failuresId).get().getCount());
+
+        // Recover to RUNNING - counter stays same (never decrements)
+        deployment.getStatus().getJobStatus().setState(JobStatus.RUNNING);
+        metricManager.onUpdate(deployment);
+        assertEquals(1L, listener.getCounter(failuresId).get().getCount());
+
+        // Second transition to FAILING - counter increments again
+        deployment.getStatus().getJobStatus().setState(JobStatus.FAILING);
+        metricManager.onUpdate(deployment);
+        assertEquals(2L, listener.getCounter(failuresId).get().getCount());
+    }
+
+    /**
+     * Verifies that two deployments in the same namespace feed one shared failures counter.
+     *
+     * <p>Each deployment's transition to FAILING adds one to the counter. A later recovery, or
+     * the removal of either deployment, must leave the accumulated value untouched.
+     */
+    @Test
+    public void testFailuresCounterMultipleDeployments() {
+        var deployment1 = buildBlueGreenDeployment("deployment1", 
TEST_NAMESPACE);
+        var deployment2 = buildBlueGreenDeployment("deployment2", 
TEST_NAMESPACE);
+
+        var failuresId =
+                listener.getNamespaceMetricId(
+                        FlinkBlueGreenDeployment.class, TEST_NAMESPACE, 
FAILURES_COUNTER_NAME);
+
+        assertTrue(listener.getCounter(failuresId).isEmpty());
+
+        // Both start RUNNING
+        deployment1.getStatus().getJobStatus().setState(JobStatus.RUNNING);
+        deployment2.getStatus().getJobStatus().setState(JobStatus.RUNNING);
+        metricManager.onUpdate(deployment1);
+        metricManager.onUpdate(deployment2);
+        assertEquals(0L, listener.getCounter(failuresId).get().getCount());
+
+        // deployment1 fails
+        deployment1.getStatus().getJobStatus().setState(JobStatus.FAILING);
+        metricManager.onUpdate(deployment1);
+        assertEquals(1L, listener.getCounter(failuresId).get().getCount());
+
+        // deployment2 fails
+        deployment2.getStatus().getJobStatus().setState(JobStatus.FAILING);
+        metricManager.onUpdate(deployment2);
+        assertEquals(2L, listener.getCounter(failuresId).get().getCount());
+
+        // deployment1 recovers - counter stays 2
+        deployment1.getStatus().getJobStatus().setState(JobStatus.RUNNING);
+        metricManager.onUpdate(deployment1);
+        assertEquals(2L, listener.getCounter(failuresId).get().getCount());
+
+        // Remove deployments - counter stays 2 (historical, never decrements)
+        metricManager.onRemove(deployment1);
+        assertEquals(2L, listener.getCounter(failuresId).get().getCount());
+        metricManager.onRemove(deployment2);
+        assertEquals(2L, listener.getCounter(failuresId).get().getCount());
+    }
+
+    /**
+     * Verifies that failures counters are kept per namespace: a deployment failing in
+     * {@code ns1} must not change the counter of {@code ns2}, and vice versa.
+     *
+     * <p>Both deployments deliberately share the same resource name, so the namespace is the
+     * only thing that tells them apart.
+     */
+    @Test
+    public void testFailuresCounterIsolatedByNamespace() {
+        var namespace1 = "ns1";
+        var namespace2 = "ns2";
+        var deployment1 = buildBlueGreenDeployment("deployment", namespace1);
+        var deployment2 = buildBlueGreenDeployment("deployment", namespace2);
+
+        var failuresId1 =
+                listener.getNamespaceMetricId(
+                        FlinkBlueGreenDeployment.class, namespace1, 
FAILURES_COUNTER_NAME);
+        var failuresId2 =
+                listener.getNamespaceMetricId(
+                        FlinkBlueGreenDeployment.class, namespace2, 
FAILURES_COUNTER_NAME);
+
+        assertTrue(listener.getCounter(failuresId1).isEmpty());
+        assertTrue(listener.getCounter(failuresId2).isEmpty());
+
+        // Initialize both namespaces first with RUNNING deployments
+        deployment1.getStatus().getJobStatus().setState(JobStatus.RUNNING);
+        deployment2.getStatus().getJobStatus().setState(JobStatus.RUNNING);
+        metricManager.onUpdate(deployment1);
+        metricManager.onUpdate(deployment2);
+        assertEquals(0L, listener.getCounter(failuresId1).get().getCount());
+        assertEquals(0L, listener.getCounter(failuresId2).get().getCount());
+
+        // deployment1 in ns1 fails
+        deployment1.getStatus().getJobStatus().setState(JobStatus.FAILING);
+        metricManager.onUpdate(deployment1);
+
+        // Only ns1 counter increments
+        assertEquals(1L, listener.getCounter(failuresId1).get().getCount());
+        assertEquals(0L, listener.getCounter(failuresId2).get().getCount());
+
+        // deployment2 in ns2 fails
+        deployment2.getStatus().getJobStatus().setState(JobStatus.FAILING);
+        metricManager.onUpdate(deployment2);
+
+        // Counters are isolated
+        assertEquals(1L, listener.getCounter(failuresId1).get().getCount());
+        assertEquals(1L, listener.getCounter(failuresId2).get().getCount());
+    }
+
+    /**
+     * Verifies that one {@code onUpdate} call for a RUNNING deployment registers a count gauge
+     * for every {@code JobStatus} value in the namespace, not only for RUNNING.
+     */
+    @Test
+    public void testAllJobStatusesHaveMetrics() {
+        var deployment = buildBlueGreenDeployment("test-deployment", 
TEST_NAMESPACE);
+        deployment.getStatus().getJobStatus().setState(JobStatus.RUNNING);
+        metricManager.onUpdate(deployment);
+
+        // Verify each JobStatus has a gauge registered
+        for (JobStatus status : JobStatus.values()) {
+            var statusId =
+                    listener.getNamespaceMetricId(
+                            FlinkBlueGreenDeployment.class,
+                            TEST_NAMESPACE,
+                            JOB_STATUS_GROUP_NAME,
+                            status.name(),
+                            COUNTER_NAME);
+            assertTrue(
+                    listener.getGauge(statusId).isPresent(),
+                    "Metric should exist for JobStatus: " + status);
+        }
+    }
+
+    /**
+     * Verifies that repeated {@code onUpdate} calls for a deployment whose job status stays
+     * RUNNING keep the RUNNING gauge at 1 instead of adding to it on every call.
+     */
+    @Test
+    public void testJobStatusDoesNotDoubleCount() {
+        var deployment = buildBlueGreenDeployment("test", TEST_NAMESPACE);
+
+        // Start with RUNNING
+        deployment.getStatus().getJobStatus().setState(JobStatus.RUNNING);
+        metricManager.onUpdate(deployment);
+        assertJobStatusCount(TEST_NAMESPACE, JobStatus.RUNNING, 1);
+
+        // Multiple updates in same JobStatus should not duplicate
+        metricManager.onUpdate(deployment);
+        metricManager.onUpdate(deployment);
+        assertJobStatusCount(TEST_NAMESPACE, JobStatus.RUNNING, 1);
+    }
+
+    /**
+     * Builds a minimal {@link FlinkBlueGreenDeployment} fixture for the metric tests.
+     *
+     * <p>The resource gets a random UID and a creation timestamp of the current instant, a spec
+     * wrapping a job template with {@code UpgradeMode.STATELESS}, and a status that starts in
+     * {@code INITIALIZING_BLUE} with a freshly constructed job status object.
+     *
+     * @param name resource name stored in the metadata
+     * @param namespace resource namespace stored in the metadata
+     * @return the populated deployment
+     */
+    private FlinkBlueGreenDeployment buildBlueGreenDeployment(String name, 
String namespace) {
+        var deployment = new FlinkBlueGreenDeployment();
+        deployment.setMetadata(
+                new ObjectMetaBuilder()
+                        .withName(name)
+                        .withNamespace(namespace)
+                        .withUid(UUID.randomUUID().toString())
+                        .withCreationTimestamp(Instant.now().toString())
+                        .build());
+
+        var flinkDeploymentSpec =
+                FlinkDeploymentSpec.builder()
+                        .flinkConfiguration(new ConfigObjectNode())
+                        
.job(JobSpec.builder().upgradeMode(UpgradeMode.STATELESS).build())
+                        .build();
+
+        var bgDeploymentSpec =
+                new FlinkBlueGreenDeploymentSpec(
+                        new HashMap<>(),
+                        
FlinkDeploymentTemplateSpec.builder().spec(flinkDeploymentSpec).build());
+
+        deployment.setSpec(bgDeploymentSpec);
+
+        var status = new FlinkBlueGreenDeploymentStatus();
+        status.setBlueGreenState(INITIALIZING_BLUE);
+        // Fully qualified: the simple name JobStatus is used elsewhere in this class for the
+        // type with the RUNNING/FAILING constants (presumably a different class; see imports).
+        status.setJobStatus(new 
org.apache.flink.kubernetes.operator.api.status.JobStatus());
+        deployment.setStatus(status);
+
+        return deployment;
+    }
+
+    /**
+     * Asserts the value of the per-state count gauge of a namespace.
+     *
+     * <p>The gauge lookup result is unwrapped with {@code get()} without a presence check, so
+     * this helper should only be called once the gauge has been registered.
+     *
+     * @param namespace namespace whose gauge is inspected
+     * @param state blue/green state selecting the gauge
+     * @param expectedCount value the gauge is expected to report
+     */
+    private void assertStateCount(
+            String namespace, FlinkBlueGreenDeploymentState state, int 
expectedCount) {
+        var stateId =
+                listener.getNamespaceMetricId(
+                        FlinkBlueGreenDeployment.class,
+                        namespace,
+                        BG_STATE_GROUP_NAME,
+                        state.name(),
+                        COUNTER_NAME);
+        assertEquals(
+                expectedCount,
+                listener.getGauge(stateId).get().getValue(),
+                "State count mismatch for " + state);
+    }
+
+    /**
+     * Asserts the value of the per-job-status count gauge of a namespace.
+     *
+     * <p>The gauge lookup result is unwrapped with {@code get()} without a presence check, so
+     * this helper should only be called once the gauge has been registered.
+     *
+     * @param namespace namespace whose gauge is inspected
+     * @param status job status selecting the gauge
+     * @param expectedCount value the gauge is expected to report
+     */
+    private void assertJobStatusCount(String namespace, JobStatus status, int 
expectedCount) {
+        var statusId =
+                listener.getNamespaceMetricId(
+                        FlinkBlueGreenDeployment.class,
+                        namespace,
+                        JOB_STATUS_GROUP_NAME,
+                        status.name(),
+                        COUNTER_NAME);
+        assertEquals(
+                expectedCount,
+                listener.getGauge(statusId).get().getValue(),
+                "JobStatus count mismatch for " + status);
+    }
+}
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/BlueGreenLifecycleMetricsTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/BlueGreenLifecycleMetricsTest.java
new file mode 100644
index 00000000..ce0af71c
--- /dev/null
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/BlueGreenLifecycleMetricsTest.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.metrics.lifecycle;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.api.FlinkBlueGreenDeployment;
+import org.apache.flink.kubernetes.operator.api.spec.ConfigObjectNode;
+import 
org.apache.flink.kubernetes.operator.api.spec.FlinkBlueGreenDeploymentSpec;
+import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec;
+import 
org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentTemplateSpec;
+import org.apache.flink.kubernetes.operator.api.spec.JobSpec;
+import org.apache.flink.kubernetes.operator.api.spec.UpgradeMode;
+import 
org.apache.flink.kubernetes.operator.api.status.FlinkBlueGreenDeploymentState;
+import 
org.apache.flink.kubernetes.operator.api.status.FlinkBlueGreenDeploymentStatus;
+import org.apache.flink.kubernetes.operator.metrics.CustomResourceMetrics;
+import org.apache.flink.kubernetes.operator.metrics.MetricManager;
+import org.apache.flink.kubernetes.operator.metrics.TestingMetricListener;
+
+import io.fabric8.kubernetes.api.model.ObjectMetaBuilder;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.time.Instant;
+import java.util.HashMap;
+import java.util.UUID;
+
+import static 
org.apache.flink.kubernetes.operator.api.status.FlinkBlueGreenDeploymentState.INITIALIZING_BLUE;
+import static 
org.apache.flink.kubernetes.operator.metrics.KubernetesOperatorMetricOptions.OPERATOR_LIFECYCLE_METRICS_ENABLED;
+import static 
org.apache.flink.kubernetes.operator.metrics.lifecycle.BlueGreenLifecycleMetrics.LIFECYCLE_GROUP_NAME;
+import static 
org.apache.flink.kubernetes.operator.metrics.lifecycle.BlueGreenLifecycleMetrics.STATE_GROUP_NAME;
+import static 
org.apache.flink.kubernetes.operator.metrics.lifecycle.BlueGreenLifecycleMetrics.TIME_SECONDS_NAME;
+import static 
org.apache.flink.kubernetes.operator.metrics.lifecycle.BlueGreenLifecycleMetrics.TRANSITION_BLUE_TO_GREEN;
+import static 
org.apache.flink.kubernetes.operator.metrics.lifecycle.BlueGreenLifecycleMetrics.TRANSITION_GREEN_TO_BLUE;
+import static 
org.apache.flink.kubernetes.operator.metrics.lifecycle.BlueGreenLifecycleMetrics.TRANSITION_GROUP_NAME;
+import static 
org.apache.flink.kubernetes.operator.metrics.lifecycle.BlueGreenLifecycleMetrics.TRANSITION_INITIAL_DEPLOYMENT;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/** Tests for {@link BlueGreenLifecycleMetrics}. */
+public class BlueGreenLifecycleMetricsTest {
+
+    private static final String TEST_NAMESPACE = "test-namespace";
+    // Transition metric names whose histograms the existence tests look up.
+    private static final String[] TRANSITIONS = {
+        TRANSITION_INITIAL_DEPLOYMENT, TRANSITION_BLUE_TO_GREEN, 
TRANSITION_GREEN_TO_BLUE
+    };
+
+    // Default configuration (no options set) shared by the listener and manager built in init().
+    private final Configuration configuration = new Configuration();
+    private TestingMetricListener listener;
+    private MetricManager<FlinkBlueGreenDeployment> metricManager;
+
+    /** Creates a fresh metric listener and blue/green metric manager before every test. */
+    @BeforeEach
+    public void init() {
+        listener = new TestingMetricListener(configuration);
+        metricManager =
+                MetricManager.createFlinkBlueGreenDeploymentMetricManager(
+                        configuration, listener.getMetricGroup());
+    }
+
+    /**
+     * Verifies that, after one deployment has been reported, the namespace-scoped lifecycle
+     * histograms exist for every transition in {@link #TRANSITIONS} and for every
+     * {@link FlinkBlueGreenDeploymentState} value.
+     */
+    @Test
+    public void testNamespaceHistogramMetricsExist() {
+        var deployment = buildBlueGreenDeployment("test-deployment", 
TEST_NAMESPACE);
+        metricManager.onUpdate(deployment);
+
+        for (String transition : TRANSITIONS) {
+            assertTrue(
+                    listener.getHistogram(
+                                    getNamespaceHistogramId(
+                                            TEST_NAMESPACE, 
TRANSITION_GROUP_NAME, transition))
+                            .isPresent(),
+                    "Transition histogram should exist for: " + transition);
+        }
+
+        for (FlinkBlueGreenDeploymentState state : 
FlinkBlueGreenDeploymentState.values()) {
+            assertTrue(
+                    listener.getHistogram(
+                                    getNamespaceHistogramId(
+                                            TEST_NAMESPACE, STATE_GROUP_NAME, 
state.name()))
+                            .isPresent(),
+                    "State time histogram should exist for: " + state);
+        }
+    }
+
+    /**
+     * Verifies that, after one deployment has been reported, the lifecycle histograms also
+     * exist under the non-namespaced ("system-level") metric IDs, for every transition and
+     * every state.
+     */
+    @Test
+    public void testSystemLevelHistogramsExist() {
+        var deployment = buildBlueGreenDeployment("test-deployment", 
TEST_NAMESPACE);
+        metricManager.onUpdate(deployment);
+
+        for (String transition : TRANSITIONS) {
+            assertTrue(
+                    listener.getHistogram(
+                                    getSystemLevelHistogramId(
+                                            listener, TRANSITION_GROUP_NAME, 
transition))
+                            .isPresent(),
+                    "System-level transition histogram should exist for: " + 
transition);
+        }
+
+        for (FlinkBlueGreenDeploymentState state : 
FlinkBlueGreenDeploymentState.values()) {
+            assertTrue(
+                    listener.getHistogram(
+                                    getSystemLevelHistogramId(
+                                            listener, STATE_GROUP_NAME, 
state.name()))
+                            .isPresent(),
+                    "System-level state time histogram should exist for: " + 
state);
+        }
+    }
+
+    /**
+     * Verifies histogram sharing: two deployments in the same namespace resolve to the same
+     * initial-deployment histogram instance, while a deployment in another namespace resolves
+     * to a different instance.
+     */
+    @Test
+    public void testMultiNamespaceHistogramIsolation() {
+        var namespace1 = "ns1";
+        var namespace2 = "ns2";
+        var ns1HistoId =
+                getNamespaceHistogramId(
+                        namespace1, TRANSITION_GROUP_NAME, 
TRANSITION_INITIAL_DEPLOYMENT);
+
+        var dep1 = buildBlueGreenDeployment("dep1", namespace1);
+        metricManager.onUpdate(dep1);
+        var ns1Histo = listener.getHistogram(ns1HistoId).get();
+
+        var dep2 = buildBlueGreenDeployment("dep2", namespace1);
+        metricManager.onUpdate(dep2);
+        // Reference comparison: the check is about instance identity, not equality.
+        assertTrue(
+                ns1Histo == listener.getHistogram(ns1HistoId).get(),
+                "Deployments in same namespace should share histogram 
instance");
+
+        var dep3 = buildBlueGreenDeployment("dep3", namespace2);
+        metricManager.onUpdate(dep3);
+        var ns2HistoId =
+                getNamespaceHistogramId(
+                        namespace2, TRANSITION_GROUP_NAME, 
TRANSITION_INITIAL_DEPLOYMENT);
+        assertTrue(
+                ns1Histo != listener.getHistogram(ns2HistoId).get(),
+                "Different namespaces should have different histogram 
instances");
+    }
+
+    /**
+     * Verifies that a metric manager created with {@code OPERATOR_LIFECYCLE_METRICS_ENABLED}
+     * set to {@code false} has no {@link BlueGreenLifecycleMetrics} among its registered
+     * metrics.
+     */
+    @Test
+    public void testLifecycleMetricsDisabled() {
+        var conf = new Configuration();
+        conf.set(OPERATOR_LIFECYCLE_METRICS_ENABLED, false);
+        var disabledMetricManager =
+                MetricManager.createFlinkBlueGreenDeploymentMetricManager(
+                        conf, new 
TestingMetricListener(conf).getMetricGroup());
+
+        assertNull(
+                getBlueGreenLifecycleMetrics(disabledMetricManager),
+                "BlueGreenLifecycleMetrics should not be registered when 
disabled");
+    }
+
+    /**
+     * Builds a minimal {@link FlinkBlueGreenDeployment} fixture: metadata with the given name
+     * and namespace, a random UID and a creation timestamp of the current instant, a spec
+     * wrapping a job template with {@code UpgradeMode.STATELESS}, and a status whose
+     * blue/green state is {@code INITIALIZING_BLUE}. No job status is set on the status.
+     *
+     * @param name resource name stored in the metadata
+     * @param namespace resource namespace stored in the metadata
+     * @return the populated deployment
+     */
+    private FlinkBlueGreenDeployment buildBlueGreenDeployment(String name, 
String namespace) {
+        var deployment = new FlinkBlueGreenDeployment();
+        deployment.setMetadata(
+                new ObjectMetaBuilder()
+                        .withName(name)
+                        .withNamespace(namespace)
+                        .withUid(UUID.randomUUID().toString())
+                        .withCreationTimestamp(Instant.now().toString())
+                        .build());
+
+        var flinkDeploymentSpec =
+                FlinkDeploymentSpec.builder()
+                        .flinkConfiguration(new ConfigObjectNode())
+                        
.job(JobSpec.builder().upgradeMode(UpgradeMode.STATELESS).build())
+                        .build();
+
+        var bgDeploymentSpec =
+                new FlinkBlueGreenDeploymentSpec(
+                        new HashMap<>(),
+                        
FlinkDeploymentTemplateSpec.builder().spec(flinkDeploymentSpec).build());
+
+        deployment.setSpec(bgDeploymentSpec);
+
+        var status = new FlinkBlueGreenDeploymentStatus();
+        status.setBlueGreenState(INITIALIZING_BLUE);
+        deployment.setStatus(status);
+
+        return deployment;
+    }
+
+    /**
+     * Resolves the namespace-scoped ID of a lifecycle histogram via the shared listener.
+     *
+     * @param namespace namespace the histogram belongs to
+     * @param groupName metric group below the lifecycle group (transition or state group)
+     * @param metricName transition name or state name
+     * @return the metric ID ending in {@code TIME_SECONDS_NAME}
+     */
+    private String getNamespaceHistogramId(String namespace, String groupName, 
String metricName) {
+        return listener.getNamespaceMetricId(
+                FlinkBlueGreenDeployment.class,
+                namespace,
+                LIFECYCLE_GROUP_NAME,
+                groupName,
+                metricName,
+                TIME_SECONDS_NAME);
+    }
+
+    /**
+     * Resolves the non-namespaced ID of a lifecycle histogram. The ID is the dot-joined path
+     * of the resource's simple class name, the lifecycle group, {@code groupName},
+     * {@code metricName} and {@code TIME_SECONDS_NAME}, passed through the listener's
+     * {@code getMetricId}.
+     *
+     * @param metricListener listener used to resolve the ID
+     * @param groupName metric group below the lifecycle group (transition or state group)
+     * @param metricName transition name or state name
+     * @return the resolved metric ID
+     */
+    private String getSystemLevelHistogramId(
+            TestingMetricListener metricListener, String groupName, String 
metricName) {
+        return metricListener.getMetricId(
+                String.format(
+                        "%s.%s.%s.%s.%s",
+                        FlinkBlueGreenDeployment.class.getSimpleName(),
+                        LIFECYCLE_GROUP_NAME,
+                        groupName,
+                        metricName,
+                        TIME_SECONDS_NAME));
+    }
+
+    /**
+     * Returns the first {@link BlueGreenLifecycleMetrics} found among the manager's registered
+     * metrics.
+     *
+     * @param metricManager manager whose registered metrics are searched
+     * @return the lifecycle metrics instance, or {@code null} if none is registered
+     */
+    private static BlueGreenLifecycleMetrics getBlueGreenLifecycleMetrics(
+            MetricManager<FlinkBlueGreenDeployment> metricManager) {
+        for (CustomResourceMetrics<?> metrics : 
metricManager.getRegisteredMetrics()) {
+            if (metrics instanceof BlueGreenLifecycleMetrics) {
+                return (BlueGreenLifecycleMetrics) metrics;
+            }
+        }
+        return null;
+    }
+}
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/BlueGreenResourceLifecycleMetricTrackerTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/BlueGreenResourceLifecycleMetricTrackerTest.java
new file mode 100644
index 00000000..cbc80eff
--- /dev/null
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/BlueGreenResourceLifecycleMetricTrackerTest.java
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.metrics.lifecycle;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.api.status.FlinkBlueGreenDeploymentState;
+import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
+import org.apache.flink.kubernetes.operator.metrics.OperatorMetricUtils;
+import org.apache.flink.metrics.Histogram;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.time.Instant;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static org.apache.flink.kubernetes.operator.api.status.FlinkBlueGreenDeploymentState.ACTIVE_BLUE;
+import static org.apache.flink.kubernetes.operator.api.status.FlinkBlueGreenDeploymentState.ACTIVE_GREEN;
+import static org.apache.flink.kubernetes.operator.api.status.FlinkBlueGreenDeploymentState.INITIALIZING_BLUE;
+import static org.apache.flink.kubernetes.operator.api.status.FlinkBlueGreenDeploymentState.SAVEPOINTING_BLUE;
+import static org.apache.flink.kubernetes.operator.api.status.FlinkBlueGreenDeploymentState.SAVEPOINTING_GREEN;
+import static org.apache.flink.kubernetes.operator.api.status.FlinkBlueGreenDeploymentState.TRANSITIONING_TO_BLUE;
+import static org.apache.flink.kubernetes.operator.api.status.FlinkBlueGreenDeploymentState.TRANSITIONING_TO_GREEN;
+import static org.apache.flink.kubernetes.operator.metrics.lifecycle.BlueGreenLifecycleMetrics.TRANSITION_BLUE_TO_GREEN;
+import static org.apache.flink.kubernetes.operator.metrics.lifecycle.BlueGreenLifecycleMetrics.TRANSITION_GREEN_TO_BLUE;
+import static org.apache.flink.kubernetes.operator.metrics.lifecycle.BlueGreenLifecycleMetrics.TRANSITION_INITIAL_DEPLOYMENT;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/** Tests for {@link BlueGreenResourceLifecycleMetricTracker}. */
+public class BlueGreenResourceLifecycleMetricTrackerTest {
+
+    private Map<String, List<Histogram>> transitionHistos;
+    private Map<FlinkBlueGreenDeploymentState, List<Histogram>> stateTimeHistos;
+
+    @BeforeEach
+    void setUp() {
+        transitionHistos = new ConcurrentHashMap<>();
+        transitionHistos.put(TRANSITION_INITIAL_DEPLOYMENT, List.of(createHistogram()));
+        transitionHistos.put(TRANSITION_BLUE_TO_GREEN, List.of(createHistogram()));
+        transitionHistos.put(TRANSITION_GREEN_TO_BLUE, List.of(createHistogram()));
+
+        stateTimeHistos = new ConcurrentHashMap<>();
+        for (FlinkBlueGreenDeploymentState state : FlinkBlueGreenDeploymentState.values()) {
+            stateTimeHistos.put(state, List.of(createHistogram()));
+        }
+    }
+
+    @Test
+    void testInitialDeploymentRecordsTransitionTime() {
+        long ts = 0;
+        var tracker = createTracker(INITIALIZING_BLUE, Instant.ofEpochSecond(ts));
+        tracker.onUpdate(TRANSITIONING_TO_BLUE, Instant.ofEpochSecond(ts += 5));
+        tracker.onUpdate(ACTIVE_BLUE, Instant.ofEpochSecond(ts += 5));
+
+        assertTransitionRecorded(TRANSITION_INITIAL_DEPLOYMENT, 5);
+    }
+
+    @Test
+    void testInitialDeploymentRecordsStateTime() {
+        long ts = 0;
+        var tracker = createTracker(INITIALIZING_BLUE, Instant.ofEpochSecond(ts));
+        tracker.onUpdate(TRANSITIONING_TO_BLUE, Instant.ofEpochSecond(ts += 2));
+        tracker.onUpdate(ACTIVE_BLUE, Instant.ofEpochSecond(ts += 5));
+
+        assertStateTimeRecorded(INITIALIZING_BLUE, 2);
+        assertStateTimeRecorded(TRANSITIONING_TO_BLUE, 5);
+    }
+
+    @Test
+    void testBlueToGreenRecordsTransitionTime() {
+        long ts = 0;
+        var tracker = createTracker(ACTIVE_BLUE, Instant.ofEpochSecond(ts));
+        tracker.onUpdate(SAVEPOINTING_BLUE, Instant.ofEpochSecond(ts += 5));
+        tracker.onUpdate(TRANSITIONING_TO_GREEN, Instant.ofEpochSecond(ts += 10));
+        tracker.onUpdate(ACTIVE_GREEN, Instant.ofEpochSecond(ts += 5));
+
+        assertTransitionRecorded(TRANSITION_BLUE_TO_GREEN, 15);
+    }
+
+    @Test
+    void testBlueToGreenRecordsAllIntermediateStateTimes() {
+        long ts = 0;
+        var tracker = createTracker(ACTIVE_BLUE, Instant.ofEpochSecond(ts));
+        tracker.onUpdate(SAVEPOINTING_BLUE, Instant.ofEpochSecond(ts += 5));
+        tracker.onUpdate(TRANSITIONING_TO_GREEN, Instant.ofEpochSecond(ts += 10));
+        tracker.onUpdate(ACTIVE_GREEN, Instant.ofEpochSecond(ts += 3));
+
+        assertStateTimeRecorded(ACTIVE_BLUE, 5);
+        assertStateTimeRecorded(SAVEPOINTING_BLUE, 10);
+        assertStateTimeRecorded(TRANSITIONING_TO_GREEN, 3);
+    }
+
+    @Test
+    void testGreenToBlueRecordsTransitionTime() {
+        long ts = 0;
+        var tracker = createTracker(ACTIVE_GREEN, Instant.ofEpochSecond(ts));
+        tracker.onUpdate(SAVEPOINTING_GREEN, Instant.ofEpochSecond(ts += 5));
+        tracker.onUpdate(TRANSITIONING_TO_BLUE, Instant.ofEpochSecond(ts += 8));
+        tracker.onUpdate(ACTIVE_BLUE, Instant.ofEpochSecond(ts += 2));
+
+        assertTransitionRecorded(TRANSITION_GREEN_TO_BLUE, 10);
+    }
+
+    @Test
+    void testGreenToBlueRecordsAllIntermediateStateTimes() {
+        long ts = 0;
+        var tracker = createTracker(ACTIVE_GREEN, Instant.ofEpochSecond(ts));
+        tracker.onUpdate(SAVEPOINTING_GREEN, Instant.ofEpochSecond(ts += 5));
+        tracker.onUpdate(TRANSITIONING_TO_BLUE, Instant.ofEpochSecond(ts += 8));
+        tracker.onUpdate(ACTIVE_BLUE, Instant.ofEpochSecond(ts += 2));
+
+        assertStateTimeRecorded(ACTIVE_GREEN, 5);
+        assertStateTimeRecorded(SAVEPOINTING_GREEN, 8);
+        assertStateTimeRecorded(TRANSITIONING_TO_BLUE, 2);
+    }
+
+    @Test
+    void testSameStateUpdatesOnlyUpdateTimestamp() {
+        long ts = 0;
+        var tracker = createTracker(ACTIVE_BLUE, Instant.ofEpochSecond(ts));
+        tracker.onUpdate(ACTIVE_BLUE, Instant.ofEpochSecond(ts += 1));
+        tracker.onUpdate(ACTIVE_BLUE, Instant.ofEpochSecond(ts += 1));
+        tracker.onUpdate(ACTIVE_BLUE, Instant.ofEpochSecond(ts += 1));
+
+        assertNoTransitionRecorded(TRANSITION_BLUE_TO_GREEN);
+        assertNoTransitionRecorded(TRANSITION_GREEN_TO_BLUE);
+        assertNoStateTimeRecorded(ACTIVE_BLUE);
+    }
+
+    @Test
+    void testIntermediateStateMetricsOnlyRecordedAtStableState() {
+        long ts = 0;
+        var tracker = createTracker(ACTIVE_BLUE, Instant.ofEpochSecond(ts));
+        tracker.onUpdate(SAVEPOINTING_BLUE, Instant.ofEpochSecond(ts += 5));
+        tracker.onUpdate(TRANSITIONING_TO_GREEN, Instant.ofEpochSecond(ts += 5));
+
+        assertNoStateTimeRecorded(ACTIVE_BLUE);
+        assertNoStateTimeRecorded(SAVEPOINTING_BLUE);
+
+        tracker.onUpdate(ACTIVE_GREEN, Instant.ofEpochSecond(ts += 5));
+
+        assertStateTimeRecorded(ACTIVE_BLUE, 5);
+        assertStateTimeRecorded(SAVEPOINTING_BLUE, 5);
+        assertStateTimeRecorded(TRANSITIONING_TO_GREEN, 5);
+    }
+
+    @Test
+    void testRecoveryFromActiveStateTracksNextTransition() {
+        long ts = 0;
+        var tracker = createTracker(ACTIVE_BLUE, Instant.ofEpochSecond(ts));
+        tracker.onUpdate(TRANSITIONING_TO_GREEN, Instant.ofEpochSecond(ts += 10));
+        tracker.onUpdate(ACTIVE_GREEN, Instant.ofEpochSecond(ts += 5));
+
+        assertTransitionRecorded(TRANSITION_BLUE_TO_GREEN, 5);
+    }
+
+    @Test
+    void testConsecutiveTransitionsEachTrackedIndependently() {
+        long ts = 0;
+        var tracker = createTracker(ACTIVE_BLUE, Instant.ofEpochSecond(ts));
+
+        tracker.onUpdate(TRANSITIONING_TO_GREEN, Instant.ofEpochSecond(ts += 10));
+        tracker.onUpdate(ACTIVE_GREEN, Instant.ofEpochSecond(ts += 5));
+
+        assertTransitionRecorded(TRANSITION_BLUE_TO_GREEN, 5);
+
+        tracker.onUpdate(TRANSITIONING_TO_BLUE, Instant.ofEpochSecond(ts += 8));
+        tracker.onUpdate(ACTIVE_BLUE, Instant.ofEpochSecond(ts += 2));
+
+        assertTransitionRecorded(TRANSITION_GREEN_TO_BLUE, 2);
+    }
+
+    @Test
+    void testFailedBlueToGreenRollbackRecordsStateTimes() {
+        long ts = 0;
+        var tracker = createTracker(ACTIVE_BLUE, Instant.ofEpochSecond(ts));
+
+        tracker.onUpdate(SAVEPOINTING_BLUE, Instant.ofEpochSecond(ts += 10));
+        tracker.onUpdate(TRANSITIONING_TO_GREEN, Instant.ofEpochSecond(ts += 15));
+        tracker.onUpdate(ACTIVE_BLUE, Instant.ofEpochSecond(ts += 5));
+
+        assertStateTimeRecorded(ACTIVE_BLUE, 10);
+        assertStateTimeRecorded(SAVEPOINTING_BLUE, 15);
+        assertStateTimeRecorded(TRANSITIONING_TO_GREEN, 5);
+    }
+
+    @Test
+    void testFailedGreenToBlueRollbackRecordsStateTimes() {
+        long ts = 0;
+        var tracker = createTracker(ACTIVE_GREEN, Instant.ofEpochSecond(ts));
+
+        tracker.onUpdate(SAVEPOINTING_GREEN, Instant.ofEpochSecond(ts += 8));
+        tracker.onUpdate(TRANSITIONING_TO_BLUE, Instant.ofEpochSecond(ts += 12));
+        tracker.onUpdate(ACTIVE_GREEN, Instant.ofEpochSecond(ts += 3));
+
+        assertStateTimeRecorded(ACTIVE_GREEN, 8);
+        assertStateTimeRecorded(SAVEPOINTING_GREEN, 12);
+        assertStateTimeRecorded(TRANSITIONING_TO_BLUE, 3);
+    }
+
+    private BlueGreenResourceLifecycleMetricTracker createTracker(
+            FlinkBlueGreenDeploymentState initialState, Instant initialTime) {
+        return new BlueGreenResourceLifecycleMetricTracker(
+                initialState, initialTime, transitionHistos, stateTimeHistos);
+    }
+
+    private Histogram createHistogram() {
+        return OperatorMetricUtils.createHistogram(
+                FlinkOperatorConfiguration.fromConfiguration(new Configuration()));
+    }
+
+    private void assertTransitionRecorded(String transitionName, long expectedSeconds) {
+        var stats = transitionHistos.get(transitionName).get(0).getStatistics();
+        assertEquals(1, stats.size(), transitionName + " should have 1 sample");
+        assertEquals(expectedSeconds, (long) stats.getMean(), transitionName + " duration");
+    }
+
+    private void assertNoTransitionRecorded(String transitionName) {
+        var stats = transitionHistos.get(transitionName).get(0).getStatistics();
+        assertEquals(0, stats.size(), transitionName + " should have no samples");
+    }
+
+    private void assertStateTimeRecorded(
+            FlinkBlueGreenDeploymentState state, long expectedSeconds) {
+        var stats = stateTimeHistos.get(state).get(0).getStatistics();
+        assertEquals(1, stats.size(), state + " should have 1 sample");
+        assertEquals(expectedSeconds, (long) stats.getMean(), state + " duration");
+    }
+
+    private void assertNoStateTimeRecorded(FlinkBlueGreenDeploymentState state) {
+        var stats = stateTimeHistos.get(state).get(0).getStatistics();
+        assertEquals(0, stats.size(), state + " should have no samples");
+    }
+}


Reply via email to