This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new a31aeac0 [FLINK-38577] Added BlueGreen ingress that switches between 
active Svc + resolve path conflict on Blue and Green deployment ingresses
a31aeac0 is described below

commit a31aeac050ad3835da3ffbfbf2d2e9e53ce8afae
Author: Daniel Rossos <[email protected]>
AuthorDate: Mon Feb 2 02:28:13 2026 -0500

    [FLINK-38577] Added BlueGreen ingress that switches between active Svc + 
resolve path conflict on Blue and Green deployment ingresses
---
 docs/content/docs/custom-resource/reference.md     |   1 +
 e2e-tests/data/bluegreen-ingress.yaml              |  58 ++++++++
 e2e-tests/test_bluegreen_ingress_rotation.sh       | 133 +++++++++++++++++++
 .../api/spec/FlinkBlueGreenDeploymentSpec.java     |   4 +
 .../flink/kubernetes/operator/FlinkOperator.java   |   4 +-
 .../FlinkBlueGreenDeploymentController.java        |   8 ++
 .../bluegreen/BlueGreenDeploymentService.java      |  73 ++++++++++
 .../bluegreen/handlers/ActiveStateHandler.java     |   4 +
 .../operator/utils/EventSourceUtils.java           |  24 ++++
 .../kubernetes/operator/utils/IngressUtils.java    | 112 ++++++++++++----
 .../operator/utils/bluegreen/BlueGreenUtils.java   |   8 ++
 .../FlinkBlueGreenDeploymentControllerTest.java    | 147 ++++++++++++++++++++-
 .../TestingFlinkBlueGreenDeploymentController.java |   3 +-
 .../FlinkBlueGreenDeploymentMetricsTest.java       |   1 +
 .../lifecycle/BlueGreenLifecycleMetricsTest.java   |   1 +
 .../utils/bluegreen/BlueGreenUtilsTest.java        |   1 +
 ...inkbluegreendeployments.flink.apache.org-v1.yml |  26 ++++
 17 files changed, 576 insertions(+), 32 deletions(-)

diff --git a/docs/content/docs/custom-resource/reference.md 
b/docs/content/docs/custom-resource/reference.md
index 003b6495..98d18e7d 100644
--- a/docs/content/docs/custom-resource/reference.md
+++ b/docs/content/docs/custom-resource/reference.md
@@ -89,6 +89,7 @@ This serves as a full reference for FlinkDeployment and 
FlinkSessionJob custom r
 | Parameter | Type | Docs |
 | ----------| ---- | ---- |
 | configuration | java.util.Map<java.lang.String,java.lang.String> |  |
+| ingress | org.apache.flink.kubernetes.operator.api.spec.IngressSpec |  |
 | template | 
org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentTemplateSpec |  |
 
 ### FlinkDeploymentSpec
