This is an automated email from the ASF dual-hosted git repository.
wangyang0918 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new 68b9186 [FLINK-26706] Introduce Ingress URL templating
68b9186 is described below
commit 68b9186049e486f4568b4d7999f90f6bae8c190b
Author: Matyas Orhidi <[email protected]>
AuthorDate: Mon Mar 21 11:30:15 2022 +0100
[FLINK-26706] Introduce Ingress URL templating
---
.../{basic-ingress.yaml => advanced-ingress.yaml} | 10 +-
examples/basic-ingress.yaml | 5 +-
.../operator/crd/spec/FlinkDeploymentSpec.java | 4 +-
.../kubernetes/operator/crd/spec/IngressSpec.java | 45 ++++++
.../operator/utils/FlinkConfigBuilder.java | 2 +-
.../kubernetes/operator/utils/IngressUtils.java | 87 +++++++++--
.../validation/DefaultDeploymentValidator.java | 23 +++
.../flink/kubernetes/operator/TestUtils.java | 3 +-
.../operator/utils/FlinkConfigBuilderTest.java | 2 +-
.../operator/utils/IngressUtilsTest.java | 166 +++++++++++++++++++++
.../validation/DeploymentValidatorTest.java | 19 +++
.../crds/flinkdeployments.flink.apache.org-v1.yml | 26 +++-
12 files changed, 360 insertions(+), 32 deletions(-)
diff --git a/examples/basic-ingress.yaml b/examples/advanced-ingress.yaml
similarity index 88%
copy from examples/basic-ingress.yaml
copy to examples/advanced-ingress.yaml
index 9620607..ca4aa11 100644
--- a/examples/basic-ingress.yaml
+++ b/examples/advanced-ingress.yaml
@@ -20,14 +20,16 @@ apiVersion: flink.apache.org/v1alpha1
kind: FlinkDeployment
metadata:
namespace: default
- name: basic-ingress
+ name: advanced-ingress
spec:
image: flink:1.14.3
flinkVersion: v1_14
- ingressDomain: flink.k8s.io
+ ingress:
+ template: "/{{namespace}}/{{name}}(/|$)(.*)"
+ className: "nginx"
+ annotations:
+ nginx.ingress.kubernetes.io/rewrite-target: "/$2"
flinkConfiguration:
-# rest.address: basic-example.flink.k8s.io
-# rest.port: "80"
taskmanager.numberOfTaskSlots: "2"
serviceAccount: flink
jobManager:
diff --git a/examples/basic-ingress.yaml b/examples/basic-ingress.yaml
index 9620607..e3c83e9 100644
--- a/examples/basic-ingress.yaml
+++ b/examples/basic-ingress.yaml
@@ -24,10 +24,9 @@ metadata:
spec:
image: flink:1.14.3
flinkVersion: v1_14
- ingressDomain: flink.k8s.io
+ ingress:
+ template: "{{name}}.{{namespace}}.flink.k8s.io"
flinkConfiguration:
-# rest.address: basic-example.flink.k8s.io
-# rest.port: "80"
taskmanager.numberOfTaskSlots: "2"
serviceAccount: flink
jobManager:
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/FlinkDeploymentSpec.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/FlinkDeploymentSpec.java
index 312bb89..a743f09 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/FlinkDeploymentSpec.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/FlinkDeploymentSpec.java
@@ -46,8 +46,8 @@ public class FlinkDeploymentSpec {
/** Flink image version. */
private FlinkVersion flinkVersion;
- /** Ingress domain for the Flink deployment. */
- private String ingressDomain;
+ /** Ingress specs. */
+ private IngressSpec ingress;
/** Flink configuration overrides for the Flink deployment. */
private Map<String, String> flinkConfiguration;
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/IngressSpec.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/IngressSpec.java
new file mode 100644
index 0000000..2be9dd8
--- /dev/null
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/IngressSpec.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.crd.spec;
+
+import org.apache.flink.annotation.Experimental;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.util.Map;
+
+/** Ingress spec. */
+@Experimental
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+@Builder
+public class IngressSpec {
+
+ /** Ingress template for the JobManager service. */
+ private String template;
+
+ /** Ingress className for the Flink deployment. */
+ private String className;
+
+ /** Ingress annotations. */
+ private Map<String, String> annotations;
+}
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkConfigBuilder.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkConfigBuilder.java
index 6a14f18..2faa76e 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkConfigBuilder.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkConfigBuilder.java
@@ -113,7 +113,7 @@ public class FlinkConfigBuilder {
public FlinkConfigBuilder applyIngressDomain() {
// Web UI
- if (spec.getIngressDomain() != null) {
+ if (spec.getIngress() != null) {
effectiveConfig.set(
REST_SERVICE_EXPOSED_TYPE,
KubernetesConfigOptions.ServiceExposedType.ClusterIP);
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/IngressUtils.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/IngressUtils.java
index d8350dd..d99f450 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/IngressUtils.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/IngressUtils.java
@@ -20,6 +20,8 @@ package org.apache.flink.kubernetes.operator.utils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.exception.ReconciliationException;
+import org.apache.flink.util.Preconditions;
import io.fabric8.kubernetes.api.model.HasMetadata;
import io.fabric8.kubernetes.api.model.OwnerReference;
@@ -29,16 +31,28 @@ import
io.fabric8.kubernetes.api.model.networking.v1.HTTPIngressRuleValueBuilder
import io.fabric8.kubernetes.api.model.networking.v1.Ingress;
import io.fabric8.kubernetes.api.model.networking.v1.IngressBuilder;
import io.fabric8.kubernetes.api.model.networking.v1.IngressRule;
+import io.fabric8.kubernetes.api.model.networking.v1.IngressRuleBuilder;
import io.fabric8.kubernetes.client.KubernetesClient;
+import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.net.MalformedURLException;
+import java.net.URL;
import java.util.Collections;
import java.util.List;
+import java.util.regex.Pattern;
/** Ingress utilities. */
public class IngressUtils {
+ private static final Pattern NAME_PTN =
+ Pattern.compile("\\{\\{name\\}\\}", Pattern.CASE_INSENSITIVE);
+ private static final Pattern NAMESPACE_PTN =
+ Pattern.compile("\\{\\{namespace\\}\\}", Pattern.CASE_INSENSITIVE);
+ private static final Pattern URL_PROTOCOL_REGEX =
+ Pattern.compile("^https?://", Pattern.CASE_INSENSITIVE);
+
private static final String REST_SVC_NAME_SUFFIX = "-rest";
private static final Logger LOG =
LoggerFactory.getLogger(IngressUtils.class);
@@ -47,18 +61,23 @@ public class IngressUtils {
FlinkDeployment flinkDeployment,
Configuration effectiveConfig,
KubernetesClient client) {
- if (flinkDeployment.getSpec().getIngressDomain() != null) {
- final IngressRule ingressRule = fromDeployment(flinkDeployment,
effectiveConfig);
+ if (flinkDeployment.getSpec().getIngress() != null) {
+
Ingress ingress =
new IngressBuilder()
.withNewMetadata()
+ .withAnnotations(
+
flinkDeployment.getSpec().getIngress().getAnnotations())
.withName(flinkDeployment.getMetadata().getName())
.withNamespace(flinkDeployment.getMetadata().getNamespace())
.endMetadata()
.withNewSpec()
- .withRules(ingressRule)
+ .withIngressClassName(
+
flinkDeployment.getSpec().getIngress().getClassName())
+ .withRules(getIngressRule(flinkDeployment,
effectiveConfig))
.endSpec()
.build();
+
Deployment deployment =
client.apps()
.deployments()
@@ -66,10 +85,11 @@ public class IngressUtils {
.withName(flinkDeployment.getMetadata().getName())
.get();
if (deployment == null) {
- LOG.warn("Could not find deployment {}",
flinkDeployment.getMetadata().getName());
+ LOG.error("Could not find deployment {}",
flinkDeployment.getMetadata().getName());
} else {
setOwnerReference(deployment,
Collections.singletonList(ingress));
}
+
LOG.info("Updating ingress rules {}", ingress);
client.resourceList(ingress)
.inNamespace(flinkDeployment.getMetadata().getNamespace())
@@ -77,13 +97,19 @@ public class IngressUtils {
}
}
- private static IngressRule fromDeployment(
+ private static IngressRule getIngressRule(
FlinkDeployment flinkDeployment, Configuration effectiveConfig) {
final String clusterId = flinkDeployment.getMetadata().getName();
final int restPort = effectiveConfig.getInteger(RestOptions.PORT);
- final String ingressHost = getIngressHost(flinkDeployment, clusterId);
- return new IngressRule(
- ingressHost,
+
+ URL ingressUrl =
+ getIngressUrl(
+ flinkDeployment.getSpec().getIngress().getTemplate(),
+ flinkDeployment.getMetadata().getName(),
+ flinkDeployment.getMetadata().getNamespace());
+
+ IngressRuleBuilder ingressRuleBuilder = new IngressRuleBuilder();
+ ingressRuleBuilder.withHttp(
new HTTPIngressRuleValueBuilder()
.addNewPath()
.withPathType("ImplementationSpecific")
@@ -97,17 +123,23 @@ public class IngressUtils {
.endBackend()
.endPath()
.build());
- }
- private static String getIngressHost(FlinkDeployment flinkDeployment,
String clusterId) {
- return String.format(
- "%s.%s.%s",
- clusterId,
- flinkDeployment.getMetadata().getNamespace(),
- flinkDeployment.getSpec().getIngressDomain());
+ if (!StringUtils.isBlank(ingressUrl.getHost())) {
+ ingressRuleBuilder.withHost(ingressUrl.getHost());
+ }
+
+ if (!StringUtils.isBlank(ingressUrl.getPath())) {
+ ingressRuleBuilder
+ .editHttp()
+ .editFirstPath()
+ .withPath(ingressUrl.getPath())
+ .endPath()
+ .endHttp();
+ }
+ return ingressRuleBuilder.build();
}
- public static void setOwnerReference(HasMetadata owner, List<HasMetadata>
resources) {
+ private static void setOwnerReference(HasMetadata owner, List<HasMetadata>
resources) {
final OwnerReference ownerReference =
new OwnerReferenceBuilder()
.withName(owner.getMetadata().getName())
@@ -122,4 +154,27 @@ public class IngressUtils {
resource.getMetadata()
.setOwnerReferences(Collections.singletonList(ownerReference)));
}
+
+ public static URL getIngressUrl(String ingressTemplate, String name,
String namespace) {
+ String template = addProtocol(ingressTemplate);
+ template = NAME_PTN.matcher(template).replaceAll(name);
+ template = NAMESPACE_PTN.matcher(template).replaceAll(namespace);
+ try {
+ return new URL(template);
+ } catch (MalformedURLException e) {
+ LOG.error(e.getMessage());
+ throw new ReconciliationException(
+ String.format(
+ "Unable to process the Ingress template(%s).
Error: %s",
+ ingressTemplate, e.getMessage()));
+ }
+ }
+
+ private static String addProtocol(String url) {
+ Preconditions.checkNotNull(url);
+ if (!URL_PROTOCOL_REGEX.matcher(url).find()) {
+ url = "http://" + url;
+ }
+ return url;
+ }
}
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultDeploymentValidator.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultDeploymentValidator.java
index b917997..8e7d2b3 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultDeploymentValidator.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultDeploymentValidator.java
@@ -23,12 +23,15 @@ import
org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
import org.apache.flink.kubernetes.operator.crd.spec.FlinkVersion;
+import org.apache.flink.kubernetes.operator.crd.spec.IngressSpec;
import org.apache.flink.kubernetes.operator.crd.spec.JobManagerSpec;
import org.apache.flink.kubernetes.operator.crd.spec.JobSpec;
import org.apache.flink.kubernetes.operator.crd.spec.JobState;
import org.apache.flink.kubernetes.operator.crd.spec.Resource;
import org.apache.flink.kubernetes.operator.crd.spec.TaskManagerSpec;
import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
+import org.apache.flink.kubernetes.operator.exception.ReconciliationException;
+import org.apache.flink.kubernetes.operator.utils.IngressUtils;
import org.apache.flink.kubernetes.utils.Constants;
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
@@ -53,6 +56,10 @@ public class DefaultDeploymentValidator implements
FlinkDeploymentValidator {
return firstPresent(
validateFlinkVersion(spec.getFlinkVersion()),
validateFlinkConfig(spec.getFlinkConfiguration()),
+ validateIngress(
+ spec.getIngress(),
+ deployment.getMetadata().getName(),
+ deployment.getMetadata().getNamespace()),
validateLogConfig(spec.getLogConfiguration()),
validateJobSpec(spec.getJob(), spec.getFlinkConfiguration()),
validateJmSpec(spec.getJobManager(),
spec.getFlinkConfiguration()),
@@ -76,6 +83,22 @@ public class DefaultDeploymentValidator implements
FlinkDeploymentValidator {
return Optional.empty();
}
+ private Optional<String> validateIngress(IngressSpec ingress, String name,
String namespace) {
+ if (ingress == null) {
+ return Optional.empty();
+ }
+ if (ingress.getTemplate() == null) {
+ return Optional.of("Ingress template must be defined");
+ }
+ try {
+ IngressUtils.getIngressUrl(ingress.getTemplate(), name, namespace);
+ } catch (ReconciliationException e) {
+ return Optional.of(e.getMessage());
+ }
+
+ return Optional.empty();
+ }
+
private Optional<String> validateFlinkConfig(Map<String, String> confMap) {
if (confMap == null) {
return Optional.empty();
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
index a313360..84d65f4 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
@@ -52,6 +52,7 @@ import java.util.Optional;
public class TestUtils {
public static final String TEST_NAMESPACE = "flink-operator-test";
+ public static final String TEST_DEPLOYMENT_NAME = "test-cluster";
public static final String SERVICE_ACCOUNT = "flink-operator";
public static final String FLINK_VERSION = "latest";
public static final String IMAGE = String.format("flink:%s",
FLINK_VERSION);
@@ -63,7 +64,7 @@ public class TestUtils {
deployment.setStatus(new FlinkDeploymentStatus());
deployment.setMetadata(
new ObjectMetaBuilder()
- .withName("test-cluster")
+ .withName(TEST_DEPLOYMENT_NAME)
.withNamespace(TEST_NAMESPACE)
.build());
deployment.setSpec(getTestFlinkDeploymentSpec());
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkConfigBuilderTest.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkConfigBuilderTest.java
index b739472..60a3d6a 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkConfigBuilderTest.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkConfigBuilderTest.java
@@ -73,7 +73,7 @@ public class FlinkConfigBuilderTest {
TestUtils.getTestPod("pod2 hostname", "pod2 api version", new
ArrayList<>());
flinkDeployment.getSpec().setPodTemplate(pod0);
- flinkDeployment.getSpec().setIngressDomain("test.com");
+ flinkDeployment.getSpec().getIngress().setTemplate("test.com");
flinkDeployment.getSpec().getJobManager().setPodTemplate(pod1);
flinkDeployment.getSpec().getJobManager().setReplicas(2);
flinkDeployment.getSpec().getTaskManager().setPodTemplate(pod2);
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/IngressUtilsTest.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/IngressUtilsTest.java
new file mode 100644
index 0000000..dd37eb6
--- /dev/null
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/IngressUtilsTest.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.utils;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.TestUtils;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.spec.IngressSpec;
+import org.apache.flink.kubernetes.operator.exception.ReconciliationException;
+
+import io.fabric8.kubernetes.api.model.networking.v1.Ingress;
+import io.fabric8.kubernetes.api.model.networking.v1.IngressRule;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
+import org.apache.commons.lang3.StringUtils;
+import org.junit.jupiter.api.Test;
+
+import java.net.URL;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/** Test class for {@link IngressUtils}. */
+@EnableKubernetesMockClient(crud = true)
+public class IngressUtilsTest {
+
+ KubernetesClient client;
+
+ @Test
+ public void testIngress() {
+ FlinkDeployment appCluster = TestUtils.buildApplicationCluster();
+ Configuration config = FlinkUtils.getEffectiveConfig(appCluster, new
Configuration());
+
+ // no ingress when ingressDomain is empty
+ IngressUtils.updateIngressRules(appCluster, config, client);
+ assertNull(
+ client.network()
+ .v1()
+ .ingresses()
+ .inNamespace(appCluster.getMetadata().getNamespace())
+ .withName(appCluster.getMetadata().getName())
+ .get());
+
+ // host based routing
+ IngressSpec.IngressSpecBuilder builder = IngressSpec.builder();
+ builder.template("{{name}}.{{namespace}}.example.com");
+ appCluster.getSpec().setIngress(builder.build());
+ IngressUtils.updateIngressRules(appCluster, config, client);
+ Ingress ingress =
+ client.network()
+ .v1()
+ .ingresses()
+ .inNamespace(appCluster.getMetadata().getNamespace())
+ .withName(appCluster.getMetadata().getName())
+ .get();
+
+ List<IngressRule> rules = ingress.getSpec().getRules();
+ assertEquals(1, rules.size());
+ assertEquals(
+ appCluster.getMetadata().getName()
+ + "."
+ + appCluster.getMetadata().getNamespace()
+ + ".example.com",
+ rules.get(0).getHost());
+ assertNull(rules.get(0).getHttp().getPaths().get(0).getPath());
+
+ // path based routing
+ builder.template("/{{namespace}}/{{name}}(/|$)(.*)");
+ builder.className("nginx");
+
builder.annotations(Map.of("nginx.ingress.kubernetes.io/rewrite-target",
"/$2"));
+ appCluster.getSpec().setIngress(builder.build());
+ IngressUtils.updateIngressRules(appCluster, config, client);
+ ingress =
+ client.network()
+ .v1()
+ .ingresses()
+ .inNamespace(appCluster.getMetadata().getNamespace())
+ .withName(appCluster.getMetadata().getName())
+ .get();
+ rules = ingress.getSpec().getRules();
+ assertEquals(1, rules.size());
+ assertNull(rules.get(0).getHost());
+ assertEquals(1, rules.get(0).getHttp().getPaths().size());
+ assertEquals(
+ "/"
+ + appCluster.getMetadata().getNamespace()
+ + "/"
+ + appCluster.getMetadata().getName()
+ + "(/|$)(.*)",
+ rules.get(0).getHttp().getPaths().get(0).getPath());
+ assertEquals(
+ Map.of("nginx.ingress.kubernetes.io/rewrite-target", "/$2"),
+ ingress.getMetadata().getAnnotations());
+ assertEquals("nginx", ingress.getSpec().getIngressClassName());
+
+ // host + path based routing
+ builder.template("example.com/{{namespace}}/{{name}}(/|$)(.*)");
+ builder.className("nginx");
+ appCluster.getSpec().setIngress(builder.build());
+ IngressUtils.updateIngressRules(appCluster, config, client);
+ ingress =
+ client.network()
+ .v1()
+ .ingresses()
+ .inNamespace(appCluster.getMetadata().getNamespace())
+ .withName(appCluster.getMetadata().getName())
+ .get();
+ rules = ingress.getSpec().getRules();
+ assertEquals(1, rules.size());
+ assertEquals(1, rules.get(0).getHttp().getPaths().size());
+ assertEquals(
+ "/"
+ + appCluster.getMetadata().getNamespace()
+ + "/"
+ + appCluster.getMetadata().getName()
+ + "(/|$)(.*)",
+ rules.get(0).getHttp().getPaths().get(0).getPath());
+ assertEquals(
+ Map.of("nginx.ingress.kubernetes.io/rewrite-target", "/$2"),
+ ingress.getMetadata().getAnnotations());
+ assertEquals("nginx", ingress.getSpec().getIngressClassName());
+ }
+
+ @Test
+ public void testIngressUrl() {
+ String template = "flink.k8s.io/{{namespace}}/{{name}}";
+ URL url = IngressUtils.getIngressUrl(template, "basic-ingress",
"default");
+ assertEquals("flink.k8s.io", url.getHost());
+ assertEquals("/default/basic-ingress", url.getPath());
+
+ template = "/{{namespace}}/{{name}}";
+ url = IngressUtils.getIngressUrl(template, "basic-ingress", "default");
+ assertTrue(StringUtils.isBlank(url.getHost()));
+ assertEquals("/default/basic-ingress", url.getPath());
+
+ template = "{{name}}.{{namespace}}.flink.k8s.io";
+ url = IngressUtils.getIngressUrl(template, "basic-ingress", "default");
+
+ assertEquals("basic-ingress.default.flink.k8s.io", url.getHost());
+ assertTrue(StringUtils.isBlank(url.getPath()));
+
+ assertThrows(
+ ReconciliationException.class,
+ () -> IngressUtils.getIngressUrl("example.com:port",
"basic-ingress", "default"));
+ }
+}
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DeploymentValidatorTest.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DeploymentValidatorTest.java
index 058edd5..7d885ec 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DeploymentValidatorTest.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DeploymentValidatorTest.java
@@ -21,6 +21,7 @@ import
org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.operator.TestUtils;
import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
import org.apache.flink.kubernetes.operator.crd.spec.FlinkVersion;
+import org.apache.flink.kubernetes.operator.crd.spec.IngressSpec;
import org.apache.flink.kubernetes.operator.crd.spec.JobState;
import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
@@ -94,6 +95,24 @@ public class DeploymentValidatorTest {
"rootLogger.level = INFO")));
testError(
+ dep -> dep.getSpec().setIngress(new IngressSpec()),
+ "Ingress template must be defined");
+
+ testError(
+ dep ->
+ dep.getSpec()
+ .setIngress(
+
IngressSpec.builder().template("example.com:port").build()),
+ "Unable to process the Ingress template(example.com:port).
Error: Error at index 0 in: \"port\"");
+ testSuccess(
+ dep ->
+ dep.getSpec()
+ .setIngress(
+ IngressSpec.builder()
+
.template("example.com/{{namespace}}/{{name}}")
+ .build()));
+
+ testError(
dep -> dep.getSpec().setLogConfiguration(Map.of("random",
"config")),
"Invalid log config key");
diff --git a/helm/flink-operator/crds/flinkdeployments.flink.apache.org-v1.yml
b/helm/flink-operator/crds/flinkdeployments.flink.apache.org-v1.yml
index 2077f93..9c83d6d 100644
--- a/helm/flink-operator/crds/flinkdeployments.flink.apache.org-v1.yml
+++ b/helm/flink-operator/crds/flinkdeployments.flink.apache.org-v1.yml
@@ -31,8 +31,17 @@ spec:
- v1_15
- v1_16
type: string
- ingressDomain:
- type: string
+ ingress:
+ properties:
+ template:
+ type: string
+ className:
+ type: string
+ annotations:
+ additionalProperties:
+ type: string
+ type: object
+ type: object
flinkConfiguration:
additionalProperties:
type: string
@@ -9125,8 +9134,17 @@ spec:
- v1_15
- v1_16
type: string
- ingressDomain:
- type: string
+ ingress:
+ properties:
+ template:
+ type: string
+ className:
+ type: string
+ annotations:
+ additionalProperties:
+ type: string
+ type: object
+ type: object
flinkConfiguration:
additionalProperties:
type: string