This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new a31aeac0 [FLINK-38577] Added BlueGreen ingress that switches between
active Svc + resolve path conflict on Blue and Green deployment ingresses
a31aeac0 is described below
commit a31aeac050ad3835da3ffbfbf2d2e9e53ce8afae
Author: Daniel Rossos <[email protected]>
AuthorDate: Mon Feb 2 02:28:13 2026 -0500
[FLINK-38577] Added BlueGreen ingress that switches between active Svc +
resolve path conflict on Blue and Green deployment ingresses
---
docs/content/docs/custom-resource/reference.md | 1 +
e2e-tests/data/bluegreen-ingress.yaml | 58 ++++++++
e2e-tests/test_bluegreen_ingress_rotation.sh | 133 +++++++++++++++++++
.../api/spec/FlinkBlueGreenDeploymentSpec.java | 4 +
.../flink/kubernetes/operator/FlinkOperator.java | 4 +-
.../FlinkBlueGreenDeploymentController.java | 8 ++
.../bluegreen/BlueGreenDeploymentService.java | 73 ++++++++++
.../bluegreen/handlers/ActiveStateHandler.java | 4 +
.../operator/utils/EventSourceUtils.java | 24 ++++
.../kubernetes/operator/utils/IngressUtils.java | 112 ++++++++++++----
.../operator/utils/bluegreen/BlueGreenUtils.java | 8 ++
.../FlinkBlueGreenDeploymentControllerTest.java | 147 ++++++++++++++++++++-
.../TestingFlinkBlueGreenDeploymentController.java | 3 +-
.../FlinkBlueGreenDeploymentMetricsTest.java | 1 +
.../lifecycle/BlueGreenLifecycleMetricsTest.java | 1 +
.../utils/bluegreen/BlueGreenUtilsTest.java | 1 +
...inkbluegreendeployments.flink.apache.org-v1.yml | 26 ++++
17 files changed, 576 insertions(+), 32 deletions(-)
diff --git a/docs/content/docs/custom-resource/reference.md
b/docs/content/docs/custom-resource/reference.md
index 003b6495..98d18e7d 100644
--- a/docs/content/docs/custom-resource/reference.md
+++ b/docs/content/docs/custom-resource/reference.md
@@ -89,6 +89,7 @@ This serves as a full reference for FlinkDeployment and
FlinkSessionJob custom r
| Parameter | Type | Docs |
| ----------| ---- | ---- |
| configuration | java.util.Map<java.lang.String,java.lang.String> | |
+| ingress | org.apache.flink.kubernetes.operator.api.spec.IngressSpec | |
| template |
org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentTemplateSpec | |
### FlinkDeploymentSpec
diff --git a/e2e-tests/data/bluegreen-ingress.yaml
b/e2e-tests/data/bluegreen-ingress.yaml
new file mode 100644
index 00000000..4a7b143e
--- /dev/null
+++ b/e2e-tests/data/bluegreen-ingress.yaml
@@ -0,0 +1,58 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+apiVersion: flink.apache.org/v1beta1
+kind: FlinkBlueGreenDeployment
+metadata:
+ name: bg-ingress-test
+spec:
+ configuration:
+ kubernetes.operator.bluegreen.deployment-deletion.delay: "2s"
+ # Parent-level ingress configuration
+ ingress:
+ template: "{{name}}.{{namespace}}.example.com"
+ className: "nginx"
+ annotations:
+ nginx.ingress.kubernetes.io/rewrite-target: "/"
+ template:
+ spec:
+ image: flink:1.20
+ flinkVersion: v1_20
+ flinkConfiguration:
+ rest.port: "8081"
+ taskmanager.numberOfTaskSlots: "1"
+ serviceAccount: flink
+ jobManager:
+ resource:
+ memory: 1G
+ cpu: 1
+ taskManager:
+ resource:
+ memory: 2G
+ cpu: 1
+ job:
+ jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
+ parallelism: 1
+ entryClass:
org.apache.flink.streaming.examples.statemachine.StateMachineExample
+ args:
+ - "--error-rate"
+ - "0.15"
+ - "--sleep"
+ - "30"
+ upgradeMode: stateless
+ mode: native
diff --git a/e2e-tests/test_bluegreen_ingress_rotation.sh
b/e2e-tests/test_bluegreen_ingress_rotation.sh
new file mode 100755
index 00000000..6c5c6123
--- /dev/null
+++ b/e2e-tests/test_bluegreen_ingress_rotation.sh
@@ -0,0 +1,133 @@
+#!/usr/bin/env bash
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# This script tests the Blue/Green ingress rotation functionality:
+# - Create a FlinkBlueGreenDeployment with parent-level ingress spec
+# - Verify ingress is created and points to Blue deployment
+# - Trigger a transition to Green
+# - Verify ingress switches to point to Green deployment
+# - Verify Blue deployment is deleted but ingress remains
+# - Verify ingress configuration is preserved across transitions
+
+SCRIPT_DIR=$(dirname "$(readlink -f "$0")")
+source "${SCRIPT_DIR}/utils.sh"
+
+CLUSTER_ID="bg-ingress-test"
+BG_CLUSTER_ID=$CLUSTER_ID
+BLUE_CLUSTER_ID=$CLUSTER_ID"-blue"
+GREEN_CLUSTER_ID=$CLUSTER_ID"-green"
+
+APPLICATION_YAML="${SCRIPT_DIR}/data/bluegreen-ingress.yaml"
+APPLICATION_IDENTIFIER="flinkbgdep/$CLUSTER_ID"
+BLUE_APPLICATION_IDENTIFIER="flinkdep/$BLUE_CLUSTER_ID"
+GREEN_APPLICATION_IDENTIFIER="flinkdep/$GREEN_CLUSTER_ID"
+TIMEOUT=300
+
+echo "Deploying BlueGreen deployment with ingress..."
+retry_times 5 30 "kubectl apply -f $APPLICATION_YAML" || exit 1
+
+sleep 1
+wait_for_jobmanager_running $BLUE_CLUSTER_ID $TIMEOUT
+wait_for_status $BLUE_APPLICATION_IDENTIFIER '.status.lifecycleState' STABLE
${TIMEOUT} || exit 1
+wait_for_status $APPLICATION_IDENTIFIER '.status.jobStatus.state' RUNNING
${TIMEOUT} || exit 1
+wait_for_status $APPLICATION_IDENTIFIER '.status.blueGreenState' ACTIVE_BLUE
${TIMEOUT} || exit 1
+
+echo "Verifying ingress created and points to Blue..."
+kubectl get ingress $BG_CLUSTER_ID -n default || exit 1
+
+# Check ingress backend points to Blue's REST service
+BLUE_BACKEND=$(kubectl get ingress $BG_CLUSTER_ID -n default -o
jsonpath='{.spec.rules[0].http.paths[0].backend.service.name}')
+EXPECTED_BLUE_BACKEND="${BLUE_CLUSTER_ID}-rest"
+if [ "$BLUE_BACKEND" != "$EXPECTED_BLUE_BACKEND" ]; then
+ echo "ERROR: Ingress backend should be '$EXPECTED_BLUE_BACKEND' but got
'$BLUE_BACKEND'"
+ exit 1
+fi
+echo " Ingress correctly points to Blue deployment"
+
+# Verify ingress annotations
+REWRITE_ANNOTATION=$(kubectl get ingress $BG_CLUSTER_ID -n default -o
jsonpath='{.metadata.annotations.nginx\.ingress\.kubernetes\.io/rewrite-target}')
+if [ "$REWRITE_ANNOTATION" != "/" ]; then
+ echo "ERROR: Expected rewrite annotation '/' but got '$REWRITE_ANNOTATION'"
+ exit 1
+fi
+echo " Ingress annotations preserved"
+
+echo "Triggering Blue→Green transition..."
+kubectl patch flinkbgdep ${BG_CLUSTER_ID} --type merge --patch
'{"spec":{"template":{"spec":{"flinkConfiguration":{"taskmanager.numberOfTaskSlots":"2"}}}}}'
+
+# Wait for Green to be ready
+wait_for_jobmanager_running $GREEN_CLUSTER_ID $TIMEOUT
+wait_for_status $GREEN_APPLICATION_IDENTIFIER '.status.lifecycleState' STABLE
${TIMEOUT} || exit 1
+
+echo "Waiting for Blue deletion..."
+kubectl wait --for=delete deployment --timeout=${TIMEOUT}s
--selector="app=${BLUE_CLUSTER_ID}" || exit 1
+
+wait_for_status $APPLICATION_IDENTIFIER '.status.jobStatus.state' RUNNING
${TIMEOUT} || exit 1
+wait_for_status $APPLICATION_IDENTIFIER '.status.blueGreenState' ACTIVE_GREEN
${TIMEOUT} || exit 1
+
+echo "Verifying ingress switched to Green..."
+# Ingress should still exist
+kubectl get ingress $BG_CLUSTER_ID -n default || exit 1
+
+# Check ingress backend now points to Green's REST service
+GREEN_BACKEND=$(kubectl get ingress $BG_CLUSTER_ID -n default -o
jsonpath='{.spec.rules[0].http.paths[0].backend.service.name}')
+EXPECTED_GREEN_BACKEND="${GREEN_CLUSTER_ID}-rest"
+if [ "$GREEN_BACKEND" != "$EXPECTED_GREEN_BACKEND" ]; then
+ echo "ERROR: Ingress backend should be '$EXPECTED_GREEN_BACKEND' but got
'$GREEN_BACKEND'"
+ exit 1
+fi
+echo " Ingress correctly switched to Green deployment"
+
+# Verify annotations still present after transition
+REWRITE_ANNOTATION=$(kubectl get ingress $BG_CLUSTER_ID -n default -o
jsonpath='{.metadata.annotations.nginx\.ingress\.kubernetes\.io/rewrite-target}')
+if [ "$REWRITE_ANNOTATION" != "/" ]; then
+ echo "ERROR: Annotations lost during transition"
+ exit 1
+fi
+echo " Ingress configuration preserved across transition"
+
+echo "Triggering Green→Blue transition to verify bidirectional switching..."
+kubectl patch flinkbgdep ${BG_CLUSTER_ID} --type merge --patch
'{"spec":{"template":{"spec":{"flinkConfiguration":{"taskmanager.numberOfTaskSlots":"1"}}}}}'
+
+wait_for_jobmanager_running $BLUE_CLUSTER_ID $TIMEOUT
+wait_for_status $BLUE_APPLICATION_IDENTIFIER '.status.lifecycleState' STABLE
${TIMEOUT} || exit 1
+kubectl wait --for=delete deployment --timeout=${TIMEOUT}s
--selector="app=${GREEN_CLUSTER_ID}" || exit 1
+wait_for_status $APPLICATION_IDENTIFIER '.status.blueGreenState' ACTIVE_BLUE
${TIMEOUT} || exit 1
+
+echo "Verifying ingress switched back to Blue..."
+BLUE_BACKEND=$(kubectl get ingress $BG_CLUSTER_ID -n default -o
jsonpath='{.spec.rules[0].http.paths[0].backend.service.name}')
+if [ "$BLUE_BACKEND" != "$EXPECTED_BLUE_BACKEND" ]; then
+ echo "ERROR: Ingress backend should be '$EXPECTED_BLUE_BACKEND' on return
but got '$BLUE_BACKEND'"
+ exit 1
+fi
+echo " Ingress correctly switched back to Blue"
+
+echo "Cleaning up..."
+kubectl delete flinkbluegreendeployments/$BG_CLUSTER_ID &
+kubectl wait --for=delete flinkbluegreendeployments/$BG_CLUSTER_ID
--timeout=${TIMEOUT}s
+
+# Verify ingress is deleted with the deployment
+INGRESS_DELETED=$(kubectl get ingress $BG_CLUSTER_ID -n default 2>&1 || echo
"NotFound")
+if [[ ! "$INGRESS_DELETED" =~ "NotFound" ]]; then
+ echo "ERROR: Ingress should be deleted with BlueGreen deployment"
+ exit 1
+fi
+echo " Ingress cleaned up correctly"
+
+echo "Successfully run the Blue/Green ingress rotation test"
diff --git
a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/FlinkBlueGreenDeploymentSpec.java
b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/FlinkBlueGreenDeploymentSpec.java
index 704d3541..a515b0a9 100644
---
a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/FlinkBlueGreenDeploymentSpec.java
+++
b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/FlinkBlueGreenDeploymentSpec.java
@@ -25,6 +25,8 @@ import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
+import javax.annotation.Nullable;
+
import java.util.Map;
/** Spec that describes a Flink application with blue/green deployment
capabilities. */
@@ -38,5 +40,7 @@ public class FlinkBlueGreenDeploymentSpec {
@JsonProperty("configuration")
private Map<String, String> configuration;
+ @Nullable private IngressSpec ingress;
+
private FlinkDeploymentTemplateSpec template;
}
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
index fdc856f2..6bdcb8f0 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
@@ -261,8 +261,8 @@ public class FlinkOperator {
MetricManager.createFlinkBlueGreenDeploymentMetricManager(baseConfig,
metricGroup);
var statusRecorder =
StatusRecorder.createForFlinkBlueGreenDeployment(client,
metricManager, listeners);
- var controller = new FlinkBlueGreenDeploymentController(ctxFactory,
statusRecorder);
-
+ var controller =
+ new FlinkBlueGreenDeploymentController(ctxFactory,
configManager, statusRecorder);
registeredControllers.add(operator.register(controller,
this::overrideControllerConfigs));
}
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkBlueGreenDeploymentController.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkBlueGreenDeploymentController.java
index bac6f131..a2257447 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkBlueGreenDeploymentController.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkBlueGreenDeploymentController.java
@@ -21,11 +21,13 @@ import
org.apache.flink.kubernetes.operator.api.FlinkBlueGreenDeployment;
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
import
org.apache.flink.kubernetes.operator.api.status.FlinkBlueGreenDeploymentState;
import
org.apache.flink.kubernetes.operator.api.status.FlinkBlueGreenDeploymentStatus;
+import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
import
org.apache.flink.kubernetes.operator.controller.bluegreen.BlueGreenContext;
import
org.apache.flink.kubernetes.operator.controller.bluegreen.BlueGreenDeploymentService;
import
org.apache.flink.kubernetes.operator.controller.bluegreen.BlueGreenStateHandlerRegistry;
import
org.apache.flink.kubernetes.operator.controller.bluegreen.handlers.BlueGreenStateHandler;
import
org.apache.flink.kubernetes.operator.service.FlinkResourceContextFactory;
+import org.apache.flink.kubernetes.operator.utils.EventSourceUtils;
import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
import
io.javaoperatorsdk.operator.api.config.informer.InformerEventSourceConfiguration;
@@ -71,15 +73,18 @@ public class FlinkBlueGreenDeploymentController implements
Reconciler<FlinkBlueG
private final FlinkResourceContextFactory ctxFactory;
private final BlueGreenStateHandlerRegistry handlerRegistry;
+ private final FlinkConfigManager flinkConfigManager;
private final StatusRecorder<FlinkBlueGreenDeployment,
FlinkBlueGreenDeploymentStatus>
statusRecorder;
public FlinkBlueGreenDeploymentController(
FlinkResourceContextFactory ctxFactory,
+ FlinkConfigManager flinkConfigManager,
StatusRecorder<FlinkBlueGreenDeployment,
FlinkBlueGreenDeploymentStatus>
statusRecorder) {
this.ctxFactory = ctxFactory;
this.handlerRegistry = new BlueGreenStateHandlerRegistry();
+ this.flinkConfigManager = flinkConfigManager;
this.statusRecorder = statusRecorder;
}
@@ -99,6 +104,9 @@ public class FlinkBlueGreenDeploymentController implements
Reconciler<FlinkBlueG
eventSources.add(new InformerEventSource<>(config, context));
+ if (flinkConfigManager.getOperatorConfiguration().isManageIngress()) {
+
eventSources.add(EventSourceUtils.getBlueGreenIngressInformerEventSource(context));
+ }
return eventSources;
}
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/bluegreen/BlueGreenDeploymentService.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/bluegreen/BlueGreenDeploymentService.java
index 33cd868b..9543feca 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/bluegreen/BlueGreenDeploymentService.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/bluegreen/BlueGreenDeploymentService.java
@@ -30,6 +30,7 @@ import
org.apache.flink.kubernetes.operator.api.status.SnapshotTriggerType;
import
org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
import
org.apache.flink.kubernetes.operator.controller.FlinkBlueGreenDeployments;
import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext;
+import org.apache.flink.kubernetes.operator.utils.IngressUtils;
import org.apache.flink.kubernetes.operator.utils.bluegreen.BlueGreenUtils;
import org.apache.flink.util.Preconditions;
@@ -733,12 +734,84 @@ public class BlueGreenDeploymentService {
context.getDeploymentStatus().setAbortTimestamp(millisToInstantStr(0));
context.getDeploymentStatus().setSavepointTriggerId(null);
+ updateBlueGreenIngress(context, nextState);
+
// Finalize status and reschedule immediately so any pending spec
changes
// (e.g., suspend requested during transition) are picked up on next
reconcile
return patchStatusUpdateControl(context, nextState, JobStatus.RUNNING,
null)
.rescheduleAfter(0);
}
+ /**
+ * Reconciles ingress for the active deployment in ACTIVE states. This
handles ingress spec
+ * changes that occur while the deployment is stable (not transitioning).
+ *
+ * @param context the Blue/Green context
+ * @param activeDeploymentType which deployment (BLUE or GREEN) is
currently active
+ */
+ public void reconcileIngressForActiveDeployment(
+ BlueGreenContext context, BlueGreenDeploymentType
activeDeploymentType) {
+ FlinkDeployment activeDeployment =
context.getDeploymentByType(activeDeploymentType);
+ if (activeDeployment == null) {
+ return;
+ }
+
+ var flinkResourceContext =
+ context.getCtxFactory()
+ .getResourceContext(activeDeployment,
context.getJosdkContext());
+
+ if (!flinkResourceContext.getOperatorConfig().isManageIngress()) {
+ return;
+ }
+
+ IngressUtils.reconcileBlueGreenIngress(
+ context,
+ true,
+ activeDeployment,
+
flinkResourceContext.getDeployConfig(activeDeployment.getSpec()),
+ context.getJosdkContext());
+
+ LOG.info(
+ "Successfully reconciled ingress for active deployment: {}",
+ activeDeployment.getMetadata().getName());
+ }
+
+ /**
+ * Updates the ingress for Blue/Green deployment during transitions,
pointing to the newly
+ * active deployment.
+ *
+ * @param blueGreenContext the Blue/Green context
+ * @param nextState which deployment (ACTIVE_BLUE or ACTIVE_GREEN) is
becoming active
+ */
+ public void updateBlueGreenIngress(
+ BlueGreenContext blueGreenContext, FlinkBlueGreenDeploymentState
nextState) {
+ FlinkDeployment activeDeployment;
+ switch (nextState) {
+ case ACTIVE_BLUE:
+ activeDeployment = blueGreenContext.getBlueDeployment();
+ break;
+ case ACTIVE_GREEN:
+ activeDeployment = blueGreenContext.getGreenDeployment();
+ break;
+ default:
+ LOG.info("Skipping ingress reconciliation for non-active
state: {}", nextState);
+ return;
+ }
+
+ // Create a FlinkResourceContext for the active deployment to get
proper config
+ var flinkResourceContext =
+ blueGreenContext
+ .getCtxFactory()
+ .getResourceContext(activeDeployment,
blueGreenContext.getJosdkContext());
+
+ IngressUtils.reconcileBlueGreenIngress(
+ blueGreenContext,
+ flinkResourceContext.getOperatorConfig().isManageIngress(),
+ activeDeployment,
+
flinkResourceContext.getDeployConfig(activeDeployment.getSpec()),
+ blueGreenContext.getJosdkContext());
+ }
+
// ==================== Common Utility Methods ====================
public static UpdateControl<FlinkBlueGreenDeployment>
patchStatusUpdateControl(
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/bluegreen/handlers/ActiveStateHandler.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/bluegreen/handlers/ActiveStateHandler.java
index 176b94fb..06deadc3 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/bluegreen/handlers/ActiveStateHandler.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/bluegreen/handlers/ActiveStateHandler.java
@@ -37,6 +37,10 @@ public class ActiveStateHandler extends
AbstractBlueGreenStateHandler {
@Override
public UpdateControl<FlinkBlueGreenDeployment> handle(BlueGreenContext
context) {
BlueGreenDeploymentType currentType = getCurrentDeploymentType();
+
+ // Reconcile ingress for the active deployment (handles spec changes)
+ deploymentService.reconcileIngressForActiveDeployment(context,
currentType);
+
return deploymentService.checkAndInitiateDeployment(context,
currentType);
}
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventSourceUtils.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventSourceUtils.java
index efcf2ab6..e3f22d47 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventSourceUtils.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventSourceUtils.java
@@ -19,6 +19,7 @@ package org.apache.flink.kubernetes.operator.utils;
import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
import org.apache.flink.kubernetes.operator.api.CrdConstants;
+import org.apache.flink.kubernetes.operator.api.FlinkBlueGreenDeployment;
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
import org.apache.flink.kubernetes.operator.api.FlinkSessionJob;
import org.apache.flink.kubernetes.operator.api.FlinkStateSnapshot;
@@ -129,6 +130,29 @@ public class EventSourceUtils {
return new InformerEventSource<>(configuration, context);
}
+ public static InformerEventSource<?, FlinkBlueGreenDeployment>
+ getBlueGreenIngressInformerEventSource(
+ EventSourceContext<FlinkBlueGreenDeployment> context) {
+ final String labelSelector =
+ Map.of(Constants.LABEL_COMPONENT_KEY,
LABEL_COMPONENT_INGRESS).entrySet().stream()
+ .map(Object::toString)
+ .collect(Collectors.joining(","));
+
+ var ingressClass =
+ ingressInNetworkingV1(context.getClient())
+ ? Ingress.class
+ :
io.fabric8.kubernetes.api.model.networking.v1beta1.Ingress.class;
+
+ var configuration =
+ InformerEventSourceConfiguration.from(ingressClass,
FlinkBlueGreenDeployment.class)
+ .withLabelSelector(labelSelector)
+ .withNamespacesInheritedFromController()
+ .withFollowControllerNamespacesChanges(true)
+ .build();
+
+ return new InformerEventSource<>(configuration, context);
+ }
+
public static InformerEventSource<FlinkSessionJob, FlinkDeployment>
getSessionJobInformerEventSource(EventSourceContext<FlinkDeployment> context) {
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/IngressUtils.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/IngressUtils.java
index 722270a1..74339578 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/IngressUtils.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/IngressUtils.java
@@ -19,8 +19,13 @@ package org.apache.flink.kubernetes.operator.utils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.kubernetes.operator.api.FlinkBlueGreenDeployment;
+import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
+import
org.apache.flink.kubernetes.operator.api.spec.FlinkBlueGreenDeploymentSpec;
import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec;
+import org.apache.flink.kubernetes.operator.api.spec.IngressSpec;
import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext;
+import
org.apache.flink.kubernetes.operator.controller.bluegreen.BlueGreenContext;
import org.apache.flink.kubernetes.operator.exception.ReconciliationException;
import org.apache.flink.kubernetes.utils.Constants;
import org.apache.flink.util.Preconditions;
@@ -38,6 +43,7 @@ import
io.fabric8.kubernetes.api.model.networking.v1beta1.Ingress;
import io.fabric8.kubernetes.api.model.networking.v1beta1.IngressTLS;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.dsl.NonDeletingOperation;
+import io.javaoperatorsdk.operator.api.reconciler.Context;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
@@ -95,7 +101,8 @@ public class IngressUtils {
}
var objectMeta = ctx.getResource().getMetadata();
if (spec.getIngress() != null) {
- HasMetadata ingress = getIngress(objectMeta, spec,
effectiveConfig, client);
+ HasMetadata ingress =
+ getIngress(objectMeta, spec.getIngress(), effectiveConfig,
client);
setOwnerReference(ctx.getResource(),
Collections.singletonList(ingress));
LOG.info("Updating ingress rules {}", ingress);
client.resource(ingress)
@@ -116,33 +123,84 @@ public class IngressUtils {
}
}
+ public static void reconcileBlueGreenIngress(
+ BlueGreenContext context,
+ boolean operatorManagedIngress,
+ FlinkDeployment targetFlinkDeployment,
+ Configuration effectiveConfig,
+ Context<FlinkBlueGreenDeployment> client) {
+ if (!operatorManagedIngress) {
+ LOG.info("Skipping rerouting of of Blue Green Ingress, not
operator managed ingress ");
+ return;
+ }
+ FlinkBlueGreenDeploymentSpec flinkBlueGreenDeploymentSpec =
+ context.getBgDeployment().getSpec();
+ var objectMeta = context.getBgDeployment().getMetadata();
+ if (flinkBlueGreenDeploymentSpec.getIngress() != null) {
+ HasMetadata ingress =
+ getIngress(
+ objectMeta,
+ flinkBlueGreenDeploymentSpec.getIngress(),
+ effectiveConfig,
+ client.getClient(),
+ targetFlinkDeployment.getMetadata().getName() +
REST_SVC_NAME_SUFFIX);
+ setOwnerReference(context.getBgDeployment(),
Collections.singletonList(ingress));
+ client.getClient()
+ .resource(ingress)
+ .inNamespace(objectMeta.getNamespace())
+ .createOr(NonDeletingOperation::update);
+ } else {
+ Optional<? extends HasMetadata> ingress;
+ if (ingressInNetworkingV1(client.getClient())) {
+ ingress =
+ client.getSecondaryResource(
+
io.fabric8.kubernetes.api.model.networking.v1.Ingress.class);
+ } else {
+ ingress = client.getSecondaryResource(Ingress.class);
+ }
+ ingress.ifPresent(i -> client.getClient().resource(i).delete());
+ }
+ }
+
private static HasMetadata getIngress(
ObjectMeta objectMeta,
- FlinkDeploymentSpec spec,
+ IngressSpec spec,
Configuration effectiveConfig,
KubernetesClient client) {
+ return getIngress(
+ objectMeta,
+ spec,
+ effectiveConfig,
+ client,
+ objectMeta.getName() + REST_SVC_NAME_SUFFIX);
+ }
+
+ private static HasMetadata getIngress(
+ ObjectMeta objectMeta,
+ IngressSpec spec,
+ Configuration effectiveConfig,
+ KubernetesClient client,
+ String serviceName) {
Map<String, String> labels =
- spec.getIngress().getLabels() == null
- ? new HashMap<>()
- : new HashMap<>(spec.getIngress().getLabels());
+ spec.getLabels() == null ? new HashMap<>() : new
HashMap<>(spec.getLabels());
labels.put(Constants.LABEL_COMPONENT_KEY, LABEL_COMPONENT_INGRESS);
if (ingressInNetworkingV1(client)) {
return new IngressBuilder()
.withNewMetadata()
.withLabels(labels)
- .withAnnotations(spec.getIngress().getAnnotations())
+ .withAnnotations(spec.getAnnotations())
.withName(objectMeta.getName())
.withNamespace(objectMeta.getNamespace())
.endMetadata()
.withNewSpec()
- .withIngressClassName(spec.getIngress().getClassName())
- .withTls(spec.getIngress().getTls())
- .withRules(getIngressRule(objectMeta, spec,
effectiveConfig))
+ .withIngressClassName(spec.getClassName())
+ .withTls(spec.getTls())
+ .withRules(getIngressRule(objectMeta, spec,
effectiveConfig, serviceName))
.endSpec()
.build();
} else {
List<IngressTLS> ingressTLS =
- Optional.ofNullable(spec.getIngress().getTls())
+ Optional.ofNullable(spec.getTls())
.map(
list ->
list.stream()
@@ -160,30 +218,31 @@ public class IngressUtils {
.orElse(Collections.emptyList());
return new
io.fabric8.kubernetes.api.model.networking.v1beta1.IngressBuilder()
.withNewMetadata()
- .withAnnotations(spec.getIngress().getAnnotations())
+ .withAnnotations(spec.getAnnotations())
.withLabels(labels)
.withName(objectMeta.getName())
.withNamespace(objectMeta.getNamespace())
.endMetadata()
.withNewSpec()
- .withIngressClassName(spec.getIngress().getClassName())
+ .withIngressClassName(spec.getClassName())
.withTls(ingressTLS)
- .withRules(getIngressRuleForV1beta1(objectMeta, spec,
effectiveConfig))
+ .withRules(
+ getIngressRuleForV1beta1(
+ objectMeta, spec, effectiveConfig,
serviceName))
.endSpec()
.build();
}
}
private static IngressRule getIngressRule(
- ObjectMeta objectMeta, FlinkDeploymentSpec spec, Configuration
effectiveConfig) {
- final String clusterId = objectMeta.getName();
+ ObjectMeta objectMeta,
+ IngressSpec spec,
+ Configuration effectiveConfig,
+ String serviceName) {
final int restPort = effectiveConfig.getInteger(RestOptions.PORT);
URL ingressUrl =
- getIngressUrl(
- spec.getIngress().getTemplate(),
- objectMeta.getName(),
- objectMeta.getNamespace());
+ getIngressUrl(spec.getTemplate(), objectMeta.getName(),
objectMeta.getNamespace());
IngressRuleBuilder ingressRuleBuilder = new IngressRuleBuilder();
ingressRuleBuilder.withHttp(
@@ -192,7 +251,7 @@ public class IngressUtils {
.withPathType("ImplementationSpecific")
.withNewBackend()
.withNewService()
- .withName(clusterId + REST_SVC_NAME_SUFFIX)
+ .withName(serviceName)
.withNewPort()
.withNumber(restPort)
.endPort()
@@ -219,16 +278,13 @@ public class IngressUtils {
private static
io.fabric8.kubernetes.api.model.networking.v1beta1.IngressRule
getIngressRuleForV1beta1(
ObjectMeta objectMeta,
- FlinkDeploymentSpec spec,
- Configuration effectiveConfig) {
- final String clusterId = objectMeta.getName();
+ IngressSpec spec,
+ Configuration effectiveConfig,
+ String serviceName) {
final int restPort = effectiveConfig.getInteger(RestOptions.PORT);
URL ingressUrl =
- getIngressUrl(
- spec.getIngress().getTemplate(),
- objectMeta.getName(),
- objectMeta.getNamespace());
+ getIngressUrl(spec.getTemplate(), objectMeta.getName(),
objectMeta.getNamespace());
io.fabric8.kubernetes.api.model.networking.v1beta1.IngressRuleBuilder
ingressRuleBuilder =
new
io.fabric8.kubernetes.api.model.networking.v1beta1.IngressRuleBuilder();
@@ -236,7 +292,7 @@ public class IngressUtils {
new
io.fabric8.kubernetes.api.model.networking.v1beta1.HTTPIngressRuleValueBuilder()
.addNewPath()
.withNewBackend()
- .withServiceName(clusterId + REST_SVC_NAME_SUFFIX)
+ .withServiceName(serviceName)
.withServicePort(new IntOrString(restPort))
.endBackend()
.endPath()
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/bluegreen/BlueGreenUtils.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/bluegreen/BlueGreenUtils.java
index 27195908..af3ccbd6 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/bluegreen/BlueGreenUtils.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/bluegreen/BlueGreenUtils.java
@@ -26,6 +26,7 @@ import
org.apache.flink.kubernetes.operator.api.FlinkDeployment;
import
org.apache.flink.kubernetes.operator.api.bluegreen.BlueGreenDeploymentType;
import org.apache.flink.kubernetes.operator.api.bluegreen.BlueGreenDiffType;
import
org.apache.flink.kubernetes.operator.api.spec.FlinkBlueGreenDeploymentSpec;
+import org.apache.flink.kubernetes.operator.api.spec.IngressSpec;
import org.apache.flink.kubernetes.operator.api.spec.KubernetesDeploymentMode;
import org.apache.flink.kubernetes.operator.api.spec.UpgradeMode;
import
org.apache.flink.kubernetes.operator.api.status.FlinkBlueGreenDeploymentStatus;
@@ -365,6 +366,13 @@ public class BlueGreenUtils {
flinkDeployment.setSpec(spec.getTemplate().getSpec());
+ // Update Ingress template if exists to prevent path collision between
Blue and Green
+ IngressSpec ingress = flinkDeployment.getSpec().getIngress();
+ if (ingress != null) {
+ ingress.setTemplate(
+ blueGreenDeploymentType.name().toLowerCase() + "-" +
ingress.getTemplate());
+ }
+
// Deployment metadata
ObjectMeta flinkDeploymentMeta =
getDependentObjectMeta(context.getBgDeployment());
flinkDeploymentMeta.setName(childDeploymentName);
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkBlueGreenDeploymentControllerTest.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkBlueGreenDeploymentControllerTest.java
index bc571c5d..528b6172 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkBlueGreenDeploymentControllerTest.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkBlueGreenDeploymentControllerTest.java
@@ -30,6 +30,7 @@ import
org.apache.flink.kubernetes.operator.api.spec.FlinkBlueGreenDeploymentSpe
import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec;
import
org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentTemplateSpec;
import org.apache.flink.kubernetes.operator.api.spec.FlinkVersion;
+import org.apache.flink.kubernetes.operator.api.spec.IngressSpec;
import org.apache.flink.kubernetes.operator.api.spec.JobManagerSpec;
import org.apache.flink.kubernetes.operator.api.spec.JobSpec;
import org.apache.flink.kubernetes.operator.api.spec.JobState;
@@ -40,9 +41,11 @@ import
org.apache.flink.kubernetes.operator.api.status.FlinkBlueGreenDeploymentS
import org.apache.flink.kubernetes.operator.api.status.ReconciliationState;
import org.apache.flink.kubernetes.operator.api.utils.SpecUtils;
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
+import org.apache.flink.kubernetes.operator.utils.IngressUtils;
import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
import io.fabric8.kubernetes.api.model.ObjectMetaBuilder;
+import io.fabric8.kubernetes.api.model.networking.v1.Ingress;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
import io.javaoperatorsdk.operator.api.reconciler.Context;
@@ -90,6 +93,9 @@ public class FlinkBlueGreenDeploymentControllerTest {
private static final int MINIMUM_ABORT_GRACE_PERIOD = 1000;
private static final String TEST_CHECKPOINT_PATH = "/tmp/checkpoints";
private static final String TEST_INITIAL_SAVEPOINT_PATH =
"/tmp/savepoints";
+ private static final String BLUE_CLUSTER_ID = TEST_DEPLOYMENT_NAME +
"-blue";
+ private static final String GREEN_CLUSTER_ID = TEST_DEPLOYMENT_NAME +
"-green";
+ private static final String REST_SVC_NAME_SUFFIX = "-rest";
private final FlinkConfigManager configManager = new
FlinkConfigManager(new Configuration());
private TestingFlinkService flinkService;
private Context<FlinkBlueGreenDeployment> context;
@@ -1531,6 +1537,145 @@ public class FlinkBlueGreenDeploymentControllerTest {
var flinkDeploymentTemplateSpec =
FlinkDeploymentTemplateSpec.builder().spec(flinkDeploymentSpec).build();
- return new FlinkBlueGreenDeploymentSpec(configuration,
flinkDeploymentTemplateSpec);
+ return new FlinkBlueGreenDeploymentSpec(configuration, null,
flinkDeploymentTemplateSpec);
+ }
+
+ // ==================== Ingress Helper Methods ====================
+
+ /**
+ * Asserts that the ingress named {@code TEST_DEPLOYMENT_NAME} in {@code TEST_NAMESPACE} exists
+ * and that the backend of the first path of its first rule references the given service.
+ *
+ * <p>The networking v1 API is queried when {@code IngressUtils.ingressInNetworkingV1} returns
+ * true for the test client; otherwise the v1beta1 API is queried. Only the first rule and the
+ * first path are inspected.
+ *
+ * @param expectedServiceName service name the ingress backend is expected to reference
+ */
+ private void assertIngressPointsToService(String expectedServiceName) {
+ if (IngressUtils.ingressInNetworkingV1(kubernetesClient)) {
+ Ingress ingress = getIngressV1(TEST_DEPLOYMENT_NAME,
TEST_NAMESPACE);
+ assertNotNull(ingress);
+ assertEquals(
+ expectedServiceName,
+ ingress.getSpec()
+ .getRules()
+ .get(0)
+ .getHttp()
+ .getPaths()
+ .get(0)
+ .getBackend()
+ .getService()
+ .getName());
+ } else {
+ io.fabric8.kubernetes.api.model.networking.v1beta1.Ingress
ingressV1beta1 =
+ getIngressV1beta1(TEST_DEPLOYMENT_NAME, TEST_NAMESPACE);
+ assertNotNull(ingressV1beta1);
+ assertEquals(
+ expectedServiceName,
+ ingressV1beta1
+ .getSpec()
+ .getRules()
+ .get(0)
+ .getHttp()
+ .getPaths()
+ .get(0)
+ .getBackend()
+ .getServiceName());
+ }
+ }
+
+ /**
+ * Asserts that looking up the ingress named {@code TEST_DEPLOYMENT_NAME} in {@code
+ * TEST_NAMESPACE} yields {@code null}, using the networking v1 API when {@code
+ * IngressUtils.ingressInNetworkingV1} returns true for the test client and v1beta1 otherwise.
+ */
+ private void assertIngressDoesNotExist() {
+ if (IngressUtils.ingressInNetworkingV1(kubernetesClient)) {
+ assertNull(getIngressV1(TEST_DEPLOYMENT_NAME, TEST_NAMESPACE));
+ } else {
+ assertNull(getIngressV1beta1(TEST_DEPLOYMENT_NAME,
TEST_NAMESPACE));
+ }
+ }
+
+ /**
+ * Looks up a networking v1 ingress through the test Kubernetes client.
+ *
+ * @param name ingress name
+ * @param namespace namespace to search in
+ * @return the result of the client's {@code get()} call; callers in this class treat {@code
+ *     null} as "ingress absent"
+ */
+ private Ingress getIngressV1(String name, String namespace) {
+ return kubernetesClient
+ .network()
+ .v1()
+ .ingresses()
+ .inNamespace(namespace)
+ .withName(name)
+ .get();
+ }
+
+ /**
+ * Looks up a networking v1beta1 ingress through the test Kubernetes client.
+ *
+ * @param name ingress name
+ * @param namespace namespace to search in
+ * @return the result of the client's {@code get()} call; callers in this class treat {@code
+ *     null} as "ingress absent"
+ */
+ private io.fabric8.kubernetes.api.model.networking.v1beta1.Ingress
getIngressV1beta1(
+ String name, String namespace) {
+ return kubernetesClient
+ .network()
+ .v1beta1()
+ .ingresses()
+ .inNamespace(namespace)
+ .withName(name)
+ .get();
+ }
+
+ // ==================== Ingress Rotation Tests ====================
+
+ /**
+ * Verifies that the Blue/Green-level ingress follows the active deployment: after the initial
+ * deployment its backend must reference the Blue REST service ({@code BLUE_CLUSTER_ID +
+ * REST_SVC_NAME_SUFFIX}), and after a spec change plus a completed transition it must
+ * reference the Green REST service ({@code GREEN_CLUSTER_ID + REST_SVC_NAME_SUFFIX}).
+ *
+ * <p>The ingress spec is set on the Blue/Green deployment spec itself before the first
+ * deployment.
+ *
+ * @param flinkVersion Flink version supplied by the parameter source
+ */
+ @ParameterizedTest
+
@MethodSource("org.apache.flink.kubernetes.operator.TestUtils#flinkVersions")
+ public void verifyIngressSwitchesDuringTransition(FlinkVersion
flinkVersion) throws Exception {
+ // Build deployment with ingress spec
+ var blueGreenDeployment =
+ buildSessionCluster(
+ TEST_DEPLOYMENT_NAME,
+ TEST_NAMESPACE,
+ flinkVersion,
+ null,
+ UpgradeMode.STATELESS);
+
+ IngressSpec ingressSpec =
+ IngressSpec.builder()
+ .template("{{name}}.{{namespace}}.example.com")
+ .className("nginx")
+
.annotations(Map.of("nginx.ingress.kubernetes.io/rewrite-target", "/"))
+ .build();
+ blueGreenDeployment.getSpec().setIngress(ingressSpec);
+
+ // 1. Deploy Blue with ingress
+ var rs = executeBasicDeployment(flinkVersion, blueGreenDeployment,
false, null);
+
+ // 2. Verify ingress points to Blue
+ assertIngressPointsToService(BLUE_CLUSTER_ID + REST_SVC_NAME_SUFFIX);
+
+ // 3. Trigger transition to Green
+ String customValue = UUID.randomUUID().toString();
+ simulateChangeInSpec(rs.deployment, customValue,
ALT_DELETION_DELAY_VALUE, null);
+
+ // Transition to Green
+ testTransitionToGreen(rs, customValue, null);
+
+ // 4. Verify ingress now points to Green
+ assertIngressPointsToService(GREEN_CLUSTER_ID + REST_SVC_NAME_SUFFIX);
+ }
+
+ /**
+ * Verifies that no ingress exists while the Blue/Green spec carries no ingress section, and
+ * that once an ingress spec is added and a transition to Green completes, an ingress exists
+ * whose backend references the Green REST service ({@code GREEN_CLUSTER_ID +
+ * REST_SVC_NAME_SUFFIX}).
+ *
+ * <p>The ingress spec is added to the already-deployed resource and pushed with {@code
+ * createOrReplace()} before the spec change that triggers the transition.
+ *
+ * @param flinkVersion Flink version supplied by the parameter source
+ */
+ @ParameterizedTest
+
@MethodSource("org.apache.flink.kubernetes.operator.TestUtils#flinkVersions")
+ public void verifyIngressCreatedOnlyWhenConfigured(FlinkVersion
flinkVersion) throws Exception {
+ // 1. Deploy Blue without ingress spec
+ var blueGreenDeployment =
+ buildSessionCluster(
+ TEST_DEPLOYMENT_NAME,
+ TEST_NAMESPACE,
+ flinkVersion,
+ null,
+ UpgradeMode.STATELESS);
+
+ var rs = executeBasicDeployment(flinkVersion, blueGreenDeployment,
false, null);
+
+ // 2. Verify no ingress created initially
+ assertIngressDoesNotExist();
+
+ // 3. Add ingress spec and trigger transition to Green
+ IngressSpec ingressSpec =
+ IngressSpec.builder()
+ .template("{{name}}.{{namespace}}.example.com")
+ .className("nginx")
+ .build();
+ rs.deployment.getSpec().setIngress(ingressSpec);
+ kubernetesClient.resource(rs.deployment).createOrReplace();
+
+ String customValue = UUID.randomUUID().toString();
+ simulateChangeInSpec(rs.deployment, customValue,
ALT_DELETION_DELAY_VALUE, null);
+
+ // Complete transition to Green (this reconciles ingress)
+ testTransitionToGreen(rs, customValue, null);
+
+ // 4. Verify ingress created and points to Green after transition
completes
+ assertIngressPointsToService(GREEN_CLUSTER_ID + REST_SVC_NAME_SUFFIX);
}
}
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkBlueGreenDeploymentController.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkBlueGreenDeploymentController.java
index 5bddcf4d..da7bedea 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkBlueGreenDeploymentController.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkBlueGreenDeploymentController.java
@@ -56,7 +56,8 @@ public class TestingFlinkBlueGreenDeploymentController
new StatusRecorder<>(new MetricManager<>(), (resource, status)
-> {});
flinkBlueGreenDeploymentController =
- new FlinkBlueGreenDeploymentController(contextFactory,
statusRecorder);
+ new FlinkBlueGreenDeploymentController(
+ contextFactory, configManager, statusRecorder);
}
@Override
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/FlinkBlueGreenDeploymentMetricsTest.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/FlinkBlueGreenDeploymentMetricsTest.java
index 10469677..76ff973a 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/FlinkBlueGreenDeploymentMetricsTest.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/FlinkBlueGreenDeploymentMetricsTest.java
@@ -466,6 +466,7 @@ public class FlinkBlueGreenDeploymentMetricsTest {
var bgDeploymentSpec =
new FlinkBlueGreenDeploymentSpec(
new HashMap<>(),
+ null,
FlinkDeploymentTemplateSpec.builder().spec(flinkDeploymentSpec).build());
deployment.setSpec(bgDeploymentSpec);
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/BlueGreenLifecycleMetricsTest.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/BlueGreenLifecycleMetricsTest.java
index ce0af71c..79add57f 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/BlueGreenLifecycleMetricsTest.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/BlueGreenLifecycleMetricsTest.java
@@ -179,6 +179,7 @@ public class BlueGreenLifecycleMetricsTest {
var bgDeploymentSpec =
new FlinkBlueGreenDeploymentSpec(
new HashMap<>(),
+ null,
FlinkDeploymentTemplateSpec.builder().spec(flinkDeploymentSpec).build());
deployment.setSpec(bgDeploymentSpec);
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/bluegreen/BlueGreenUtilsTest.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/bluegreen/BlueGreenUtilsTest.java
index a898662e..d7e73510 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/bluegreen/BlueGreenUtilsTest.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/bluegreen/BlueGreenUtilsTest.java
@@ -243,6 +243,7 @@ public class BlueGreenUtilsTest {
var bgDeploymentSpec =
new FlinkBlueGreenDeploymentSpec(
new HashMap<>(),
+ null,
FlinkDeploymentTemplateSpec.builder().spec(flinkDeploymentSpec).build());
deployment.setSpec(bgDeploymentSpec);
diff --git
a/helm/flink-kubernetes-operator/crds/flinkbluegreendeployments.flink.apache.org-v1.yml
b/helm/flink-kubernetes-operator/crds/flinkbluegreendeployments.flink.apache.org-v1.yml
index d31d14aa..0f8ef636 100644
---
a/helm/flink-kubernetes-operator/crds/flinkbluegreendeployments.flink.apache.org-v1.yml
+++
b/helm/flink-kubernetes-operator/crds/flinkbluegreendeployments.flink.apache.org-v1.yml
@@ -24,6 +24,32 @@ spec:
properties:
spec:
properties:
+ ingress:
+ properties:
+ annotations:
+ additionalProperties:
+ type: string
+ type: object
+ className:
+ type: string
+ labels:
+ additionalProperties:
+ type: string
+ type: object
+ template:
+ type: string
+ tls:
+ items:
+ properties:
+ hosts:
+ items:
+ type: string
+ type: array
+ secretName:
+ type: string
+ type: object
+ type: array
+ type: object
configuration:
additionalProperties:
type: "string"