diff --git a/e2e-tests/data/bluegreen-ingress.yaml 
b/e2e-tests/data/bluegreen-ingress.yaml
new file mode 100644
index 00000000..4a7b143e
--- /dev/null
+++ b/e2e-tests/data/bluegreen-ingress.yaml
@@ -0,0 +1,58 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+apiVersion: flink.apache.org/v1beta1
+kind: FlinkBlueGreenDeployment
+metadata:
+  name: bg-ingress-test
+spec:
+  configuration:
+    kubernetes.operator.bluegreen.deployment-deletion.delay: "2s"
+  # Parent-level ingress configuration
+  ingress:
+    template: "{{name}}.{{namespace}}.example.com"
+    className: "nginx"
+    annotations:
+      nginx.ingress.kubernetes.io/rewrite-target: "/"
+  template:
+    spec:
+      image: flink:1.20
+      flinkVersion: v1_20
+      flinkConfiguration:
+        rest.port: "8081"
+        taskmanager.numberOfTaskSlots: "1"
+      serviceAccount: flink
+      jobManager:
+        resource:
+          memory: 1G
+          cpu: 1
+      taskManager:
+        resource:
+          memory: 2G
+          cpu: 1
+      job:
+        jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
+        parallelism: 1
+        entryClass: 
org.apache.flink.streaming.examples.statemachine.StateMachineExample
+        args:
+          - "--error-rate"
+          - "0.15"
+          - "--sleep"
+          - "30"
+        upgradeMode: stateless
+      mode: native
diff --git a/e2e-tests/test_bluegreen_ingress_rotation.sh 
b/e2e-tests/test_bluegreen_ingress_rotation.sh
new file mode 100755
index 00000000..6c5c6123
--- /dev/null
+++ b/e2e-tests/test_bluegreen_ingress_rotation.sh
@@ -0,0 +1,133 @@
+#!/usr/bin/env bash
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# This script tests the Blue/Green ingress rotation functionality:
+# - Create a FlinkBlueGreenDeployment with parent-level ingress spec
+# - Verify ingress is created and points to Blue deployment
+# - Trigger a transition to Green
+# - Verify ingress switches to point to Green deployment
+# - Verify Blue deployment is deleted but ingress remains
+# - Verify ingress configuration is preserved across transitions
+
+SCRIPT_DIR=$(dirname "$(readlink -f "$0")")
+source "${SCRIPT_DIR}/utils.sh"
+
+CLUSTER_ID="bg-ingress-test"
+BG_CLUSTER_ID=$CLUSTER_ID
+BLUE_CLUSTER_ID=$CLUSTER_ID"-blue"
+GREEN_CLUSTER_ID=$CLUSTER_ID"-green"
+
+APPLICATION_YAML="${SCRIPT_DIR}/data/bluegreen-ingress.yaml"
+APPLICATION_IDENTIFIER="flinkbgdep/$CLUSTER_ID"
+BLUE_APPLICATION_IDENTIFIER="flinkdep/$BLUE_CLUSTER_ID"
+GREEN_APPLICATION_IDENTIFIER="flinkdep/$GREEN_CLUSTER_ID"
+TIMEOUT=300
+
+echo "Deploying BlueGreen deployment with ingress..."
+retry_times 5 30 "kubectl apply -f $APPLICATION_YAML" || exit 1
+
+sleep 1
+wait_for_jobmanager_running $BLUE_CLUSTER_ID $TIMEOUT
+wait_for_status $BLUE_APPLICATION_IDENTIFIER '.status.lifecycleState' STABLE 
${TIMEOUT} || exit 1
+wait_for_status $APPLICATION_IDENTIFIER '.status.jobStatus.state' RUNNING 
${TIMEOUT} || exit 1
+wait_for_status $APPLICATION_IDENTIFIER '.status.blueGreenState' ACTIVE_BLUE 
${TIMEOUT} || exit 1
+
+echo "Verifying ingress created and points to Blue..."
+kubectl get ingress $BG_CLUSTER_ID -n default || exit 1
+
+# Check ingress backend points to Blue's REST service
+BLUE_BACKEND=$(kubectl get ingress $BG_CLUSTER_ID -n default -o 
jsonpath='{.spec.rules[0].http.paths[0].backend.service.name}')
+EXPECTED_BLUE_BACKEND="${BLUE_CLUSTER_ID}-rest"
+if [ "$BLUE_BACKEND" != "$EXPECTED_BLUE_BACKEND" ]; then
+    echo "ERROR: Ingress backend should be '$EXPECTED_BLUE_BACKEND' but got 
'$BLUE_BACKEND'"
+    exit 1
+fi
+echo " Ingress correctly points to Blue deployment"
+
+# Verify ingress annotations
+REWRITE_ANNOTATION=$(kubectl get ingress $BG_CLUSTER_ID -n default -o 
jsonpath='{.metadata.annotations.nginx\.ingress\.kubernetes\.io/rewrite-target}')
+if [ "$REWRITE_ANNOTATION" != "/" ]; then
+    echo "ERROR: Expected rewrite annotation '/' but got '$REWRITE_ANNOTATION'"
+    exit 1
+fi
+echo " Ingress annotations preserved"
+
+echo "Triggering Blue→Green transition..."
+kubectl patch flinkbgdep ${BG_CLUSTER_ID} --type merge --patch 
'{"spec":{"template":{"spec":{"flinkConfiguration":{"taskmanager.numberOfTaskSlots":"2"}}}}}'
+
+# Wait for Green to be ready
+wait_for_jobmanager_running $GREEN_CLUSTER_ID $TIMEOUT
+wait_for_status $GREEN_APPLICATION_IDENTIFIER '.status.lifecycleState' STABLE 
${TIMEOUT} || exit 1
+
+echo "Waiting for Blue deletion..."
+kubectl wait --for=delete deployment --timeout=${TIMEOUT}s 
--selector="app=${BLUE_CLUSTER_ID}" || exit 1
+
+wait_for_status $APPLICATION_IDENTIFIER '.status.jobStatus.state' RUNNING 
${TIMEOUT} || exit 1
+wait_for_status $APPLICATION_IDENTIFIER '.status.blueGreenState' ACTIVE_GREEN 
${TIMEOUT} || exit 1
+
+echo "Verifying ingress switched to Green..."
+# Ingress should still exist
+kubectl get ingress $BG_CLUSTER_ID -n default || exit 1
+
+# Check ingress backend now points to Green's REST service
+GREEN_BACKEND=$(kubectl get ingress $BG_CLUSTER_ID -n default -o 
jsonpath='{.spec.rules[0].http.paths[0].backend.service.name}')
+EXPECTED_GREEN_BACKEND="${GREEN_CLUSTER_ID}-rest"
+if [ "$GREEN_BACKEND" != "$EXPECTED_GREEN_BACKEND" ]; then
+    echo "ERROR: Ingress backend should be '$EXPECTED_GREEN_BACKEND' but got 
'$GREEN_BACKEND'"
+    exit 1
+fi
+echo " Ingress correctly switched to Green deployment"
+
+# Verify annotations still present after transition
+REWRITE_ANNOTATION=$(kubectl get ingress $BG_CLUSTER_ID -n default -o 
jsonpath='{.metadata.annotations.nginx\.ingress\.kubernetes\.io/rewrite-target}')
+if [ "$REWRITE_ANNOTATION" != "/" ]; then
+    echo "ERROR: Annotations lost during transition"
+    exit 1
+fi
+echo " Ingress configuration preserved across transition"
+
+echo "Triggering Green→Blue transition to verify bidirectional switching..."
+kubectl patch flinkbgdep ${BG_CLUSTER_ID} --type merge --patch 
'{"spec":{"template":{"spec":{"flinkConfiguration":{"taskmanager.numberOfTaskSlots":"1"}}}}}'
+
+wait_for_jobmanager_running $BLUE_CLUSTER_ID $TIMEOUT
+wait_for_status $BLUE_APPLICATION_IDENTIFIER '.status.lifecycleState' STABLE 
${TIMEOUT} || exit 1
+kubectl wait --for=delete deployment --timeout=${TIMEOUT}s 
--selector="app=${GREEN_CLUSTER_ID}" || exit 1
+wait_for_status $APPLICATION_IDENTIFIER '.status.blueGreenState' ACTIVE_BLUE 
${TIMEOUT} || exit 1
+
+echo "Verifying ingress switched back to Blue..."
+BLUE_BACKEND=$(kubectl get ingress $BG_CLUSTER_ID -n default -o 
jsonpath='{.spec.rules[0].http.paths[0].backend.service.name}')
+if [ "$BLUE_BACKEND" != "$EXPECTED_BLUE_BACKEND" ]; then
+    echo "ERROR: Ingress backend should be '$EXPECTED_BLUE_BACKEND' on return 
but got '$BLUE_BACKEND'"
+    exit 1
+fi
+echo " Ingress correctly switched back to Blue"
+
+echo "Cleaning up..."
+kubectl delete flinkbluegreendeployments/$BG_CLUSTER_ID &
+kubectl wait --for=delete flinkbluegreendeployments/$BG_CLUSTER_ID 
--timeout=${TIMEOUT}s
+
+# Verify ingress is deleted with the deployment
+INGRESS_DELETED=$(kubectl get ingress $BG_CLUSTER_ID -n default 2>&1 || echo 
"NotFound")
+if [[ ! "$INGRESS_DELETED" =~ "NotFound" ]]; then
+    echo "ERROR: Ingress should be deleted with BlueGreen deployment"
+    exit 1
+fi
+echo " Ingress cleaned up correctly"
+
+echo "Successfully run the Blue/Green ingress rotation test"
diff --git 
a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/FlinkBlueGreenDeploymentSpec.java
 
