This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev by this push:
     new a703f2c6e add ingress annotation: kubernetes.io/ingress.class (#2999)
a703f2c6e is described below

commit a703f2c6ea385f2b958a92f9a8190554c677e48b
Author: blackCat <[email protected]>
AuthorDate: Fri Sep 1 21:49:45 2023 +0800

    add ingress annotation: kubernetes.io/ingress.class (#2999)
---
 .../org/apache/streampark/common/conf/K8sFlinkConfig.scala   | 12 ++++++++++++
 .../src/main/resources/application.yml                       |  3 +++
 .../flink/kubernetes/ingress/IngressStrategy.scala           |  9 +++++++--
 3 files changed, 22 insertions(+), 2 deletions(-)

diff --git 
a/streampark-common/src/main/scala/org/apache/streampark/common/conf/K8sFlinkConfig.scala
 
b/streampark-common/src/main/scala/org/apache/streampark/common/conf/K8sFlinkConfig.scala
index 425d8fe27..7313bf201 100644
--- 
a/streampark-common/src/main/scala/org/apache/streampark/common/conf/K8sFlinkConfig.scala
+++ 
b/streampark-common/src/main/scala/org/apache/streampark/common/conf/K8sFlinkConfig.scala
@@ -56,6 +56,18 @@ object K8sFlinkConfig {
     description = "retained tracking time for SILENT state flink tasks"
   )
 
+  /**
+   * Ingress class written to the kubernetes.io/ingress.class annotation when an ingress is
+   * created. Production clusters often run several ingress controllers, so the ingress has to
+   * name the controller class that should handle it.
+   */
+  val ingressClass: InternalOption = InternalOption(
+    key = "streampark.flink-k8s.ingress.class",
+    defaultValue = "streampark",
+    classType = classOf[java.lang.String],
+    description = "Direct ingress to the ingress controller."
+  )
+
   /** kubernetes default namespace */
   val DEFAULT_KUBERNETES_NAMESPACE = "default"
 
diff --git 
a/streampark-console/streampark-console-service/src/main/resources/application.yml
 
b/streampark-console/streampark-console-service/src/main/resources/application.yml
index d4b6a1ed5..6bf9b23dc 100644
--- 
a/streampark-console/streampark-console-service/src/main/resources/application.yml
+++ 
b/streampark-console/streampark-console-service/src/main/resources/application.yml
@@ -121,6 +121,9 @@ streampark:
       polling-interval-sec:
         job-status: 2
         cluster-metric: 3
+    # Ingress controller class, set as the kubernetes.io/ingress.class annotation on created ingresses.
+    ingress:
+      class: nginx
 
   # packer garbage resources collection configuration
   packer-gc:
diff --git 
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategy.scala
 
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategy.scala
index e348bf4b8..e97fcfc1f 100644
--- 
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategy.scala
+++ 
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategy.scala
@@ -17,7 +17,7 @@
 
 package org.apache.streampark.flink.kubernetes.ingress
 
-import org.apache.streampark.common.conf.ConfigConst
+import org.apache.streampark.common.conf.{ConfigConst, InternalConfigHolder, 
K8sFlinkConfig}
 
 import io.fabric8.kubernetes.api.model.{OwnerReference, OwnerReferenceBuilder}
 import io.fabric8.kubernetes.client.DefaultKubernetesClient
@@ -48,11 +48,16 @@ trait IngressStrategy {
   }
 
   def buildIngressAnnotations(clusterId: String): Map[String, String] = {
-    Map(
+    val annotations = Map(
       "nginx.ingress.kubernetes.io/rewrite-target" -> "/$2",
       "nginx.ingress.kubernetes.io/proxy-body-size" -> "1024m",
       "nginx.ingress.kubernetes.io/configuration-snippet" -> ("rewrite ^(/" + clusterId + ")$ $1/ permanent;")
     )
+    val ingressClass = InternalConfigHolder.get[String](K8sFlinkConfig.ingressClass)
+    // Map is immutable, so `+` returns a new map; the if/else must be the result expression,
+    // otherwise the kubernetes.io/ingress.class annotation is silently dropped.
+    if (ingressClass.nonEmpty) annotations + ("kubernetes.io/ingress.class" -> ingressClass)
+    else annotations
   }
 
   def buildIngressLabels(clusterId: String): Map[String, String] = {

Reply via email to