This is an automated email from the ASF dual-hosted git repository.

gongzhongqiang pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev by this push:
     new 7fcc2a05f [Bug] nginx ingress class bug fixed. (#3345)
7fcc2a05f is described below

commit 7fcc2a05f1076416f42889a58bca2b3a2ce9556c
Author: benjobs <[email protected]>
AuthorDate: Sat Nov 18 21:54:37 2023 +0800

    [Bug] nginx ingress class bug fixed. (#3345)
---
 .../scala/org/apache/streampark/common/util/FileUtils.scala  | 11 +++++++++++
 .../flink/kubernetes/ingress/IngressStrategy.scala           | 12 ++++++------
 2 files changed, 17 insertions(+), 6 deletions(-)

diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/FileUtils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/FileUtils.scala
index 6b95884bc..b2d6c7449 100644
--- a/streampark-common/src/main/scala/org/apache/streampark/common/util/FileUtils.scala
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/FileUtils.scala
@@ -79,6 +79,17 @@ object FileUtils {
       s"[StreamPark] Failed to create directory within $TEMP_DIR_ATTEMPTS  
attempts (tried $baseName 0 to $baseName ${TEMP_DIR_ATTEMPTS - 1})")
   }
 
+  def mkdir(dir: File) = {
+    if (dir.exists && !dir.isDirectory) {
+      throw new IOException(s"File $dir exists and is not a directory. Unable 
to create directory.")
+    } else if (!dir.mkdirs) {
+      // Double-check that some other thread or process hasn't made
+      if (!dir.isDirectory) {
+        throw new IOException(s"Unable to create directory $dir")
+      }
+    }
+  }
+
   def getPathFromEnv(env: String): String = {
     val path = Option(System.getenv(env)).getOrElse(System.getProperty(env))
     require(
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategy.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategy.scala
index 6041c9bfb..e2c9c2c61 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategy.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategy.scala
@@ -18,10 +18,10 @@
 package org.apache.streampark.flink.kubernetes.ingress
 
 import org.apache.streampark.common.conf.{ConfigKeys, InternalConfigHolder, 
K8sFlinkConfig}
+import org.apache.streampark.common.util.FileUtils
 
 import io.fabric8.kubernetes.api.model.{OwnerReference, OwnerReferenceBuilder}
 import io.fabric8.kubernetes.client.DefaultKubernetesClient
-import org.apache.commons.io.FileUtils
 import org.apache.flink.client.program.ClusterClient
 
 import java.io.File
@@ -36,26 +36,26 @@ trait IngressStrategy {
   def configureIngress(domainName: String, clusterId: String, nameSpace: 
String): Unit
 
   def prepareIngressTemplateFiles(buildWorkspace: String, ingressTemplates: 
String): String = {
-    val workspaceDir = new File(buildWorkspace)
-    if (!workspaceDir.exists) workspaceDir.mkdir
     if (ingressTemplates.isEmpty) null
     else {
+      val workspaceDir = new File(buildWorkspace)
+      FileUtils.mkdir(workspaceDir)
       val outputPath = buildWorkspace + "/ingress.yaml"
       val outputFile = new File(outputPath)
-      FileUtils.write(outputFile, ingressTemplates, "UTF-8")
+      FileUtils.writeFile(ingressTemplates, outputFile)
       outputPath
     }
   }
 
   def buildIngressAnnotations(clusterId: String, namespace: String): 
Map[String, String] = {
-    val annotations = Map(
+    var annotations = Map(
       "nginx.ingress.kubernetes.io/rewrite-target" -> "/$2",
       "nginx.ingress.kubernetes.io/proxy-body-size" -> "1024m",
       "nginx.ingress.kubernetes.io/configuration-snippet" -> s"""rewrite 
^(/$clusterId)$$ $$1/ permanent; sub_filter '<base href="./">' '<base 
href="/$namespace/$clusterId/">'; sub_filter_once off;"""
     )
     val ingressClass = 
InternalConfigHolder.get[String](K8sFlinkConfig.ingressClass)
     if (ingressClass.nonEmpty) {
-      annotations + ("kubernetes.io/ingress.class" -> ingressClass)
+      annotations += ("kubernetes.io/ingress.class" -> ingressClass)
     }
     annotations
   }

Reply via email to