This is an automated email from the ASF dual-hosted git repository.

wangyang0918 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new 68b9186  [FLINK-26706] Introduce Ingress URL templating
68b9186 is described below

commit 68b9186049e486f4568b4d7999f90f6bae8c190b
Author: Matyas Orhidi <[email protected]>
AuthorDate: Mon Mar 21 11:30:15 2022 +0100

    [FLINK-26706] Introduce Ingress URL templating
---
 .../{basic-ingress.yaml => advanced-ingress.yaml}  |  10 +-
 examples/basic-ingress.yaml                        |   5 +-
 .../operator/crd/spec/FlinkDeploymentSpec.java     |   4 +-
 .../kubernetes/operator/crd/spec/IngressSpec.java  |  45 ++++++
 .../operator/utils/FlinkConfigBuilder.java         |   2 +-
 .../kubernetes/operator/utils/IngressUtils.java    |  87 +++++++++--
 .../validation/DefaultDeploymentValidator.java     |  23 +++
 .../flink/kubernetes/operator/TestUtils.java       |   3 +-
 .../operator/utils/FlinkConfigBuilderTest.java     |   2 +-
 .../operator/utils/IngressUtilsTest.java           | 166 +++++++++++++++++++++
 .../validation/DeploymentValidatorTest.java        |  19 +++
 .../crds/flinkdeployments.flink.apache.org-v1.yml  |  26 +++-
 12 files changed, 360 insertions(+), 32 deletions(-)

diff --git a/examples/basic-ingress.yaml b/examples/advanced-ingress.yaml
similarity index 88%
copy from examples/basic-ingress.yaml
copy to examples/advanced-ingress.yaml
index 9620607..ca4aa11 100644
--- a/examples/basic-ingress.yaml
+++ b/examples/advanced-ingress.yaml
@@ -20,14 +20,16 @@ apiVersion: flink.apache.org/v1alpha1
 kind: FlinkDeployment
 metadata:
   namespace: default
-  name: basic-ingress
+  name: advanced-ingress
 spec:
   image: flink:1.14.3
   flinkVersion: v1_14
-  ingressDomain: flink.k8s.io
+  ingress:
+    template: "/{{namespace}}/{{name}}(/|$)(.*)"
+    className: "nginx"
+    annotations:
+      nginx.ingress.kubernetes.io/rewrite-target: "/$2"
   flinkConfiguration:
-#    rest.address: basic-example.flink.k8s.io
-#    rest.port: "80"
     taskmanager.numberOfTaskSlots: "2"
   serviceAccount: flink
   jobManager:
diff --git a/examples/basic-ingress.yaml b/examples/basic-ingress.yaml
index 9620607..e3c83e9 100644
--- a/examples/basic-ingress.yaml
+++ b/examples/basic-ingress.yaml
@@ -24,10 +24,9 @@ metadata:
 spec:
   image: flink:1.14.3
   flinkVersion: v1_14
-  ingressDomain: flink.k8s.io
+  ingress:
+    template: "{{name}}.{{namespace}}.flink.k8s.io"
   flinkConfiguration:
