This is an automated email from the ASF dual-hosted git repository. benjobs pushed a commit to branch ingress in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
commit e2e832741351558d007b65979f8bbe7564d0827d Author: benjobs <[email protected]> AuthorDate: Wed Nov 15 22:40:05 2023 +0800 [Bug] nginx ingress class bug fixed. --- .../scala/org/apache/streampark/common/util/FileUtils.scala | 11 +++++++++++ .../flink/kubernetes/ingress/IngressStrategy.scala | 12 ++++++------ 2 files changed, 17 insertions(+), 6 deletions(-) diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/FileUtils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/FileUtils.scala index 6b95884bc..b2d6c7449 100644 --- a/streampark-common/src/main/scala/org/apache/streampark/common/util/FileUtils.scala +++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/FileUtils.scala @@ -79,6 +79,17 @@ object FileUtils { s"[StreamPark] Failed to create directory within $TEMP_DIR_ATTEMPTS attempts (tried $baseName 0 to $baseName ${TEMP_DIR_ATTEMPTS - 1})") } + def mkdir(dir: File) = { + if (dir.exists && !dir.isDirectory) { + throw new IOException(s"File $dir exists and is not a directory. Unable to create directory.") + } else if (!dir.mkdirs) { + // Double-check that some other thread or process hasn't made + if (!dir.isDirectory) { + throw new IOException(s"Unable to create directory $dir") + } + } + } + def getPathFromEnv(env: String): String = { val path = Option(System.getenv(env)).getOrElse(System.getProperty(env)) require( diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategy.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategy.scala index 6041c9bfb..e2c9c2c61 100644 --- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategy.scala +++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategy.scala @@ -18,10 +18,10 @@ package org.apache.streampark.flink.kubernetes.ingress import org.apache.streampark.common.conf.{ConfigKeys, InternalConfigHolder, K8sFlinkConfig} +import org.apache.streampark.common.util.FileUtils import io.fabric8.kubernetes.api.model.{OwnerReference, OwnerReferenceBuilder} import io.fabric8.kubernetes.client.DefaultKubernetesClient -import org.apache.commons.io.FileUtils import org.apache.flink.client.program.ClusterClient import java.io.File @@ -36,26 +36,26 @@ trait IngressStrategy { def configureIngress(domainName: String, clusterId: String, nameSpace: String): Unit def prepareIngressTemplateFiles(buildWorkspace: String, ingressTemplates: String): String = { - val workspaceDir = new File(buildWorkspace) - if (!workspaceDir.exists) workspaceDir.mkdir if (ingressTemplates.isEmpty) null else { + val workspaceDir = new File(buildWorkspace) + FileUtils.mkdir(workspaceDir) val outputPath = buildWorkspace + "/ingress.yaml" val outputFile = new File(outputPath) - FileUtils.write(outputFile, ingressTemplates, "UTF-8") + FileUtils.writeFile(ingressTemplates, outputFile) outputPath } } def buildIngressAnnotations(clusterId: String, namespace: String): Map[String, String] = { - val annotations = Map( + var annotations = Map( "nginx.ingress.kubernetes.io/rewrite-target" -> "/$2", "nginx.ingress.kubernetes.io/proxy-body-size" -> "1024m", "nginx.ingress.kubernetes.io/configuration-snippet" -> s"""rewrite ^(/$clusterId)$$ $$1/ permanent; sub_filter '<base href="./">' '<base href="/$namespace/$clusterId/">'; sub_filter_once off;""" ) val ingressClass = InternalConfigHolder.get[String](K8sFlinkConfig.ingressClass) if (ingressClass.nonEmpty) { - annotations + ("kubernetes.io/ingress.class" -> ingressClass) + annotations += ("kubernetes.io/ingress.class" -> ingressClass) } annotations }
