This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new 9c49fb2a [FLINK-37193] Feature flag if operator should manage ingress
9c49fb2a is described below

commit 9c49fb2a107e536143d3d9c712dfcaca15fa82c6
Author: Attila Mészáros <[email protected]>
AuthorDate: Mon Aug 25 15:24:15 2025 +0200

    [FLINK-37193] Feature flag if operator should manage ingress
---
 .../kubernetes_operator_config_configuration.html  |  6 ++
 .../generated/system_advanced_section.html         |  6 ++
 .../flink/kubernetes/operator/FlinkOperator.java   |  3 +-
 .../config/FlinkOperatorConfiguration.java         |  7 ++-
 .../config/KubernetesOperatorConfigOptions.java    |  8 +++
 .../controller/FlinkDeploymentController.java      | 10 ++-
 .../deployment/ApplicationReconciler.java          |  3 +-
 .../reconciler/deployment/SessionReconciler.java   |  3 +-
 .../kubernetes/operator/utils/IngressUtils.java    | 20 +++++-
 .../TestingFlinkDeploymentController.java          |  3 +-
 .../operator/utils/IngressUtilsTest.java           | 73 ++++++++++++++++++----
 11 files changed, 122 insertions(+), 20 deletions(-)

diff --git 
a/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html
 
b/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html
index 5667d888..db57a553 100644
--- 
a/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html
+++ 
b/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html
@@ -170,6 +170,12 @@
             <td>Integer</td>
             <td>The port the health probe will use to expose the status.</td>
         </tr>
+        <tr>
+            <td><h5>kubernetes.operator.ingress.manage</h5></td>
+            <td style="word-wrap: break-word;">true</td>
+            <td>Boolean</td>
+            <td>Feature flag if operator will manage the Ingress resource. If 
false, no InformerEventSource will be registered for Ingress, and Ingress won't 
be created.</td>
+        </tr>
         <tr>
             
<td><h5>kubernetes.operator.jm-deployment-recovery.enabled</h5></td>
             <td style="word-wrap: break-word;">true</td>
diff --git a/docs/layouts/shortcodes/generated/system_advanced_section.html 
b/docs/layouts/shortcodes/generated/system_advanced_section.html
index 7752c15d..1c9bc3bd 100644
--- a/docs/layouts/shortcodes/generated/system_advanced_section.html
+++ b/docs/layouts/shortcodes/generated/system_advanced_section.html
@@ -68,6 +68,12 @@
             <td>Integer</td>
             <td>The port the health probe will use to expose the status.</td>
         </tr>
+        <tr>
+            <td><h5>kubernetes.operator.ingress.manage</h5></td>
+            <td style="word-wrap: break-word;">true</td>
+            <td>Boolean</td>
+            <td>Feature flag if operator will manage the Ingress resource. If 
false, no InformerEventSource will be registered for Ingress, and Ingress won't 
be created.</td>
+        </tr>
         <tr>
             <td><h5>kubernetes.operator.label.selector</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