-#    rest.address: basic-example.flink.k8s.io
-#    rest.port: "80"
     taskmanager.numberOfTaskSlots: "2"
   serviceAccount: flink
   jobManager:
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/FlinkDeploymentSpec.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/FlinkDeploymentSpec.java
index 312bb89..a743f09 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/FlinkDeploymentSpec.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/FlinkDeploymentSpec.java
@@ -46,8 +46,8 @@ public class FlinkDeploymentSpec {
     /** Flink image version. */
     private FlinkVersion flinkVersion;
 
-    /** Ingress domain for the Flink deployment. */
-    private String ingressDomain;
+    /** Ingress specs. */
+    private IngressSpec ingress;
 
     /** Flink configuration overrides for the Flink deployment. */
     private Map<String, String> flinkConfiguration;
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/IngressSpec.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/IngressSpec.java
new file mode 100644
index 0000000..2be9dd8
--- /dev/null
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/IngressSpec.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.crd.spec;
+
+import org.apache.flink.annotation.Experimental;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.util.Map;
+
+/** Ingress spec. */
+@Experimental
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+@Builder
+public class IngressSpec {
+
+    /** Ingress template for the JobManager service. */
+    private String template;
+
+    /** Ingress className for the Flink deployment. */
+    private String className;
+
+    /** Ingress annotations. */
+    private Map<String, String> annotations;
+}
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkConfigBuilder.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkConfigBuilder.java
index 6a14f18..2faa76e 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkConfigBuilder.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkConfigBuilder.java
@@ -113,7 +113,7 @@ public class FlinkConfigBuilder {
 
     public FlinkConfigBuilder applyIngressDomain() {
         // Web UI
-        if (spec.getIngressDomain() != null) {
+        if (spec.getIngress() != null) {
             effectiveConfig.set(
                     REST_SERVICE_EXPOSED_TYPE,
                     KubernetesConfigOptions.ServiceExposedType.ClusterIP);
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/IngressUtils.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/IngressUtils.java
index d8350dd..d99f450 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/IngressUtils.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/IngressUtils.java
@@ -20,6 +20,8 @@ package org.apache.flink.kubernetes.operator.utils;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.exception.ReconciliationException;
+import org.apache.flink.util.Preconditions;
 
 import io.fabric8.kubernetes.api.model.HasMetadata;
 import io.fabric8.kubernetes.api.model.OwnerReference;
@@ -29,16 +31,28 @@ import 
io.fabric8.kubernetes.api.model.networking.v1.HTTPIngressRuleValueBuilder
 import io.fabric8.kubernetes.api.model.networking.v1.Ingress;
 import io.fabric8.kubernetes.api.model.networking.v1.IngressBuilder;
 import io.fabric8.kubernetes.api.model.networking.v1.IngressRule;
+import io.fabric8.kubernetes.api.model.networking.v1.IngressRuleBuilder;
 import io.fabric8.kubernetes.client.KubernetesClient;
+import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.net.MalformedURLException;
+import java.net.URL;
 import java.util.Collections;
 import java.util.List;
+import java.util.regex.Pattern;
 
 /** Ingress utilities. */
 public class IngressUtils {
 
+    private static final Pattern NAME_PTN =
+            Pattern.compile("\\{\\{name\\}\\}", Pattern.CASE_INSENSITIVE);
+    private static final Pattern NAMESPACE_PTN =
+            Pattern.compile("\\{\\{namespace\\}\\}", Pattern.CASE_INSENSITIVE);
+    private static final Pattern URL_PROTOCOL_REGEX =
+            Pattern.compile("^https?://", Pattern.CASE_INSENSITIVE);
+
     private static final String REST_SVC_NAME_SUFFIX = "-rest";
 
     private static final Logger LOG = 
LoggerFactory.getLogger(IngressUtils.class);
@@ -47,18 +61,23 @@ public class IngressUtils {
             FlinkDeployment flinkDeployment,
             Configuration effectiveConfig,
             KubernetesClient client) {
-        if (flinkDeployment.getSpec().getIngressDomain() != null) {
-            final IngressRule ingressRule = fromDeployment(flinkDeployment, 
effectiveConfig);
+        if (flinkDeployment.getSpec().getIngress() != null) {
+
             Ingress ingress =
                     new IngressBuilder()
                             .withNewMetadata()
+                            .withAnnotations(
+                                    
flinkDeployment.getSpec().getIngress().getAnnotations())
                             .withName(flinkDeployment.getMetadata().getName())
                             
.withNamespace(flinkDeployment.getMetadata().getNamespace())
                             .endMetadata()
                             .withNewSpec()
-                            .withRules(ingressRule)
+                            .withIngressClassName(
+                                    
flinkDeployment.getSpec().getIngress().getClassName())
+                            .withRules(getIngressRule(flinkDeployment, 
effectiveConfig))
                             .endSpec()
                             .build();
+
             Deployment deployment =
                     client.apps()
                             .deployments()
@@ -66,10 +85,11 @@ public class IngressUtils {
                             .withName(flinkDeployment.getMetadata().getName())
                             .get();
             if (deployment == null) {
-                LOG.warn("Could not find deployment {}", 
flinkDeployment.getMetadata().getName());
+                LOG.error("Could not find deployment {}", 
flinkDeployment.getMetadata().getName());
             } else {
                 setOwnerReference(deployment, 
Collections.singletonList(ingress));
             }
+
             LOG.info("Updating ingress rules {}", ingress);
             client.resourceList(ingress)
                     .inNamespace(flinkDeployment.getMetadata().getNamespace())
@@ -77,13 +97,19 @@ public class IngressUtils {
         }
     }
 
-    private static IngressRule fromDeployment(
+    private static IngressRule getIngressRule(
             FlinkDeployment flinkDeployment, Configuration effectiveConfig) {
         final String clusterId = flinkDeployment.getMetadata().getName();
         final int restPort = effectiveConfig.getInteger(RestOptions.PORT);
-        final String ingressHost = getIngressHost(flinkDeployment, clusterId);
-        return new IngressRule(
-                ingressHost,
+
+        URL ingressUrl =
+                getIngressUrl(
+                        flinkDeployment.getSpec().getIngress().getTemplate(),
+                        flinkDeployment.getMetadata().getName(),
+                        flinkDeployment.getMetadata().getNamespace());
+
+        IngressRuleBuilder ingressRuleBuilder = new IngressRuleBuilder();
+        ingressRuleBuilder.withHttp(
                 new HTTPIngressRuleValueBuilder()
                         .addNewPath()
                         .withPathType("ImplementationSpecific")
@@ -97,17 +123,23 @@ public class IngressUtils {
                         .endBackend()
                         .endPath()
                         .build());
-    }
 
-    private static String getIngressHost(FlinkDeployment flinkDeployment, 
String clusterId) {
-        return String.format(
-                "%s.%s.%s",
-                clusterId,
-                flinkDeployment.getMetadata().getNamespace(),
-                flinkDeployment.getSpec().getIngressDomain());
+        if (!StringUtils.isBlank(ingressUrl.getHost())) {
+            ingressRuleBuilder.withHost(ingressUrl.getHost());
+        }
+
+        if (!StringUtils.isBlank(ingressUrl.getPath())) {
+            ingressRuleBuilder
+                    .editHttp()
+                    .editFirstPath()
+                    .withPath(ingressUrl.getPath())
+                    .endPath()
+                    .endHttp();
+        }
+        return ingressRuleBuilder.build();
     }
 
-    public static void setOwnerReference(HasMetadata owner, List<HasMetadata> 
resources) {
+    private static void setOwnerReference(HasMetadata owner, List<HasMetadata> 
resources) {
         final OwnerReference ownerReference =
                 new OwnerReferenceBuilder()
                         .withName(owner.getMetadata().getName())
@@ -122,4 +154,27 @@ public class IngressUtils {
                         resource.getMetadata()
                                 
.setOwnerReferences(Collections.singletonList(ownerReference)));
     }
+
+    public static URL getIngressUrl(String ingressTemplate, String name, 
String namespace) {
+        String template = addProtocol(ingressTemplate);
+        template = NAME_PTN.matcher(template).replaceAll(name);
+        template = NAMESPACE_PTN.matcher(template).replaceAll(namespace);
+        try {
+            return new URL(template);
+        } catch (MalformedURLException e) {
+            LOG.error(e.getMessage());
+            throw new ReconciliationException(
+                    String.format(
+                            "Unable to process the Ingress template(%s). 
Error: %s",
+                            ingressTemplate, e.getMessage()));
+        }
+    }
+
+    private static String addProtocol(String url) {
+        Preconditions.checkNotNull(url);
+        if (!URL_PROTOCOL_REGEX.matcher(url).find()) {
+            url = "http://"; + url;
+        }
+        return url;
+    }
 }
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultDeploymentValidator.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultDeploymentValidator.java
index b917997..8e7d2b3 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultDeploymentValidator.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultDeploymentValidator.java
@@ -23,12 +23,15 @@ import 
org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
 import org.apache.flink.kubernetes.operator.crd.spec.FlinkVersion;
+import org.apache.flink.kubernetes.operator.crd.spec.IngressSpec;
 import org.apache.flink.kubernetes.operator.crd.spec.JobManagerSpec;
 import org.apache.flink.kubernetes.operator.crd.spec.JobSpec;
 import org.apache.flink.kubernetes.operator.crd.spec.JobState;
 import org.apache.flink.kubernetes.operator.crd.spec.Resource;
 import org.apache.flink.kubernetes.operator.crd.spec.TaskManagerSpec;
 import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
+import org.apache.flink.kubernetes.operator.exception.ReconciliationException;
+import org.apache.flink.kubernetes.operator.utils.IngressUtils;
 import org.apache.flink.kubernetes.utils.Constants;
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 
@@ -53,6 +56,10 @@ public class DefaultDeploymentValidator implements 
FlinkDeploymentValidator {
         return firstPresent(
                 validateFlinkVersion(spec.getFlinkVersion()),
                 validateFlinkConfig(spec.getFlinkConfiguration()),
+                validateIngress(
+                        spec.getIngress(),
+                        deployment.getMetadata().getName(),
+                        deployment.getMetadata().getNamespace()),
                 validateLogConfig(spec.getLogConfiguration()),
                 validateJobSpec(spec.getJob(), spec.getFlinkConfiguration()),
                 validateJmSpec(spec.getJobManager(), 
spec.getFlinkConfiguration()),
@@ -76,6 +83,22 @@ public class DefaultDeploymentValidator implements 
FlinkDeploymentValidator {
         return Optional.empty();
     }
 
+    private Optional<String> validateIngress(IngressSpec ingress, String name, 
String namespace) {
+        if (ingress == null) {
+            return Optional.empty();
+        }
+        if (ingress.getTemplate() == null) {
+            return Optional.of("Ingress template must be defined");
+        }
+        try {
+            IngressUtils.getIngressUrl(ingress.getTemplate(), name, namespace);
+        } catch (ReconciliationException e) {
+            return Optional.of(e.getMessage());
+        }
+
+        return Optional.empty();
+    }
+
     private Optional<String> validateFlinkConfig(Map<String, String> confMap) {
         if (confMap == null) {
             return Optional.empty();
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
index a313360..84d65f4 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
@@ -52,6 +52,7 @@ import java.util.Optional;
 public class TestUtils {
 
     public static final String TEST_NAMESPACE = "flink-operator-test";
+    public static final String TEST_DEPLOYMENT_NAME = "test-cluster";
     public static final String SERVICE_ACCOUNT = "flink-operator";
     public static final String FLINK_VERSION = "latest";
     public static final String IMAGE = String.format("flink:%s", 
FLINK_VERSION);
@@ -63,7 +64,7 @@ public class TestUtils {
         deployment.setStatus(new FlinkDeploymentStatus());
         deployment.setMetadata(
                 new ObjectMetaBuilder()
-                        .withName("test-cluster")
+                        .withName(TEST_DEPLOYMENT_NAME)
                         .withNamespace(TEST_NAMESPACE)
                         .build());
         deployment.setSpec(getTestFlinkDeploymentSpec());
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkConfigBuilderTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkConfigBuilderTest.java
index b739472..60a3d6a 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkConfigBuilderTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkConfigBuilderTest.java
@@ -73,7 +73,7 @@ public class FlinkConfigBuilderTest {
                 TestUtils.getTestPod("pod2 hostname", "pod2 api version", new 
ArrayList<>());
 
         flinkDeployment.getSpec().setPodTemplate(pod0);
-        flinkDeployment.getSpec().setIngressDomain("test.com");
+        flinkDeployment.getSpec().getIngress().setTemplate("test.com");
         flinkDeployment.getSpec().getJobManager().setPodTemplate(pod1);
         flinkDeployment.getSpec().getJobManager().setReplicas(2);
         flinkDeployment.getSpec().getTaskManager().setPodTemplate(pod2);
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/IngressUtilsTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/IngressUtilsTest.java
new file mode 100644
index 0000000..dd37eb6
--- /dev/null
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/IngressUtilsTest.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.utils;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.TestUtils;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.spec.IngressSpec;
+import org.apache.flink.kubernetes.operator.exception.ReconciliationException;
+
+import io.fabric8.kubernetes.api.model.networking.v1.Ingress;
+import io.fabric8.kubernetes.api.model.networking.v1.IngressRule;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
+import org.apache.commons.lang3.StringUtils;
+import org.junit.jupiter.api.Test;
+
+import java.net.URL;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/** Test class for {@link IngressUtils}. */
+@EnableKubernetesMockClient(crud = true)
+public class IngressUtilsTest {
+
+    KubernetesClient client;
+
+    @Test
+    public void testIngress() {
+        FlinkDeployment appCluster = TestUtils.buildApplicationCluster();
+        Configuration config = FlinkUtils.getEffectiveConfig(appCluster, new 
Configuration());
+
+        // no ingress when ingressDomain is empty
+        IngressUtils.updateIngressRules(appCluster, config, client);
+        assertNull(
+                client.network()
+                        .v1()
+                        .ingresses()
+                        .inNamespace(appCluster.getMetadata().getNamespace())
+                        .withName(appCluster.getMetadata().getName())
+                        .get());
+
+        // host based routing
+        IngressSpec.IngressSpecBuilder builder = IngressSpec.builder();
+        builder.template("{{name}}.{{namespace}}.example.com");
+        appCluster.getSpec().setIngress(builder.build());
+        IngressUtils.updateIngressRules(appCluster, config, client);
+        Ingress ingress =
+                client.network()
+                        .v1()
+                        .ingresses()
+                        .inNamespace(appCluster.getMetadata().getNamespace())
+                        .withName(appCluster.getMetadata().getName())
+                        .get();
+
+        List<IngressRule> rules = ingress.getSpec().getRules();
+        assertEquals(1, rules.size());
+        assertEquals(
+                appCluster.getMetadata().getName()
+                        + "."
+                        + appCluster.getMetadata().getNamespace()
+                        + ".example.com",
+                rules.get(0).getHost());
+        assertNull(rules.get(0).getHttp().getPaths().get(0).getPath());
+
+        // path based routing
+        builder.template("/{{namespace}}/{{name}}(/|$)(.*)");
+        builder.className("nginx");
+        
builder.annotations(Map.of("nginx.ingress.kubernetes.io/rewrite-target", 
"/$2"));
+        appCluster.getSpec().setIngress(builder.build());
+        IngressUtils.updateIngressRules(appCluster, config, client);
+        ingress =
+                client.network()
+                        .v1()
+                        .ingresses()
+                        .inNamespace(appCluster.getMetadata().getNamespace())
+                        .withName(appCluster.getMetadata().getName())
+                        .get();
+        rules = ingress.getSpec().getRules();
+        assertEquals(1, rules.size());
+        assertNull(rules.get(0).getHost());
+        assertEquals(1, rules.get(0).getHttp().getPaths().size());
+        assertEquals(
+                "/"
+                        + appCluster.getMetadata().getNamespace()
+                        + "/"
+                        + appCluster.getMetadata().getName()
+                        + "(/|$)(.*)",
+                rules.get(0).getHttp().getPaths().get(0).getPath());
+        assertEquals(
+                Map.of("nginx.ingress.kubernetes.io/rewrite-target", "/$2"),
+                ingress.getMetadata().getAnnotations());
+        assertEquals("nginx", ingress.getSpec().getIngressClassName());
+
+        // host + path based routing
+        builder.template("example.com/{{namespace}}/{{name}}(/|$)(.*)");
+        builder.className("nginx");
+        appCluster.getSpec().setIngress(builder.build());
+        IngressUtils.updateIngressRules(appCluster, config, client);
+        ingress =
+                client.network()
+                        .v1()
+                        .ingresses()
+                        .inNamespace(appCluster.getMetadata().getNamespace())
+                        .withName(appCluster.getMetadata().getName())
+                        .get();
+        rules = ingress.getSpec().getRules();
+        assertEquals(1, rules.size());
+        assertEquals(1, rules.get(0).getHttp().getPaths().size());
+        assertEquals(
+                "/"
+                        + appCluster.getMetadata().getNamespace()
+                        + "/"
+                        + appCluster.getMetadata().getName()
+                        + "(/|$)(.*)",
+                rules.get(0).getHttp().getPaths().get(0).getPath());
+        assertEquals(
+                Map.of("nginx.ingress.kubernetes.io/rewrite-target", "/$2"),
+                ingress.getMetadata().getAnnotations());
+        assertEquals("nginx", ingress.getSpec().getIngressClassName());
+    }
+
+    @Test
+    public void testIngressUrl() {
+        String template = "flink.k8s.io/{{namespace}}/{{name}}";
+        URL url = IngressUtils.getIngressUrl(template, "basic-ingress", 
"default");
+        assertEquals("flink.k8s.io", url.getHost());
+        assertEquals("/default/basic-ingress", url.getPath());
+
+        template = "/{{namespace}}/{{name}}";
+        url = IngressUtils.getIngressUrl(template, "basic-ingress", "default");
+        assertTrue(StringUtils.isBlank(url.getHost()));
+        assertEquals("/default/basic-ingress", url.getPath());
+
+        template = "{{name}}.{{namespace}}.flink.k8s.io";
+        url = IngressUtils.getIngressUrl(template, "basic-ingress", "default");
+
+        assertEquals("basic-ingress.default.flink.k8s.io", url.getHost());
+        assertTrue(StringUtils.isBlank(url.getPath()));
+
+        assertThrows(
+                ReconciliationException.class,
+                () -> IngressUtils.getIngressUrl("example.com:port", 
"basic-ingress", "default"));
+    }
+}
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DeploymentValidatorTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DeploymentValidatorTest.java
index 058edd5..7d885ec 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DeploymentValidatorTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DeploymentValidatorTest.java
@@ -21,6 +21,7 @@ import 
org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
 import org.apache.flink.kubernetes.operator.TestUtils;
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.crd.spec.FlinkVersion;
+import org.apache.flink.kubernetes.operator.crd.spec.IngressSpec;
 import org.apache.flink.kubernetes.operator.crd.spec.JobState;
 import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
 import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
@@ -94,6 +95,24 @@ public class DeploymentValidatorTest {
                                                 "rootLogger.level = INFO")));
 
         testError(
+                dep -> dep.getSpec().setIngress(new IngressSpec()),
+                "Ingress template must be defined");
+
+        testError(
+                dep ->
+                        dep.getSpec()
+                                .setIngress(
+                                        
IngressSpec.builder().template("example.com:port").build()),
+                "Unable to process the Ingress template(example.com:port). 
Error: Error at index 0 in: \"port\"");
+        testSuccess(
+                dep ->
+                        dep.getSpec()
+                                .setIngress(
+                                        IngressSpec.builder()
+                                                
.template("example.com/{{namespace}}/{{name}}")
+                                                .build()));
+
+        testError(
                 dep -> dep.getSpec().setLogConfiguration(Map.of("random", 
"config")),
                 "Invalid log config key");
 
diff --git a/helm/flink-operator/crds/flinkdeployments.flink.apache.org-v1.yml 
b/helm/flink-operator/crds/flinkdeployments.flink.apache.org-v1.yml
index 2077f93..9c83d6d 100644
--- a/helm/flink-operator/crds/flinkdeployments.flink.apache.org-v1.yml
+++ b/helm/flink-operator/crds/flinkdeployments.flink.apache.org-v1.yml
@@ -31,8 +31,17 @@ spec:
                 - v1_15
                 - v1_16
                 type: string
-              ingressDomain:
-                type: string
+              ingress:
+                properties:
+                  template:
+                    type: string
+                  className:
+                    type: string
+                  annotations:
+                    additionalProperties:
+                      type: string
+                    type: object
+                type: object
               flinkConfiguration:
                 additionalProperties:
                   type: string
@@ -9125,8 +9134,17 @@ spec:
                         - v1_15
                         - v1_16
                         type: string
-                      ingressDomain:
-                        type: string
+                      ingress:
+                        properties:
+                          template:
+                            type: string
+                          className:
+                            type: string
+                          annotations:
+                            additionalProperties:
+                              type: string
+                            type: object
+                        type: object
                       flinkConfiguration:
                         additionalProperties:
                           type: string

Reply via email to