b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/FlinkBlueGreenDeploymentSpec.java
index 704d3541..a515b0a9 100644
--- 
a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/FlinkBlueGreenDeploymentSpec.java
+++ 
b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/FlinkBlueGreenDeploymentSpec.java
@@ -25,6 +25,8 @@ import lombok.AllArgsConstructor;
 import lombok.Data;
 import lombok.NoArgsConstructor;
 
+import javax.annotation.Nullable;
+
 import java.util.Map;
 
 /** Spec that describes a Flink application with blue/green deployment 
capabilities. */
@@ -38,5 +40,7 @@ public class FlinkBlueGreenDeploymentSpec {
     @JsonProperty("configuration")
     private Map<String, String> configuration;
 
+    @Nullable private IngressSpec ingress;
+
     private FlinkDeploymentTemplateSpec template;
 }
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
index fdc856f2..6bdcb8f0 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
@@ -261,8 +261,8 @@ public class FlinkOperator {
                 
MetricManager.createFlinkBlueGreenDeploymentMetricManager(baseConfig, 
metricGroup);
         var statusRecorder =
                 StatusRecorder.createForFlinkBlueGreenDeployment(client, 
metricManager, listeners);
-        var controller = new FlinkBlueGreenDeploymentController(ctxFactory, 
statusRecorder);
-
+        var controller =
+                new FlinkBlueGreenDeploymentController(ctxFactory, 
configManager, statusRecorder);
         registeredControllers.add(operator.register(controller, 
this::overrideControllerConfigs));
     }
 
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkBlueGreenDeploymentController.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkBlueGreenDeploymentController.java
index bac6f131..a2257447 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkBlueGreenDeploymentController.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkBlueGreenDeploymentController.java
@@ -21,11 +21,13 @@ import 
org.apache.flink.kubernetes.operator.api.FlinkBlueGreenDeployment;
 import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
 import 
org.apache.flink.kubernetes.operator.api.status.FlinkBlueGreenDeploymentState;
 import 
org.apache.flink.kubernetes.operator.api.status.FlinkBlueGreenDeploymentStatus;
+import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
 import 
org.apache.flink.kubernetes.operator.controller.bluegreen.BlueGreenContext;
 import 
org.apache.flink.kubernetes.operator.controller.bluegreen.BlueGreenDeploymentService;
 import 
org.apache.flink.kubernetes.operator.controller.bluegreen.BlueGreenStateHandlerRegistry;
 import 
org.apache.flink.kubernetes.operator.controller.bluegreen.handlers.BlueGreenStateHandler;
 import 
org.apache.flink.kubernetes.operator.service.FlinkResourceContextFactory;
+import org.apache.flink.kubernetes.operator.utils.EventSourceUtils;
 import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
 
 import 
io.javaoperatorsdk.operator.api.config.informer.InformerEventSourceConfiguration;
@@ -71,15 +73,18 @@ public class FlinkBlueGreenDeploymentController implements 
Reconciler<FlinkBlueG
 
     private final FlinkResourceContextFactory ctxFactory;
     private final BlueGreenStateHandlerRegistry handlerRegistry;
+    private final FlinkConfigManager flinkConfigManager;
     private final StatusRecorder<FlinkBlueGreenDeployment, 
FlinkBlueGreenDeploymentStatus>
             statusRecorder;
 
     public FlinkBlueGreenDeploymentController(
             FlinkResourceContextFactory ctxFactory,
+            FlinkConfigManager flinkConfigManager,
             StatusRecorder<FlinkBlueGreenDeployment, 
FlinkBlueGreenDeploymentStatus>
                     statusRecorder) {
         this.ctxFactory = ctxFactory;
         this.handlerRegistry = new BlueGreenStateHandlerRegistry();
+        this.flinkConfigManager = flinkConfigManager;
         this.statusRecorder = statusRecorder;
     }
 
@@ -99,6 +104,9 @@ public class FlinkBlueGreenDeploymentController implements 
Reconciler<FlinkBlueG
 
         eventSources.add(new InformerEventSource<>(config, context));
 
+        if (flinkConfigManager.getOperatorConfiguration().isManageIngress()) {
+            
eventSources.add(EventSourceUtils.getBlueGreenIngressInformerEventSource(context));
+        }
         return eventSources;
     }
 
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/bluegreen/BlueGreenDeploymentService.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/bluegreen/BlueGreenDeploymentService.java
index 33cd868b..9543feca 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/bluegreen/BlueGreenDeploymentService.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/bluegreen/BlueGreenDeploymentService.java
@@ -30,6 +30,7 @@ import 
org.apache.flink.kubernetes.operator.api.status.SnapshotTriggerType;
 import 
org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
 import 
org.apache.flink.kubernetes.operator.controller.FlinkBlueGreenDeployments;
 import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext;
+import org.apache.flink.kubernetes.operator.utils.IngressUtils;
 import org.apache.flink.kubernetes.operator.utils.bluegreen.BlueGreenUtils;
 import org.apache.flink.util.Preconditions;
 