index e2cca039..3e21ab45 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
@@ -199,7 +199,8 @@ public class FlinkOperator {
                         observerFactory,
                         statusRecorder,
                         eventRecorder,
-                        canaryResourceManager);
+                        canaryResourceManager,
+                        configManager);
         registeredControllers.add(operator.register(controller, 
this::overrideControllerConfigs));
     }
 
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java
index f65385ec..97068722 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java
@@ -79,6 +79,7 @@ public class FlinkOperatorConfiguration {
     Duration slowRequestThreshold;
     int reportedExceptionEventsMaxCount;
     int reportedExceptionEventsMaxStackTraceLength;
+    boolean manageIngress;
 
     public static FlinkOperatorConfiguration fromConfiguration(Configuration 
operatorConfig) {
         Duration reconcileInterval =
@@ -203,6 +204,9 @@ public class FlinkOperatorConfiguration {
                 operatorConfig.get(
                         
KubernetesOperatorConfigOptions.OPERATOR_EVENT_EXCEPTION_STACKTRACE_LINES);
 
+        boolean manageIngress =
+                
operatorConfig.get(KubernetesOperatorConfigOptions.OPERATOR_MANAGE_INGRESS);
+
         return new FlinkOperatorConfiguration(
                 reconcileInterval,
                 reconcilerMaxParallelism,
@@ -234,7 +238,8 @@ public class FlinkOperatorConfiguration {
                 snapshotResourcesEnabled,
                 slowRequestThreshold,
                 reportedExceptionEventsMaxCount,
-                reportedExceptionEventsMaxStackTraceLength);
+                reportedExceptionEventsMaxStackTraceLength,
+                manageIngress);
     }
 
     private static GenericRetry getRetryConfig(Configuration conf) {
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java
index 38108bd1..4f755372 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java
@@ -679,4 +679,12 @@ public class KubernetesOperatorConfigOptions {
                     .defaultValue(10)
                     .withDescription(
                             "Maximum number of exception-related Kubernetes 
events emitted per reconciliation cycle.");
+
+    @Documentation.Section(SECTION_ADVANCED)
+    public static final ConfigOption<Boolean> OPERATOR_MANAGE_INGRESS =
+            operatorConfig("ingress.manage")
+                    .booleanType()
+                    .defaultValue(true)
+                    .withDescription(
+                            "Feature flag if operator will manage the Ingress 
resource. If false, no InformerEventSource will be registered for Ingress, and 
Ingress won't be created.");
 }
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
index 165c313d..32a3109d 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
@@ -23,6 +23,7 @@ import 
org.apache.flink.kubernetes.operator.api.FlinkStateSnapshot;
 import 
org.apache.flink.kubernetes.operator.api.lifecycle.ResourceLifecycleState;
 import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus;
 import 
org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus;
+import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
 import 
org.apache.flink.kubernetes.operator.exception.DeploymentFailedException;
 import org.apache.flink.kubernetes.operator.exception.ReconciliationException;
 import org.apache.flink.kubernetes.operator.exception.UpgradeFailureException;
@@ -69,6 +70,7 @@ public class FlinkDeploymentController
     private final StatusRecorder<FlinkDeployment, FlinkDeploymentStatus> 
statusRecorder;
     private final EventRecorder eventRecorder;
     private final CanaryResourceManager<FlinkDeployment> canaryResourceManager;
+    private final FlinkConfigManager flinkConfigManager;
 
     public FlinkDeploymentController(
             Set<FlinkResourceValidator> validators,
@@ -77,7 +79,8 @@ public class FlinkDeploymentController
             FlinkDeploymentObserverFactory observerFactory,
             StatusRecorder<FlinkDeployment, FlinkDeploymentStatus> 
statusRecorder,
             EventRecorder eventRecorder,
-            CanaryResourceManager<FlinkDeployment> canaryResourceManager) {
+            CanaryResourceManager<FlinkDeployment> canaryResourceManager,
+            FlinkConfigManager flinkConfigManager) {
         this.validators = validators;
         this.ctxFactory = ctxFactory;
         this.reconcilerFactory = reconcilerFactory;
@@ -85,6 +88,7 @@ public class FlinkDeploymentController
         this.statusRecorder = statusRecorder;
         this.eventRecorder = eventRecorder;
         this.canaryResourceManager = canaryResourceManager;
+        this.flinkConfigManager = flinkConfigManager;
     }
 
     @Override
@@ -184,7 +188,9 @@ public class FlinkDeploymentController
         List<EventSource<?, FlinkDeployment>> eventSources = new ArrayList<>();
         
eventSources.add(EventSourceUtils.getSessionJobInformerEventSource(context));
         
eventSources.add(EventSourceUtils.getDeploymentInformerEventSource(context));
-        
eventSources.add(EventSourceUtils.getIngressInformerEventSource(context));
+        if (flinkConfigManager.getOperatorConfiguration().isManageIngress()) {
+            
eventSources.add(EventSourceUtils.getIngressInformerEventSource(context));
+        }
         if (KubernetesClientUtils.isCrdInstalled(FlinkStateSnapshot.class)) {
             eventSources.add(
                     
EventSourceUtils.getStateSnapshotForFlinkResourceInformerEventSource(context));
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
index f2a1b71e..180fb128 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
@@ -190,7 +190,8 @@ public class ApplicationReconciler
         
status.getJobStatus().setState(org.apache.flink.api.common.JobStatus.RECONCILING);
         
status.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.DEPLOYING);
 
-        IngressUtils.reconcileIngress(ctx, spec, deployConfig, 
ctx.getKubernetesClient());
+        IngressUtils.reconcileIngress(
+                ctx, spec, deployConfig, ctx.getKubernetesClient(), 
eventRecorder);
     }
 
     private void setJobIdIfNecessary(
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java
index 139a9993..4c998fec 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java
@@ -106,7 +106,8 @@ public class SessionReconciler
         setOwnerReference(cr, deployConfig);
         ctx.getFlinkService().submitSessionCluster(deployConfig);
         
cr.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.DEPLOYING);
-        IngressUtils.reconcileIngress(ctx, spec, deployConfig, 
ctx.getKubernetesClient());
+        IngressUtils.reconcileIngress(
+                ctx, spec, deployConfig, ctx.getKubernetesClient(), 
eventRecorder);
     }
 
     @Override
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/IngressUtils.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/IngressUtils.java
index 89036d05..722270a1 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/IngressUtils.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/IngressUtils.java
@@ -38,6 +38,7 @@ import 
io.fabric8.kubernetes.api.model.networking.v1beta1.Ingress;
 import io.fabric8.kubernetes.api.model.networking.v1beta1.IngressTLS;
 import io.fabric8.kubernetes.client.KubernetesClient;
 import io.fabric8.kubernetes.client.dsl.NonDeletingOperation;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -56,6 +57,7 @@ import java.util.stream.Collectors;
 import static 
org.apache.flink.kubernetes.operator.utils.EventSourceUtils.LABEL_COMPONENT_INGRESS;
 
 /** Ingress utilities. */
+@Slf4j
 public class IngressUtils {
 
     private static final Pattern NAME_PTN =
@@ -68,13 +70,29 @@ public class IngressUtils {
     private static final String REST_SVC_NAME_SUFFIX = "-rest";
 
     private static final Logger LOG = 
LoggerFactory.getLogger(IngressUtils.class);
+    public static final String INGRESS_MANAGEMENT_OFF_BUT_SPEC_SET =
+            "Ingress management is turned off but ingress set in spec";
+    public static final String INGRESS_MANAGEMENT = "IngressManagement";
 
     public static void reconcileIngress(
             FlinkResourceContext<?> ctx,
             FlinkDeploymentSpec spec,
             Configuration effectiveConfig,
-            KubernetesClient client) {
+            KubernetesClient client,
+            EventRecorder eventRecorder) {
+        if (!ctx.getOperatorConfig().isManageIngress()) {
+            if (spec.getIngress() != null) {
+                eventRecorder.triggerEvent(
+                        ctx.getResource(),
+                        EventRecorder.Type.Warning,
+                        INGRESS_MANAGEMENT,
+                        INGRESS_MANAGEMENT_OFF_BUT_SPEC_SET,
+                        EventRecorder.Component.Operator,
+                        client);
+            }
 
+            return;
+        }
         var objectMeta = ctx.getResource().getMetadata();
         if (spec.getIngress() != null) {
             HasMetadata ingress = getIngress(objectMeta, spec, 
effectiveConfig, client);
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java
index 5eca9394..af04e069 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java
@@ -110,7 +110,8 @@ public class TestingFlinkDeploymentController
                         new FlinkDeploymentObserverFactory(eventRecorder),
                         statusRecorder,
                         eventRecorder,
-                        canaryResourceManager);
+                        canaryResourceManager,
+                        new 
FlinkConfigManager(Configuration.fromMap(Map.of())));
     }
 
     @Override
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/IngressUtilsTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/IngressUtilsTest.java
index 78b6c3a1..08124ced 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/IngressUtilsTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/IngressUtilsTest.java
@@ -27,6 +27,7 @@ import 
org.apache.flink.kubernetes.operator.controller.FlinkDeploymentContext;
 import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext;
 import org.apache.flink.kubernetes.operator.exception.ReconciliationException;
 
+import io.fabric8.kubernetes.api.model.Event;
 import io.fabric8.kubernetes.api.model.networking.v1.Ingress;
 import io.fabric8.kubernetes.api.model.networking.v1.IngressRule;
 import io.fabric8.kubernetes.api.model.networking.v1.IngressTLS;
@@ -38,9 +39,11 @@ import org.junit.jupiter.api.Test;
 
 import java.net.URL;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import static 
org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_MANAGE_INGRESS;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNull;
@@ -57,7 +60,20 @@ class IngressUtilsTest {
 
     private FlinkResourceContext<?> createResourceContext(FlinkDeployment 
appCluster) {
         testingJosdkContext = new TestingJosdkContext<>(client);
-        return new FlinkDeploymentContext(appCluster, testingJosdkContext, 
null, null, null, null);
+        return new FlinkDeploymentContext(
+                appCluster,
+                testingJosdkContext,
+                null,
+                new FlinkConfigManager(Configuration.fromMap(new HashMap<>())),
+                null,
+                null);
+    }
+
+    private FlinkResourceContext<?> createResourceContext(
+            FlinkDeployment appCluster, FlinkConfigManager configManager) {
+        testingJosdkContext = new TestingJosdkContext<>(client);
+        return new FlinkDeploymentContext(
+                appCluster, testingJosdkContext, null, configManager, null, 
null);
     }
 
     @Test
@@ -69,7 +85,7 @@ class IngressUtilsTest {
 
         // no ingress when ingressDomain is empty
         IngressUtils.reconcileIngress(
-                createResourceContext(appCluster), appCluster.getSpec(), 
config, client);
+                createResourceContext(appCluster), appCluster.getSpec(), 
config, client, null);
         if (IngressUtils.ingressInNetworkingV1(client)) {
             assertNull(
                     client.network()
@@ -93,7 +109,7 @@ class IngressUtilsTest {
         builder.template("{{name}}.{{namespace}}.example.com");
         appCluster.getSpec().setIngress(builder.build());
         IngressUtils.reconcileIngress(
-                createResourceContext(appCluster), appCluster.getSpec(), 
config, client);
+                createResourceContext(appCluster), appCluster.getSpec(), 
config, client, null);
         Ingress ingress = null;
         io.fabric8.kubernetes.api.model.networking.v1beta1.Ingress 
ingressV1beta1 = null;
         if (IngressUtils.ingressInNetworkingV1(client)) {
@@ -142,7 +158,7 @@ class IngressUtilsTest {
         
builder.annotations(Map.of("nginx.ingress.kubernetes.io/rewrite-target", 
"/$2"));
         appCluster.getSpec().setIngress(builder.build());
         IngressUtils.reconcileIngress(
-                createResourceContext(appCluster), appCluster.getSpec(), 
config, client);
+                createResourceContext(appCluster), appCluster.getSpec(), 
config, client, null);
         if (IngressUtils.ingressInNetworkingV1(client)) {
             ingress =
                     client.network()
@@ -199,7 +215,7 @@ class IngressUtilsTest {
         builder.className("nginx");
         appCluster.getSpec().setIngress(builder.build());
         IngressUtils.reconcileIngress(
-                createResourceContext(appCluster), appCluster.getSpec(), 
config, client);
+                createResourceContext(appCluster), appCluster.getSpec(), 
config, client, null);
         if (IngressUtils.ingressInNetworkingV1(client)) {
             ingress =
                     client.network()
@@ -283,7 +299,7 @@ class IngressUtilsTest {
         builder.tls(new ArrayList<>());
         appCluster.getSpec().setIngress(builder.build());
         IngressUtils.reconcileIngress(
-                createResourceContext(appCluster), appCluster.getSpec(), 
config, client);
+                createResourceContext(appCluster), appCluster.getSpec(), 
config, client, null);
         Ingress ingress = null;
         io.fabric8.kubernetes.api.model.networking.v1beta1.Ingress 
ingressV1beta1 = null;
         if (IngressUtils.ingressInNetworkingV1(client)) {
@@ -320,7 +336,7 @@ class IngressUtilsTest {
         builder.tls(List.of(ingressTlsSpecSecretOnly));
         appCluster.getSpec().setIngress(builder.build());
         IngressUtils.reconcileIngress(
-                createResourceContext(appCluster), appCluster.getSpec(), 
config, client);
+                createResourceContext(appCluster), appCluster.getSpec(), 
config, client, null);
         if (IngressUtils.ingressInNetworkingV1(client)) {
             ingress =
                     client.network()
@@ -358,7 +374,7 @@ class IngressUtilsTest {
         builder.tls(List.of(ingressTlsSpecHostsOnly));
         appCluster.getSpec().setIngress(builder.build());
         IngressUtils.reconcileIngress(
-                createResourceContext(appCluster), appCluster.getSpec(), 
config, client);
+                createResourceContext(appCluster), appCluster.getSpec(), 
config, client, null);
         if (IngressUtils.ingressInNetworkingV1(client)) {
             ingress =
                     client.network()
@@ -396,7 +412,7 @@ class IngressUtilsTest {
         builder.tls(List.of(ingressTlsSpecSingleTLSWithHost));
         appCluster.getSpec().setIngress(builder.build());
         IngressUtils.reconcileIngress(
-                createResourceContext(appCluster), appCluster.getSpec(), 
config, client);
+                createResourceContext(appCluster), appCluster.getSpec(), 
config, client, null);
         if (IngressUtils.ingressInNetworkingV1(client)) {
             ingress =
                     client.network()
@@ -438,7 +454,7 @@ class IngressUtilsTest {
         builder.tls(List.of(ingressTlsSpecSingleTLSWithHosts));
         appCluster.getSpec().setIngress(builder.build());
         IngressUtils.reconcileIngress(
-                createResourceContext(appCluster), appCluster.getSpec(), 
config, client);
+                createResourceContext(appCluster), appCluster.getSpec(), 
config, client, null);
         if (IngressUtils.ingressInNetworkingV1(client)) {
             ingress =
                     client.network()
@@ -485,7 +501,7 @@ class IngressUtilsTest {
                 List.of(ingressTlsSpecMultipleTLSWithHosts1, 
ingressTlsSpecMultipleTLSWithHosts2));
         appCluster.getSpec().setIngress(builder.build());
         IngressUtils.reconcileIngress(
-                createResourceContext(appCluster), appCluster.getSpec(), 
config, client);
+                createResourceContext(appCluster), appCluster.getSpec(), 
config, client, null);
         if (IngressUtils.ingressInNetworkingV1(client)) {
             ingress =
                     client.network()
@@ -549,7 +565,39 @@ class IngressUtilsTest {
                         
io.fabric8.kubernetes.api.model.networking.v1beta1.Ingress.class,
                         List.of(ingress)));
 
-        IngressUtils.reconcileIngress(context, appCluster.getSpec(), null, 
client);
+        IngressUtils.reconcileIngress(context, appCluster.getSpec(), null, 
client, null);
+
+        var ingressV1beta1 =
+                client.network()
+                        .v1beta1()
+                        .ingresses()
+                        .inNamespace(appCluster.getMetadata().getNamespace())
+                        .withName(appCluster.getMetadata().getName())
+                        .get();
+        assertThat(ingressV1beta1).isNull();
+    }
+
+    @Test
+    void skipIngressReconciliationIfFeatureFlagOff() {
+        List<Event> events = new ArrayList<>();
+        EventRecorder eventRecorder =
+                new EventRecorder((a, event) -> events.add(event), (a, b) -> 
{});
+        FlinkDeployment appCluster = TestUtils.buildApplicationCluster();
+        FlinkConfigManager manager =
+                new FlinkConfigManager(
+                        
Configuration.fromMap(Map.of(OPERATOR_MANAGE_INGRESS.key(), "false")));
+        var context = createResourceContext(appCluster, manager);
+        context.getOperatorConfig();
+        Configuration config =
+                new FlinkConfigManager(new Configuration())
+                        .getDeployConfig(appCluster.getMetadata(), 
appCluster.getSpec());
+
+        IngressSpec.IngressSpecBuilder builder = IngressSpec.builder();
+        builder.template("{{name}}.{{namespace}}.example.com");
+        builder.tls(new ArrayList<>());
+        appCluster.getSpec().setIngress(builder.build());
+
+        IngressUtils.reconcileIngress(context, appCluster.getSpec(), config, 
client, eventRecorder);
 
         var ingressV1beta1 =
                 client.network()
@@ -559,5 +607,6 @@ class IngressUtilsTest {
                         .withName(appCluster.getMetadata().getName())
                         .get();
         assertThat(ingressV1beta1).isNull();
+        assertThat(events).hasSize(1);
     }
 }

Reply via email to