This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new a703f2c6e add ingress annotation: kubernetes.io/ingress.class (#2999)
a703f2c6e is described below
commit a703f2c6ea385f2b958a92f9a8190554c677e48b
Author: blackCat <[email protected]>
AuthorDate: Fri Sep 1 21:49:45 2023 +0800
add ingress annotation: kubernetes.io/ingress.class (#2999)
---
.../org/apache/streampark/common/conf/K8sFlinkConfig.scala | 12 ++++++++++++
.../src/main/resources/application.yml | 3 +++
.../flink/kubernetes/ingress/IngressStrategy.scala | 9 +++++++--
3 files changed, 22 insertions(+), 2 deletions(-)
diff --git
a/streampark-common/src/main/scala/org/apache/streampark/common/conf/K8sFlinkConfig.scala
b/streampark-common/src/main/scala/org/apache/streampark/common/conf/K8sFlinkConfig.scala
index 425d8fe27..7313bf201 100644
---
a/streampark-common/src/main/scala/org/apache/streampark/common/conf/K8sFlinkConfig.scala
+++
b/streampark-common/src/main/scala/org/apache/streampark/common/conf/K8sFlinkConfig.scala
@@ -56,6 +56,18 @@ object K8sFlinkConfig {
description = "retained tracking time for SILENT state flink tasks"
)
+ /**
+ * If an ingress controller is specified in the configuration, the ingress
class
+ * kubernetes.io/ingress.class must be specified when creating the ingress,
since there are often
+ * multiple ingress controllers in a production environment.
+ */
+ val ingressClass: InternalOption = InternalOption(
+ key = "streampark.flink-k8s.ingress.class",
+ defaultValue = "streampark",
+ classType = classOf[java.lang.String],
+ description = "Direct ingress to the ingress controller."
+ )
+
/** kubernetes default namespace */
val DEFAULT_KUBERNETES_NAMESPACE = "default"
diff --git
a/streampark-console/streampark-console-service/src/main/resources/application.yml
b/streampark-console/streampark-console-service/src/main/resources/application.yml
index d4b6a1ed5..6bf9b23dc 100644
---
a/streampark-console/streampark-console-service/src/main/resources/application.yml
+++
b/streampark-console/streampark-console-service/src/main/resources/application.yml
@@ -121,6 +121,9 @@ streampark:
polling-interval-sec:
job-status: 2
cluster-metric: 3
+ # If you need to specify an ingress controller, you can use this.
+ ingress:
+ class: nginx
# packer garbage resources collection configuration
packer-gc:
diff --git
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategy.scala
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategy.scala
index e348bf4b8..e97fcfc1f 100644
---
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategy.scala
+++
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategy.scala
@@ -17,7 +17,7 @@
package org.apache.streampark.flink.kubernetes.ingress
-import org.apache.streampark.common.conf.ConfigConst
+import org.apache.streampark.common.conf.{ConfigConst, InternalConfigHolder,
K8sFlinkConfig}
import io.fabric8.kubernetes.api.model.{OwnerReference, OwnerReferenceBuilder}
import io.fabric8.kubernetes.client.DefaultKubernetesClient
@@ -48,11 +48,16 @@ trait IngressStrategy {
}
def buildIngressAnnotations(clusterId: String): Map[String, String] = {
- Map(
+ val annotations = Map(
"nginx.ingress.kubernetes.io/rewrite-target" -> "/$2",
"nginx.ingress.kubernetes.io/proxy-body-size" -> "1024m",
"nginx.ingress.kubernetes.io/configuration-snippet" -> ("rewrite ^(/" +
clusterId + ")$ $1/ permanent;")
)
+ val ingressClass =
InternalConfigHolder.get[String](K8sFlinkConfig.ingressClass)
+ if (ingressClass.nonEmpty) {
+ annotations + ("kubernetes.io/ingress.class" -> ingressClass)
+ }
+ annotations
}
def buildIngressLabels(clusterId: String): Map[String, String] = {