@@ -733,12 +734,84 @@ public class BlueGreenDeploymentService {
         context.getDeploymentStatus().setAbortTimestamp(millisToInstantStr(0));
         context.getDeploymentStatus().setSavepointTriggerId(null);
 
+        updateBlueGreenIngress(context, nextState);
+
         // Finalize status and reschedule immediately so any pending spec 
changes
         // (e.g., suspend requested during transition) are picked up on next 
reconcile
         return patchStatusUpdateControl(context, nextState, JobStatus.RUNNING, 
null)
                 .rescheduleAfter(0);
     }
 
+    /**
+     * Reconciles ingress for the active deployment in ACTIVE states. This 
handles ingress spec
+     * changes that occur while the deployment is stable (not transitioning).
+     *
+     * @param context the Blue/Green context
+     * @param activeDeploymentType which deployment (BLUE or GREEN) is 
currently active
+     */
+    public void reconcileIngressForActiveDeployment(
+            BlueGreenContext context, BlueGreenDeploymentType 
activeDeploymentType) {
+        FlinkDeployment activeDeployment = 
context.getDeploymentByType(activeDeploymentType);
+        if (activeDeployment == null) {
+            return;
+        }
+
+        var flinkResourceContext =
+                context.getCtxFactory()
+                        .getResourceContext(activeDeployment, 
context.getJosdkContext());
+
+        if (!flinkResourceContext.getOperatorConfig().isManageIngress()) {
+            return;
+        }
+
+        IngressUtils.reconcileBlueGreenIngress(
+                context,
+                true,
+                activeDeployment,
+                
flinkResourceContext.getDeployConfig(activeDeployment.getSpec()),
+                context.getJosdkContext());
+
+        LOG.info(
+                "Successfully reconciled ingress for active deployment: {}",
+                activeDeployment.getMetadata().getName());
+    }
+
+    /**
+     * Updates the ingress for Blue/Green deployment during transitions, 
pointing to the newly
+     * active deployment.
+     *
+     * @param blueGreenContext the Blue/Green context
+     * @param nextState which deployment (ACTIVE_BLUE or ACTIVE_GREEN) is 
becoming active
+     */
+    public void updateBlueGreenIngress(
+            BlueGreenContext blueGreenContext, FlinkBlueGreenDeploymentState 
nextState) {
+        FlinkDeployment activeDeployment;
+        switch (nextState) {
+            case ACTIVE_BLUE:
+                activeDeployment = blueGreenContext.getBlueDeployment();
+                break;
+            case ACTIVE_GREEN:
+                activeDeployment = blueGreenContext.getGreenDeployment();
+                break;
+            default:
+                LOG.info("Skipping ingress reconciliation for non-active 
state: {}", nextState);
+                return;
+        }
+
+        // Create a FlinkResourceContext for the active deployment to get 
proper config
+        var flinkResourceContext =
+                blueGreenContext
+                        .getCtxFactory()
+                        .getResourceContext(activeDeployment, 
blueGreenContext.getJosdkContext());
+
+        IngressUtils.reconcileBlueGreenIngress(
+                blueGreenContext,
+                flinkResourceContext.getOperatorConfig().isManageIngress(),
+                activeDeployment,
+                
flinkResourceContext.getDeployConfig(activeDeployment.getSpec()),
+                blueGreenContext.getJosdkContext());
+    }
+
     // ==================== Common Utility Methods ====================
 
     public static UpdateControl<FlinkBlueGreenDeployment> 
patchStatusUpdateControl(
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/bluegreen/handlers/ActiveStateHandler.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/bluegreen/handlers/ActiveStateHandler.java
index 176b94fb..06deadc3 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/bluegreen/handlers/ActiveStateHandler.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/bluegreen/handlers/ActiveStateHandler.java
@@ -37,6 +37,10 @@ public class ActiveStateHandler extends 
AbstractBlueGreenStateHandler {
     @Override
     public UpdateControl<FlinkBlueGreenDeployment> handle(BlueGreenContext 
context) {
         BlueGreenDeploymentType currentType = getCurrentDeploymentType();
+
+        // Reconcile ingress for the active deployment (handles spec changes)
+        deploymentService.reconcileIngressForActiveDeployment(context, 
currentType);
+
         return deploymentService.checkAndInitiateDeployment(context, 
currentType);
     }
 
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventSourceUtils.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventSourceUtils.java
index efcf2ab6..e3f22d47 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventSourceUtils.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventSourceUtils.java
@@ -19,6 +19,7 @@ package org.apache.flink.kubernetes.operator.utils;
 
 import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
 import org.apache.flink.kubernetes.operator.api.CrdConstants;
+import org.apache.flink.kubernetes.operator.api.FlinkBlueGreenDeployment;
 import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.api.FlinkSessionJob;
 import org.apache.flink.kubernetes.operator.api.FlinkStateSnapshot;
@@ -129,6 +130,29 @@ public class EventSourceUtils {
         return new InformerEventSource<>(configuration, context);
     }
 
+    public static InformerEventSource<?, FlinkBlueGreenDeployment>
+            getBlueGreenIngressInformerEventSource(
+                    EventSourceContext<FlinkBlueGreenDeployment> context) {
+        final String labelSelector =
+                Map.of(Constants.LABEL_COMPONENT_KEY, 
LABEL_COMPONENT_INGRESS).entrySet().stream()
+                        .map(Object::toString)
+                        .collect(Collectors.joining(","));
+
+        var ingressClass =
+                ingressInNetworkingV1(context.getClient())
+                        ? Ingress.class
+                        : 
io.fabric8.kubernetes.api.model.networking.v1beta1.Ingress.class;
+
+        var configuration =
+                InformerEventSourceConfiguration.from(ingressClass, 
FlinkBlueGreenDeployment.class)
+                        .withLabelSelector(labelSelector)
+                        .withNamespacesInheritedFromController()
+                        .withFollowControllerNamespacesChanges(true)
+                        .build();
+
+        return new InformerEventSource<>(configuration, context);
+    }
+
     public static InformerEventSource<FlinkSessionJob, FlinkDeployment>
             
getSessionJobInformerEventSource(EventSourceContext<FlinkDeployment> context) {
 
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/IngressUtils.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/IngressUtils.java
index 722270a1..74339578 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/IngressUtils.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/IngressUtils.java
@@ -19,8 +19,13 @@ package org.apache.flink.kubernetes.operator.utils;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.kubernetes.operator.api.FlinkBlueGreenDeployment;
+import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
+import 
org.apache.flink.kubernetes.operator.api.spec.FlinkBlueGreenDeploymentSpec;
 import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec;
+import org.apache.flink.kubernetes.operator.api.spec.IngressSpec;
 import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext;
+import 
org.apache.flink.kubernetes.operator.controller.bluegreen.BlueGreenContext;
 import org.apache.flink.kubernetes.operator.exception.ReconciliationException;
 import org.apache.flink.kubernetes.utils.Constants;
 import org.apache.flink.util.Preconditions;
@@ -38,6 +43,7 @@ import 
io.fabric8.kubernetes.api.model.networking.v1beta1.Ingress;
 import io.fabric8.kubernetes.api.model.networking.v1beta1.IngressTLS;
 import io.fabric8.kubernetes.client.KubernetesClient;
 import io.fabric8.kubernetes.client.dsl.NonDeletingOperation;
+import io.javaoperatorsdk.operator.api.reconciler.Context;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
@@ -95,7 +101,8 @@ public class IngressUtils {
         }
         var objectMeta = ctx.getResource().getMetadata();
         if (spec.getIngress() != null) {
-            HasMetadata ingress = getIngress(objectMeta, spec, 
effectiveConfig, client);
+            HasMetadata ingress =
+                    getIngress(objectMeta, spec.getIngress(), effectiveConfig, 
client);
             setOwnerReference(ctx.getResource(), 
Collections.singletonList(ingress));
             LOG.info("Updating ingress rules {}", ingress);
             client.resource(ingress)
@@ -116,33 +123,84 @@ public class IngressUtils {
         }
     }
 
+    public static void reconcileBlueGreenIngress(
+            BlueGreenContext context,
+            boolean operatorManagedIngress,
+            FlinkDeployment targetFlinkDeployment,
+            Configuration effectiveConfig,
+            Context<FlinkBlueGreenDeployment> client) {
+        if (!operatorManagedIngress) {
+            LOG.info("Skipping rerouting of of Blue Green Ingress, not 
operator managed ingress ");
+            return;
+        }
+        FlinkBlueGreenDeploymentSpec flinkBlueGreenDeploymentSpec =
+                context.getBgDeployment().getSpec();
+        var objectMeta = context.getBgDeployment().getMetadata();
+        if (flinkBlueGreenDeploymentSpec.getIngress() != null) {
+            HasMetadata ingress =
+                    getIngress(
+                            objectMeta,
+                            flinkBlueGreenDeploymentSpec.getIngress(),
+                            effectiveConfig,
+                            client.getClient(),
+                            targetFlinkDeployment.getMetadata().getName() + 
REST_SVC_NAME_SUFFIX);
+            setOwnerReference(context.getBgDeployment(), 
Collections.singletonList(ingress));
+            client.getClient()
+                    .resource(ingress)
+                    .inNamespace(objectMeta.getNamespace())
+                    .createOr(NonDeletingOperation::update);
+        } else {
+            Optional<? extends HasMetadata> ingress;
+            if (ingressInNetworkingV1(client.getClient())) {
+                ingress =
+                        client.getSecondaryResource(
+                                
io.fabric8.kubernetes.api.model.networking.v1.Ingress.class);
+            } else {
+                ingress = client.getSecondaryResource(Ingress.class);
+            }
+            ingress.ifPresent(i -> client.getClient().resource(i).delete());
+        }
+    }
+
     private static HasMetadata getIngress(
             ObjectMeta objectMeta,
-            FlinkDeploymentSpec spec,
+            IngressSpec spec,
             Configuration effectiveConfig,
             KubernetesClient client) {
+        return getIngress(
+                objectMeta,
+                spec,
+                effectiveConfig,
+                client,
+                objectMeta.getName() + REST_SVC_NAME_SUFFIX);
+    }
+
+    private static HasMetadata getIngress(
+            ObjectMeta objectMeta,
+            IngressSpec spec,
+            Configuration effectiveConfig,
+            KubernetesClient client,
+            String serviceName) {
         Map<String, String> labels =
-                spec.getIngress().getLabels() == null
-                        ? new HashMap<>()
-                        : new HashMap<>(spec.getIngress().getLabels());
+                spec.getLabels() == null ? new HashMap<>() : new 
HashMap<>(spec.getLabels());
         labels.put(Constants.LABEL_COMPONENT_KEY, LABEL_COMPONENT_INGRESS);
         if (ingressInNetworkingV1(client)) {
             return new IngressBuilder()
                     .withNewMetadata()
                     .withLabels(labels)
-                    .withAnnotations(spec.getIngress().getAnnotations())
+                    .withAnnotations(spec.getAnnotations())
                     .withName(objectMeta.getName())
                     .withNamespace(objectMeta.getNamespace())
                     .endMetadata()
                     .withNewSpec()
-                    .withIngressClassName(spec.getIngress().getClassName())
-                    .withTls(spec.getIngress().getTls())
-                    .withRules(getIngressRule(objectMeta, spec, 
effectiveConfig))
+                    .withIngressClassName(spec.getClassName())
+                    .withTls(spec.getTls())
+                    .withRules(getIngressRule(objectMeta, spec, 
effectiveConfig, serviceName))
                     .endSpec()
                     .build();
         } else {
             List<IngressTLS> ingressTLS =
-                    Optional.ofNullable(spec.getIngress().getTls())
+                    Optional.ofNullable(spec.getTls())
                             .map(
                                     list ->
                                             list.stream()
@@ -160,30 +218,31 @@ public class IngressUtils {
                             .orElse(Collections.emptyList());
             return new io.fabric8.kubernetes.api.model.networking.v1beta1.IngressBuilder()
                     .withNewMetadata()
-                    .withAnnotations(spec.getIngress().getAnnotations())
+                    .withAnnotations(spec.getAnnotations())
                     .withLabels(labels)
                     .withName(objectMeta.getName())
                     .withNamespace(objectMeta.getNamespace())
                     .endMetadata()
                     .withNewSpec()
-                    .withIngressClassName(spec.getIngress().getClassName())
+                    .withIngressClassName(spec.getClassName())
                     .withTls(ingressTLS)
-                    .withRules(getIngressRuleForV1beta1(objectMeta, spec, effectiveConfig))
+                    .withRules(
+                            getIngressRuleForV1beta1(
+                                    objectMeta, spec, effectiveConfig, serviceName))
                     .endSpec()
                     .build();
         }
     }
 
     private static IngressRule getIngressRule(
-            ObjectMeta objectMeta, FlinkDeploymentSpec spec, Configuration effectiveConfig) {
-        final String clusterId = objectMeta.getName();
+            ObjectMeta objectMeta,
+            IngressSpec spec,
+            Configuration effectiveConfig,
+            String serviceName) {
         final int restPort = effectiveConfig.getInteger(RestOptions.PORT);
 
         URL ingressUrl =
-                getIngressUrl(
-                        spec.getIngress().getTemplate(),
-                        objectMeta.getName(),
-                        objectMeta.getNamespace());
+                getIngressUrl(spec.getTemplate(), objectMeta.getName(), objectMeta.getNamespace());
 
         IngressRuleBuilder ingressRuleBuilder = new IngressRuleBuilder();
         ingressRuleBuilder.withHttp(
@@ -192,7 +251,7 @@ public class IngressUtils {
                         .withPathType("ImplementationSpecific")
                         .withNewBackend()
                         .withNewService()
-                        .withName(clusterId + REST_SVC_NAME_SUFFIX)
+                        .withName(serviceName)
                         .withNewPort()
                         .withNumber(restPort)
                         .endPort()
@@ -219,16 +278,13 @@ public class IngressUtils {
     private static io.fabric8.kubernetes.api.model.networking.v1beta1.IngressRule
             getIngressRuleForV1beta1(
                     ObjectMeta objectMeta,
-                    FlinkDeploymentSpec spec,
-                    Configuration effectiveConfig) {
-        final String clusterId = objectMeta.getName();
+                    IngressSpec spec,
+                    Configuration effectiveConfig,
+                    String serviceName) {
         final int restPort = effectiveConfig.getInteger(RestOptions.PORT);
 
         URL ingressUrl =
-                getIngressUrl(
-                        spec.getIngress().getTemplate(),
-                        objectMeta.getName(),
-                        objectMeta.getNamespace());
+                getIngressUrl(spec.getTemplate(), objectMeta.getName(), objectMeta.getNamespace());
 
         io.fabric8.kubernetes.api.model.networking.v1beta1.IngressRuleBuilder ingressRuleBuilder =
                 new io.fabric8.kubernetes.api.model.networking.v1beta1.IngressRuleBuilder();
@@ -236,7 +292,7 @@ public class IngressUtils {
                 new io.fabric8.kubernetes.api.model.networking.v1beta1.HTTPIngressRuleValueBuilder()
                         .addNewPath()
                         .withNewBackend()
-                        .withServiceName(clusterId + REST_SVC_NAME_SUFFIX)
+                        .withServiceName(serviceName)
                         .withServicePort(new IntOrString(restPort))
                         .endBackend()
                         .endPath()
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/bluegreen/BlueGreenUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/bluegreen/BlueGreenUtils.java
index 27195908..af3ccbd6 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/bluegreen/BlueGreenUtils.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/bluegreen/BlueGreenUtils.java
@@ -26,6 +26,7 @@ import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.api.bluegreen.BlueGreenDeploymentType;
 import org.apache.flink.kubernetes.operator.api.bluegreen.BlueGreenDiffType;
 import org.apache.flink.kubernetes.operator.api.spec.FlinkBlueGreenDeploymentSpec;
+import org.apache.flink.kubernetes.operator.api.spec.IngressSpec;
 import org.apache.flink.kubernetes.operator.api.spec.KubernetesDeploymentMode;
 import org.apache.flink.kubernetes.operator.api.spec.UpgradeMode;
 import org.apache.flink.kubernetes.operator.api.status.FlinkBlueGreenDeploymentStatus;
@@ -365,6 +366,13 @@ public class BlueGreenUtils {
 
         flinkDeployment.setSpec(spec.getTemplate().getSpec());
 
+        // Update Ingress template if exists to prevent path collision between Blue and Green
+        IngressSpec ingress = flinkDeployment.getSpec().getIngress();
+        if (ingress != null) {
+            ingress.setTemplate(
+                    blueGreenDeploymentType.name().toLowerCase() + "-" + ingress.getTemplate());
+        }
+
         // Deployment metadata
         ObjectMeta flinkDeploymentMeta = getDependentObjectMeta(context.getBgDeployment());
         flinkDeploymentMeta.setName(childDeploymentName);
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkBlueGreenDeploymentControllerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkBlueGreenDeploymentControllerTest.java
index bc571c5d..528b6172 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkBlueGreenDeploymentControllerTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkBlueGreenDeploymentControllerTest.java
@@ -30,6 +30,7 @@ import org.apache.flink.kubernetes.operator.api.spec.FlinkBlueGreenDeploymentSpe
 import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec;
 import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentTemplateSpec;
 import org.apache.flink.kubernetes.operator.api.spec.FlinkVersion;
+import org.apache.flink.kubernetes.operator.api.spec.IngressSpec;
 import org.apache.flink.kubernetes.operator.api.spec.JobManagerSpec;
 import org.apache.flink.kubernetes.operator.api.spec.JobSpec;
 import org.apache.flink.kubernetes.operator.api.spec.JobState;
@@ -40,9 +41,11 @@ import org.apache.flink.kubernetes.operator.api.status.FlinkBlueGreenDeploymentS
 import org.apache.flink.kubernetes.operator.api.status.ReconciliationState;
 import org.apache.flink.kubernetes.operator.api.utils.SpecUtils;
 import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
+import org.apache.flink.kubernetes.operator.utils.IngressUtils;
 import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
 
 import io.fabric8.kubernetes.api.model.ObjectMetaBuilder;
+import io.fabric8.kubernetes.api.model.networking.v1.Ingress;
 import io.fabric8.kubernetes.client.KubernetesClient;
 import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
 import io.javaoperatorsdk.operator.api.reconciler.Context;
@@ -90,6 +93,9 @@ public class FlinkBlueGreenDeploymentControllerTest {
     private static final int MINIMUM_ABORT_GRACE_PERIOD = 1000;
     private static final String TEST_CHECKPOINT_PATH = "/tmp/checkpoints";
     private static final String TEST_INITIAL_SAVEPOINT_PATH = "/tmp/savepoints";
+    private static final String BLUE_CLUSTER_ID = TEST_DEPLOYMENT_NAME + "-blue";
+    private static final String GREEN_CLUSTER_ID = TEST_DEPLOYMENT_NAME + "-green";
+    private static final String REST_SVC_NAME_SUFFIX = "-rest";
     private final FlinkConfigManager configManager = new FlinkConfigManager(new Configuration());
     private TestingFlinkService flinkService;
     private Context<FlinkBlueGreenDeployment> context;
@@ -1531,6 +1537,145 @@ public class FlinkBlueGreenDeploymentControllerTest {
         var flinkDeploymentTemplateSpec =
                 FlinkDeploymentTemplateSpec.builder().spec(flinkDeploymentSpec).build();
 
-        return new FlinkBlueGreenDeploymentSpec(configuration, flinkDeploymentTemplateSpec);
+        return new FlinkBlueGreenDeploymentSpec(configuration, null, flinkDeploymentTemplateSpec);
+    }
+
+    // ==================== Ingress Helper Methods ====================
+
+    private void assertIngressPointsToService(String expectedServiceName) {
+        if (IngressUtils.ingressInNetworkingV1(kubernetesClient)) {
+            Ingress ingress = getIngressV1(TEST_DEPLOYMENT_NAME, TEST_NAMESPACE);
+            assertNotNull(ingress);
+            assertEquals(
+                    expectedServiceName,
+                    ingress.getSpec()
+                            .getRules()
+                            .get(0)
+                            .getHttp()
+                            .getPaths()
+                            .get(0)
+                            .getBackend()
+                            .getService()
+                            .getName());
+        } else {
+            io.fabric8.kubernetes.api.model.networking.v1beta1.Ingress ingressV1beta1 =
+                    getIngressV1beta1(TEST_DEPLOYMENT_NAME, TEST_NAMESPACE);
+            assertNotNull(ingressV1beta1);
+            assertEquals(
+                    expectedServiceName,
+                    ingressV1beta1
+                            .getSpec()
+                            .getRules()
+                            .get(0)
+                            .getHttp()
+                            .getPaths()
+                            .get(0)
+                            .getBackend()
+                            .getServiceName());
+        }
+    }
+
+    private void assertIngressDoesNotExist() {
+        if (IngressUtils.ingressInNetworkingV1(kubernetesClient)) {
+            assertNull(getIngressV1(TEST_DEPLOYMENT_NAME, TEST_NAMESPACE));
+        } else {
+            assertNull(getIngressV1beta1(TEST_DEPLOYMENT_NAME, TEST_NAMESPACE));
+        }
+    }
+
+    private Ingress getIngressV1(String name, String namespace) {
+        return kubernetesClient
+                .network()
+                .v1()
+                .ingresses()
+                .inNamespace(namespace)
+                .withName(name)
+                .get();
+    }
+
+    private io.fabric8.kubernetes.api.model.networking.v1beta1.Ingress getIngressV1beta1(
+            String name, String namespace) {
+        return kubernetesClient
+                .network()
+                .v1beta1()
+                .ingresses()
+                .inNamespace(namespace)
+                .withName(name)
+                .get();
+    }
+
+    // ==================== Ingress Rotation Tests ====================
+
+    @ParameterizedTest
+    @MethodSource("org.apache.flink.kubernetes.operator.TestUtils#flinkVersions")
+    public void verifyIngressSwitchesDuringTransition(FlinkVersion flinkVersion) throws Exception {
+        // Build deployment with ingress spec
+        var blueGreenDeployment =
+                buildSessionCluster(
+                        TEST_DEPLOYMENT_NAME,
+                        TEST_NAMESPACE,
+                        flinkVersion,
+                        null,
+                        UpgradeMode.STATELESS);
+
+        IngressSpec ingressSpec =
+                IngressSpec.builder()
+                        .template("{{name}}.{{namespace}}.example.com")
+                        .className("nginx")
+                        .annotations(Map.of("nginx.ingress.kubernetes.io/rewrite-target", "/"))
+                        .build();
+        blueGreenDeployment.getSpec().setIngress(ingressSpec);
+
+        // 1. Deploy Blue with ingress
+        var rs = executeBasicDeployment(flinkVersion, blueGreenDeployment, false, null);
+
+        // 2. Verify ingress points to Blue
+        assertIngressPointsToService(BLUE_CLUSTER_ID + REST_SVC_NAME_SUFFIX);
+
+        // 3. Trigger transition to Green
+        String customValue = UUID.randomUUID().toString();
+        simulateChangeInSpec(rs.deployment, customValue, ALT_DELETION_DELAY_VALUE, null);
+
+        // Transition to Green
+        testTransitionToGreen(rs, customValue, null);
+
+        // 4. Verify ingress now points to Green
+        assertIngressPointsToService(GREEN_CLUSTER_ID + REST_SVC_NAME_SUFFIX);
+    }
+
+    @ParameterizedTest
+    @MethodSource("org.apache.flink.kubernetes.operator.TestUtils#flinkVersions")
+    public void verifyIngressCreatedOnlyWhenConfigured(FlinkVersion flinkVersion) throws Exception {
+        // 1. Deploy Blue without ingress spec
+        var blueGreenDeployment =
+                buildSessionCluster(
+                        TEST_DEPLOYMENT_NAME,
+                        TEST_NAMESPACE,
+                        flinkVersion,
+                        null,
+                        UpgradeMode.STATELESS);
+
+        var rs = executeBasicDeployment(flinkVersion, blueGreenDeployment, false, null);
+
+        // 2. Verify no ingress created initially
+        assertIngressDoesNotExist();
+
+        // 3. Add ingress spec and trigger transition to Green
+        IngressSpec ingressSpec =
+                IngressSpec.builder()
+                        .template("{{name}}.{{namespace}}.example.com")
+                        .className("nginx")
+                        .build();
+        rs.deployment.getSpec().setIngress(ingressSpec);
+        kubernetesClient.resource(rs.deployment).createOrReplace();
+
+        String customValue = UUID.randomUUID().toString();
+        simulateChangeInSpec(rs.deployment, customValue, ALT_DELETION_DELAY_VALUE, null);
+
+        // Complete transition to Green (this reconciles ingress)
+        testTransitionToGreen(rs, customValue, null);
+
+        // 4. Verify ingress created and points to Green after transition completes
+        assertIngressPointsToService(GREEN_CLUSTER_ID + REST_SVC_NAME_SUFFIX);
     }
 }
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkBlueGreenDeploymentController.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkBlueGreenDeploymentController.java
index 5bddcf4d..da7bedea 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkBlueGreenDeploymentController.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkBlueGreenDeploymentController.java
@@ -56,7 +56,8 @@ public class TestingFlinkBlueGreenDeploymentController
                 new StatusRecorder<>(new MetricManager<>(), (resource, status) -> {});
 
         flinkBlueGreenDeploymentController =
-                new FlinkBlueGreenDeploymentController(contextFactory, statusRecorder);
+                new FlinkBlueGreenDeploymentController(
+                        contextFactory, configManager, statusRecorder);
     }
 
     @Override
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/FlinkBlueGreenDeploymentMetricsTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/FlinkBlueGreenDeploymentMetricsTest.java
index 10469677..76ff973a 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/FlinkBlueGreenDeploymentMetricsTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/FlinkBlueGreenDeploymentMetricsTest.java
@@ -466,6 +466,7 @@ public class FlinkBlueGreenDeploymentMetricsTest {
         var bgDeploymentSpec =
                 new FlinkBlueGreenDeploymentSpec(
                         new HashMap<>(),
+                        null,
                         FlinkDeploymentTemplateSpec.builder().spec(flinkDeploymentSpec).build());
 
         deployment.setSpec(bgDeploymentSpec);
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/BlueGreenLifecycleMetricsTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/BlueGreenLifecycleMetricsTest.java
index ce0af71c..79add57f 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/BlueGreenLifecycleMetricsTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/BlueGreenLifecycleMetricsTest.java
@@ -179,6 +179,7 @@ public class BlueGreenLifecycleMetricsTest {
         var bgDeploymentSpec =
                 new FlinkBlueGreenDeploymentSpec(
                         new HashMap<>(),
+                        null,
                         FlinkDeploymentTemplateSpec.builder().spec(flinkDeploymentSpec).build());
 
         deployment.setSpec(bgDeploymentSpec);
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/bluegreen/BlueGreenUtilsTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/bluegreen/BlueGreenUtilsTest.java
index a898662e..d7e73510 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/bluegreen/BlueGreenUtilsTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/bluegreen/BlueGreenUtilsTest.java
@@ -243,6 +243,7 @@ public class BlueGreenUtilsTest {
         var bgDeploymentSpec =
                 new FlinkBlueGreenDeploymentSpec(
                         new HashMap<>(),
+                        null,
                         FlinkDeploymentTemplateSpec.builder().spec(flinkDeploymentSpec).build());
 
         deployment.setSpec(bgDeploymentSpec);
diff --git a/helm/flink-kubernetes-operator/crds/flinkbluegreendeployments.flink.apache.org-v1.yml b/helm/flink-kubernetes-operator/crds/flinkbluegreendeployments.flink.apache.org-v1.yml
index d31d14aa..0f8ef636 100644
--- a/helm/flink-kubernetes-operator/crds/flinkbluegreendeployments.flink.apache.org-v1.yml
+++ b/helm/flink-kubernetes-operator/crds/flinkbluegreendeployments.flink.apache.org-v1.yml
@@ -24,6 +24,32 @@ spec:
         properties:
           spec:
             properties:
+              ingress:
+                properties:
+                  annotations:
+                    additionalProperties:
+                      type: string
+                    type: object
+                  className:
+                    type: string
+                  labels:
+                    additionalProperties:
+                      type: string
+                    type: object
+                  template:
+                    type: string
+                  tls:
+                    items:
+                      properties:
+                        hosts:
+                          items:
+                            type: string
+                          type: array
+                        secretName:
+                          type: string
+                      type: object
+                    type: array
+                type: object
               configuration:
                 additionalProperties:
                   type: "string"

Reply via email to