This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new fff1841054 [KYUUBI #6876] Support rolling `spark.kubernetes.file.upload.path`
fff1841054 is described below

commit fff18410544f2df00ed08b1c9f6b1c3269578b89
Author: Cheng Pan <[email protected]>
AuthorDate: Wed Jan 15 01:27:12 2025 +0800

    [KYUUBI #6876] Support rolling `spark.kubernetes.file.upload.path`
    
    ### Why are the changes needed?
    
    Vanilla Spark supports neither rolling nor an expiration mechanism for
    `spark.kubernetes.file.upload.path`. If you use a file system that does not
    support TTL, e.g. HDFS, additional cleanup mechanisms are needed to prevent
    the files in this directory from growing indefinitely.
    
    This PR proposes to let `spark.kubernetes.file.upload.path` support the
    placeholders `{{YEAR}}`, `{{MONTH}}` and `{{DAY}}`, and introduces a switch
    `kyuubi.kubernetes.spark.autoCreateFileUploadPath.enabled` that lets the
    Kyuubi server create the directory with permission 777 automatically before
    submitting the Spark application.
    
    For example, the user can set the following configurations in
    `kyuubi-defaults.conf` to enable monthly rolling for
    `spark.kubernetes.file.upload.path`:
    ```
    kyuubi.kubernetes.spark.autoCreateFileUploadPath.enabled=true
    spark.kubernetes.file.upload.path=hdfs://hadoop-cluster/spark-upload-{{YEAR}}{{MONTH}}
    ```
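    
    As a rough sketch (mirroring the replacement logic of `prepareK8sFileUploadPath`
    in this PR, with the date taken from the Kyuubi server's clock at submit time),
    the placeholders expand like this:
    ```
    import java.time.LocalDate
    import java.time.format.DateTimeFormatter

    val today = LocalDate.now()
    val expanded = "hdfs://hadoop-cluster/spark-upload-{{YEAR}}{{MONTH}}"
      .replace("{{YEAR}}", today.format(DateTimeFormatter.ofPattern("yyyy")))
      .replace("{{MONTH}}", today.format(DateTimeFormatter.ofPattern("MM")))
      .replace("{{DAY}}", today.format(DateTimeFormatter.ofPattern("dd")))
    // e.g. "hdfs://hadoop-cluster/spark-upload-202501" for a January 2025 submission
    ```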
    
    Note that Spark creates a sub-directory `s"spark-upload-${UUID.randomUUID()}"`
    under the `spark.kubernetes.file.upload.path` for each upload, so the
    administrator still needs to clean up the staging directories periodically.
    
    For example:
    ```
    hdfs://hadoop-cluster/spark-upload-202412/spark-upload-f2b71340-dc1d-4940-89e2-c5fc31614eb4
    hdfs://hadoop-cluster/spark-upload-202412/spark-upload-173a8653-4d3e-48c0-b8ab-b7f92ae582d6
    hdfs://hadoop-cluster/spark-upload-202501/spark-upload-3b22710f-a4a0-40bb-a3a8-16e481038a63
    ```
    
    The administrator can safely delete `hdfs://hadoop-cluster/spark-upload-202412`
    after 2025-01-01.
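    
    As a complementary illustration, the following is a hypothetical cleanup
    sketch (not part of this PR): it deletes rolled monthly upload directories
    older than one month via the Hadoop FileSystem API. The base path and the
    one-month retention window are assumptions chosen to match the example above.
    ```
    import java.time.YearMonth
    import java.time.format.DateTimeFormatter
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}
    import scala.util.Try

    val fmt = DateTimeFormatter.ofPattern("yyyyMM")
    val cutoff = YearMonth.now().minusMonths(1)
    val base = new Path("hdfs://hadoop-cluster/")   // assumed base path
    val fs: FileSystem = base.getFileSystem(new Configuration())
    fs.listStatus(base)
      .filter(_.getPath.getName.startsWith("spark-upload-"))
      .foreach { status =>
        val suffix = status.getPath.getName.stripPrefix("spark-upload-")
        // only delete directories whose month is strictly before the cutoff
        Try(YearMonth.parse(suffix, fmt)).toOption
          .filter(_.isBefore(cutoff))
          .foreach(_ => fs.delete(status.getPath, true))
      }
    fs.close()
    ```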
    
    ### How was this patch tested?
    
    New UTs are added.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #6876 from pan3793/rolling-upload.
    
    Closes #6876
    
    6614bf29c [Cheng Pan] comment
    5d5cb3eb3 [Cheng Pan] docs
    343adaefb [Cheng Pan] review
    3eade8bc4 [Cheng Pan] fix
    706989778 [Cheng Pan] docs
    38953dc3f [Cheng Pan] Support rolling spark.kubernetes.file.upload.path
    
    Authored-by: Cheng Pan <[email protected]>
    Signed-off-by: Cheng Pan <[email protected]>
---
 docs/configuration/settings.md                     |  1 +
 docs/deployment/engine_on_kubernetes.md            | 27 +++++++++
 .../org/apache/kyuubi/config/KyuubiConf.scala      |  9 +++
 .../engine/spark/SparkBatchProcessBuilder.scala    |  3 +-
 .../kyuubi/engine/spark/SparkProcessBuilder.scala  | 66 +++++++++++++++++++++-
 .../spark/SparkBatchProcessBuilderSuite.scala      | 22 ++++++++
 .../engine/spark/SparkProcessBuilderSuite.scala    | 14 ++++-
 7 files changed, 138 insertions(+), 4 deletions(-)

diff --git a/docs/configuration/settings.md b/docs/configuration/settings.md
index bb81d8de53..3c17e25b81 100644
--- a/docs/configuration/settings.md
+++ b/docs/configuration/settings.md
@@ -362,6 +362,7 @@ You can configure the Kyuubi properties in $KYUUBI_HOME/conf/kyuubi-defaults.co
 | kyuubi.kubernetes.namespace                                          | default                                                                    | The namespace that will be used for running the kyuubi pods and find engines. [...]
 | kyuubi.kubernetes.namespace.allow.list                               |                                                                            | The allowed kubernetes namespace list, if it is empty, there is no kubernetes namespace limitation. [...]
 | kyuubi.kubernetes.spark.appUrlPattern                                | http://{{SPARK_DRIVER_SVC}}.{{KUBERNETES_NAMESPACE}}.svc:{{SPARK_UI_PORT}} | The pattern to generate the spark on kubernetes application UI URL. The pattern should contain placeholders for the application variables. Available placeholders are `{{SPARK_APP_ID}}`, `{{SPARK_DRIVER_SVC}}`, `{{KUBERNETES_NAMESPACE}}`, `{{KUBERNETES_CONTEXT}}` and `{{SPARK_UI_PORT}}`. [...]
+| kyuubi.kubernetes.spark.autoCreateFileUploadPath.enabled             | false                                                                      | If enabled, Kyuubi server will try to create the `spark.kubernetes.file.upload.path` with permission 777 before submitting the Spark application. [...]
 | kyuubi.kubernetes.spark.cleanupTerminatedDriverPod.checkInterval     | PT1M                                                                       | Kyuubi server use guava cache as the cleanup trigger with time-based eviction, but the eviction would not happened until any get/put operation happened. This option schedule a daemon thread evict cache periodically. [...]
 | kyuubi.kubernetes.spark.cleanupTerminatedDriverPod.kind              | NONE                                                                       | Kyuubi server will delete the spark driver pod after the application terminates for kyuubi.kubernetes.terminatedApplicationRetainPeriod. Available options are NONE, ALL, COMPLETED and default value is None which means none of the pod will be deleted [...]
 | kyuubi.kubernetes.spark.forciblyRewriteDriverPodName.enabled         | false                                                                      | Whether to forcibly rewrite Spark driver pod name with 'kyuubi-<uuid>-driver'. If disabled, Kyuubi will try to preserve the application name while satisfying K8s' pod name policy, but some vendors may have stricter pod name policies, thus the generated name may become illegal. [...]
diff --git a/docs/deployment/engine_on_kubernetes.md b/docs/deployment/engine_on_kubernetes.md
index 7d94286bde..c7cbb25fd5 100644
--- a/docs/deployment/engine_on_kubernetes.md
+++ b/docs/deployment/engine_on_kubernetes.md
@@ -48,6 +48,33 @@ The minimum required configurations are:
 * spark.kubernetes.file.upload.path (path on S3 or HDFS)
 * spark.kubernetes.authenticate.driver.serviceAccountName ([viz ServiceAccount](#serviceaccount))
 
+Vanilla Spark supports neither rolling nor an expiration mechanism for `spark.kubernetes.file.upload.path`. If you use
+a file system that does not support TTL, e.g. HDFS, additional cleanup mechanisms are needed to prevent the files in this
+directory from growing indefinitely. Since Kyuubi v1.11.0, you can configure `spark.kubernetes.file.upload.path` with
+placeholders `{{YEAR}}`, `{{MONTH}}` and `{{DAY}}`, and enable `kyuubi.kubernetes.spark.autoCreateFileUploadPath.enabled`
+to let the Kyuubi server create the directory with permission 777 automatically before submitting the Spark application.
+
+Note that Spark creates a sub-directory `s"spark-upload-${UUID.randomUUID()}"` under the `spark.kubernetes.file.upload.path`
+for each upload, so the administrator still needs to clean up the staging directories periodically.
+
+For example, the user can set the following configurations in `kyuubi-defaults.conf` to enable monthly rolling
+for `spark.kubernetes.file.upload.path`
+
+```
+kyuubi.kubernetes.spark.autoCreateFileUploadPath.enabled=true
+spark.kubernetes.file.upload.path=hdfs://hadoop-cluster/spark-upload-{{YEAR}}{{MONTH}}
+```
+
+and the staging files would look like
+
+```
+hdfs://hadoop-cluster/spark-upload-202412/spark-upload-f2b71340-dc1d-4940-89e2-c5fc31614eb4
+hdfs://hadoop-cluster/spark-upload-202412/spark-upload-173a8653-4d3e-48c0-b8ab-b7f92ae582d6
+hdfs://hadoop-cluster/spark-upload-202501/spark-upload-3b22710f-a4a0-40bb-a3a8-16e481038a63
+```
+
+then the administrator can safely delete `hdfs://hadoop-cluster/spark-upload-202412` after 2025-01-01.
+
 ### Docker Image
 
 Spark ships a `./bin/docker-image-tool.sh` script to build and publish the Docker images for running Spark applications on Kubernetes.
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
index a493d7c457..d27df47c02 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
@@ -1334,6 +1334,15 @@ object KyuubiConf {
       .createWithDefault(
         "http://{{SPARK_DRIVER_SVC}}.{{KUBERNETES_NAMESPACE}}.svc:{{SPARK_UI_PORT}}")
 
+  val KUBERNETES_SPARK_AUTO_CREATE_FILE_UPLOAD_PATH: ConfigEntry[Boolean] =
+    buildConf("kyuubi.kubernetes.spark.autoCreateFileUploadPath.enabled")
+      .doc("If enabled, Kyuubi server will try to create the " +
+        "`spark.kubernetes.file.upload.path` with permission 777 before submitting " +
+        "the Spark application.")
+      .version("1.11.0")
+      .booleanConf
+      .createWithDefault(false)
+
   object KubernetesCleanupDriverPodStrategy extends Enumeration {
     type KubernetesCleanupDriverPodStrategy = Value
     val NONE, ALL, COMPLETED = Value
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkBatchProcessBuilder.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkBatchProcessBuilder.scala
index 713a34d0c8..11b4e03dde 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkBatchProcessBuilder.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkBatchProcessBuilder.scala
@@ -56,7 +56,8 @@ class SparkBatchProcessBuilder(
     (batchKyuubiConf.getAll ++
       sparkAppNameConf() ++
       engineLogPathConf() ++
-      appendPodNameConf(batchConf)).map { case (k, v) =>
+      appendPodNameConf(batchConf) ++
+      prepareK8sFileUploadPath()).map { case (k, v) =>
       buffer ++= confKeyValue(convertConfigKey(k), v)
     }
 
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala
index aacdddef32..7c862f5218 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala
@@ -19,12 +19,16 @@ package org.apache.kyuubi.engine.spark
 
 import java.io.{File, FileFilter, IOException}
 import java.nio.file.Paths
+import java.time.LocalDate
+import java.time.format.DateTimeFormatter
 import java.util.Locale
 
 import scala.collection.mutable
 
 import com.google.common.annotations.VisibleForTesting
 import org.apache.commons.lang3.StringUtils
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.fs.permission.FsPermission
 import org.apache.hadoop.security.UserGroupInformation
 
 import org.apache.kyuubi._
@@ -37,7 +41,7 @@ import org.apache.kyuubi.ha.HighAvailabilityConf
 import org.apache.kyuubi.ha.HighAvailabilityConf.HA_ZK_ENGINE_AUTH_TYPE
 import org.apache.kyuubi.ha.client.AuthTypes
 import org.apache.kyuubi.operation.log.OperationLog
-import org.apache.kyuubi.util.{JavaUtils, KubernetesUtils, Validator}
+import org.apache.kyuubi.util.{JavaUtils, KubernetesUtils, KyuubiHadoopUtils, Validator}
 import org.apache.kyuubi.util.command.CommandLineUtils._
 
 class SparkProcessBuilder(
@@ -141,7 +145,11 @@ class SparkProcessBuilder(
       allConf = allConf ++ zkAuthKeytabFileConf(allConf)
     }
     // pass spark engine log path to spark conf
-    (allConf ++ engineLogPathConf ++ extraYarnConf(allConf) ++ appendPodNameConf(allConf)).foreach {
+    (allConf ++
+      engineLogPathConf ++
+      extraYarnConf(allConf) ++
+      appendPodNameConf(allConf) ++
+      prepareK8sFileUploadPath()).foreach {
       case (k, v) => buffer ++= confKeyValue(convertConfigKey(k), v)
     }
 
@@ -266,6 +274,43 @@ class SparkProcessBuilder(
     map.result().toMap
   }
 
+  def prepareK8sFileUploadPath(): Map[String, String] = {
+    kubernetesFileUploadPath() match {
+      case Some(uploadPathPattern) if isK8sClusterMode =>
+        val today = LocalDate.now()
+        val uploadPath = uploadPathPattern
+          .replace("{{YEAR}}", today.format(YEAR_FMT))
+          .replace("{{MONTH}}", today.format(MONTH_FMT))
+          .replace("{{DAY}}", today.format(DAY_FMT))
+
+        if (conf.get(KUBERNETES_SPARK_AUTO_CREATE_FILE_UPLOAD_PATH)) {
+          // Create the `uploadPath` using permission 777, otherwise, spark just creates the
+          // `$uploadPath/spark-upload-$uuid` using default permission 511, which might prevent
+          // other users from creating the staging dir under `uploadPath` later.
+          val hadoopConf = KyuubiHadoopUtils.newHadoopConf(conf, loadDefaults = false)
+          val path = new Path(uploadPath)
+          var fs: FileSystem = null
+          try {
+            fs = path.getFileSystem(hadoopConf)
+            if (!fs.exists(path)) {
+              info(s"Try creating $KUBERNETES_FILE_UPLOAD_PATH: $uploadPath")
+              fs.mkdirs(path, KUBERNETES_UPLOAD_PATH_PERMISSION)
+            }
+          } catch {
+            case ioe: IOException =>
+              warn(s"Failed to create $KUBERNETES_FILE_UPLOAD_PATH: $uploadPath", ioe)
+          } finally {
+            if (fs != null) {
+              Utils.tryLogNonFatalError(fs.close())
+            }
+          }
+        }
+        Map(KUBERNETES_FILE_UPLOAD_PATH -> uploadPath)
+      case _ =>
+        Map.empty
+    }
+  }
+
   def extraYarnConf(conf: Map[String, String]): Map[String, String] = {
     val map = mutable.Map.newBuilder[String, String]
     if (clusterManager().exists(_.toLowerCase(Locale.ROOT).startsWith("yarn"))) {
@@ -294,6 +339,11 @@ class SparkProcessBuilder(
     }
   }
 
+  def isK8sClusterMode: Boolean = {
+    clusterManager().exists(cm => cm.toLowerCase(Locale.ROOT).startsWith("k8s")) &&
+    deployMode().exists(_.toLowerCase(Locale.ROOT) == "cluster")
+  }
+
   def kubernetesContext(): Option[String] = {
     conf.getOption(KUBERNETES_CONTEXT_KEY).orElse(defaultsConf.get(KUBERNETES_CONTEXT_KEY))
   }
@@ -302,6 +352,11 @@ class SparkProcessBuilder(
     conf.getOption(KUBERNETES_NAMESPACE_KEY).orElse(defaultsConf.get(KUBERNETES_NAMESPACE_KEY))
   }
 
+  def kubernetesFileUploadPath(): Option[String] = {
+    conf.getOption(KUBERNETES_FILE_UPLOAD_PATH)
+      .orElse(defaultsConf.get(KUBERNETES_FILE_UPLOAD_PATH))
+  }
+
   override def validateConf(): Unit = Validator.validateConf(conf)
 
   // For spark on kubernetes, spark pod using env SPARK_USER_NAME as current user
@@ -331,6 +386,13 @@ object SparkProcessBuilder {
   final val YARN_MAX_APP_ATTEMPTS_KEY = "spark.yarn.maxAppAttempts"
   final val INTERNAL_RESOURCE = "spark-internal"
 
+  final val KUBERNETES_FILE_UPLOAD_PATH = "spark.kubernetes.file.upload.path"
+  final val KUBERNETES_UPLOAD_PATH_PERMISSION = new FsPermission(Integer.parseInt("777", 8).toShort)
+
+  final val YEAR_FMT = DateTimeFormatter.ofPattern("yyyy")
+  final val MONTH_FMT = DateTimeFormatter.ofPattern("MM")
+  final val DAY_FMT = DateTimeFormatter.ofPattern("dd")
+
   /**
    * The path configs from Spark project that might upload local files:
    * - SparkSubmit
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkBatchProcessBuilderSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkBatchProcessBuilderSuite.scala
index e3603e24ec..f858a0f778 100644
--- a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkBatchProcessBuilderSuite.scala
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkBatchProcessBuilderSuite.scala
@@ -17,6 +17,8 @@
 
 package org.apache.kyuubi.engine.spark
 
+import java.time.LocalDate
+import java.time.format.DateTimeFormatter
 import java.util.UUID
 
 import org.apache.kyuubi.KyuubiFunSuite
@@ -36,4 +38,24 @@ class SparkBatchProcessBuilderSuite extends KyuubiFunSuite {
       None)
     assert(builder.commands.toSeq.contains("spark.kyuubi.key=value"))
   }
+
+  test("spark.kubernetes.file.upload.path supports placeholder") {
+    val conf1 = KyuubiConf(false)
+    conf1.set("spark.master", "k8s://test:12345")
+    conf1.set("spark.submit.deployMode", "cluster")
+    conf1.set("spark.kubernetes.file.upload.path", "hdfs:///spark-upload-{{YEAR}}{{MONTH}}{{DAY}}")
+    val builder1 = new SparkBatchProcessBuilder(
+      "",
+      conf1,
+      UUID.randomUUID().toString,
+      "test",
+      Some("test"),
+      "test",
+      Map("kyuubi.key" -> "value"),
+      Seq.empty,
+      None)
+    val commands1 = builder1.toString.split(' ')
+    val today = DateTimeFormatter.ofPattern("yyyyMMdd").format(LocalDate.now())
+    assert(commands1.contains(s"spark.kubernetes.file.upload.path=hdfs:///spark-upload-$today"))
+  }
 }
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilderSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilderSuite.scala
index 5f3bae1249..49e4a91568 100644
--- a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilderSuite.scala
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilderSuite.scala
@@ -19,7 +19,8 @@ package org.apache.kyuubi.engine.spark
 
 import java.io.File
 import java.nio.file.{Files, Path, Paths, StandardOpenOption}
-import java.time.Duration
+import java.time.{Duration, LocalDate}
+import java.time.format.DateTimeFormatter
 import java.util.UUID
 import java.util.concurrent.{Executors, TimeUnit}
 
@@ -468,6 +469,17 @@ class SparkProcessBuilderSuite extends KerberizedTestHelper with MockitoSugar {
       None)
     assert(builder.commands.toSeq.contains("spark.kyuubi.key=value"))
   }
+
+  test("spark.kubernetes.file.upload.path supports placeholder") {
+    val conf1 = KyuubiConf(false)
+    conf1.set("spark.master", "k8s://test:12345")
+    conf1.set("spark.submit.deployMode", "cluster")
+    conf1.set("spark.kubernetes.file.upload.path", "hdfs:///spark-upload-{{YEAR}}{{MONTH}}{{DAY}}")
+    val builder1 = new SparkProcessBuilder("", true, conf1)
+    val commands1 = builder1.toString.split(' ')
+    val today = DateTimeFormatter.ofPattern("yyyyMMdd").format(LocalDate.now())
+    assert(commands1.contains(s"spark.kubernetes.file.upload.path=hdfs:///spark-upload-$today"))
+  }
 }
 
 class FakeSparkProcessBuilder(config: KyuubiConf)
