This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 1f2e7b87db76 [SPARK-49731][K8S] Support K8s volume `mount.subPathExpr` and `hostPath` volume `type`
1f2e7b87db76 is described below

commit 1f2e7b87db76ef60eded8a6db09f6690238471ce
Author: Enrico Minack <[email protected]>
AuthorDate: Wed Sep 25 07:53:12 2024 -0700

    [SPARK-49731][K8S] Support K8s volume `mount.subPathExpr` and `hostPath` volume `type`
    
    ### What changes were proposed in this pull request?
    Add the following config options:
    - `spark.kubernetes.executor.volumes.[VolumeType].[VolumeName].mount.subPathExpr`
    - `spark.kubernetes.executor.volumes.hostPath.[VolumeName].options.type`
    
    ### Why are the changes needed?
    
    K8s Spec
    - https://kubernetes.io/docs/concepts/storage/volumes/#hostpath-volume-types
    - https://kubernetes.io/docs/concepts/storage/volumes/#using-subpath-expanded-environment
    
    These are natural extensions of the existing options
    - `spark.kubernetes.executor.volumes.[VolumeType].[VolumeName].mount.subPath`
    - `spark.kubernetes.executor.volumes.hostPath.[VolumeName].options.path`
    
    ### Does this PR introduce _any_ user-facing change?
    Yes. It adds the two config options listed above.
    
    ### How was this patch tested?
    Unit tests
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No
    
    Closes #48181 from EnricoMi/k8s-volume-options.
    
    Authored-by: Enrico Minack <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 .../scala/org/apache/spark/deploy/k8s/Config.scala |  2 +
 .../spark/deploy/k8s/KubernetesVolumeSpec.scala    |  3 +-
 .../spark/deploy/k8s/KubernetesVolumeUtils.scala   | 18 +++++-
 .../k8s/features/MountVolumesFeatureStep.scala     |  6 +-
 .../spark/deploy/k8s/KubernetesTestConf.scala      | 11 +++-
 .../deploy/k8s/KubernetesVolumeUtilsSuite.scala    | 42 ++++++++++++-
 .../k8s/features/LocalDirsFeatureStepSuite.scala   |  3 +-
 .../features/MountVolumesFeatureStepSuite.scala    | 72 +++++++++++++++++++++-
 8 files changed, 144 insertions(+), 13 deletions(-)

diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
index 3a4d68c19014..9c50f8ddb00c 100644
--- 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
+++ 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
@@ -769,8 +769,10 @@ private[spark] object Config extends Logging {
   val KUBERNETES_VOLUMES_NFS_TYPE = "nfs"
   val KUBERNETES_VOLUMES_MOUNT_PATH_KEY = "mount.path"
   val KUBERNETES_VOLUMES_MOUNT_SUBPATH_KEY = "mount.subPath"
+  val KUBERNETES_VOLUMES_MOUNT_SUBPATHEXPR_KEY = "mount.subPathExpr"
   val KUBERNETES_VOLUMES_MOUNT_READONLY_KEY = "mount.readOnly"
   val KUBERNETES_VOLUMES_OPTIONS_PATH_KEY = "options.path"
+  val KUBERNETES_VOLUMES_OPTIONS_TYPE_KEY = "options.type"
   val KUBERNETES_VOLUMES_OPTIONS_CLAIM_NAME_KEY = "options.claimName"
   val KUBERNETES_VOLUMES_OPTIONS_CLAIM_STORAGE_CLASS_KEY = 
"options.storageClass"
   val KUBERNETES_VOLUMES_OPTIONS_MEDIUM_KEY = "options.medium"
diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeSpec.scala
 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeSpec.scala
index 9dfd40a773eb..b4fe414e3cde 100644
--- 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeSpec.scala
+++ 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeSpec.scala
@@ -18,7 +18,7 @@ package org.apache.spark.deploy.k8s
 
 private[spark] sealed trait KubernetesVolumeSpecificConf
 
-private[spark] case class KubernetesHostPathVolumeConf(hostPath: String)
+private[spark] case class KubernetesHostPathVolumeConf(hostPath: String, 
volumeType: String)
   extends KubernetesVolumeSpecificConf
 
 private[spark] case class KubernetesPVCVolumeConf(
@@ -42,5 +42,6 @@ private[spark] case class KubernetesVolumeSpec(
     volumeName: String,
     mountPath: String,
     mountSubPath: String,
+    mountSubPathExpr: String,
     mountReadOnly: Boolean,
     volumeConf: KubernetesVolumeSpecificConf)
diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtils.scala
 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtils.scala
index 6463512c0114..88bb998d88b7 100644
--- 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtils.scala
+++ 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtils.scala
@@ -45,7 +45,9 @@ object KubernetesVolumeUtils {
       val pathKey = 
s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_MOUNT_PATH_KEY"
       val readOnlyKey = 
s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_MOUNT_READONLY_KEY"
       val subPathKey = 
s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_MOUNT_SUBPATH_KEY"
+      val subPathExprKey = 
s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_MOUNT_SUBPATHEXPR_KEY"
       val labelKey = s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_LABEL_KEY"
+      verifyMutuallyExclusiveOptionKeys(properties, subPathKey, subPathExprKey)
 
       val volumeLabelsMap = properties
         .filter(_._1.startsWith(labelKey))
@@ -57,6 +59,7 @@ object KubernetesVolumeUtils {
         volumeName = volumeName,
         mountPath = properties(pathKey),
         mountSubPath = properties.getOrElse(subPathKey, ""),
+        mountSubPathExpr = properties.getOrElse(subPathExprKey, ""),
         mountReadOnly = properties.get(readOnlyKey).exists(_.toBoolean),
         volumeConf = parseVolumeSpecificConf(properties,
           volumeType, volumeName, Option(volumeLabelsMap)))
@@ -87,8 +90,11 @@ object KubernetesVolumeUtils {
     volumeType match {
       case KUBERNETES_VOLUMES_HOSTPATH_TYPE =>
         val pathKey = 
s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_OPTIONS_PATH_KEY"
+        val typeKey = 
s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_OPTIONS_TYPE_KEY"
         verifyOptionKey(options, pathKey, KUBERNETES_VOLUMES_HOSTPATH_TYPE)
-        KubernetesHostPathVolumeConf(options(pathKey))
+        // "" means that no checks will be performed before mounting the 
hostPath volume
+        // backward compatibility default
+        KubernetesHostPathVolumeConf(options(pathKey), 
options.getOrElse(typeKey, ""))
 
       case KUBERNETES_VOLUMES_PVC_TYPE =>
         val claimNameKey = 
s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_OPTIONS_CLAIM_NAME_KEY"
@@ -129,6 +135,16 @@ object KubernetesVolumeUtils {
     }
   }
 
+  private def verifyMutuallyExclusiveOptionKeys(
+      options: Map[String, String],
+      keys: String*): Unit = {
+    val givenKeys = keys.filter(options.contains)
+    if (givenKeys.length > 1) {
+      throw new IllegalArgumentException("These config options are mutually 
exclusive: " +
+        s"${givenKeys.mkString(", ")}")
+    }
+  }
+
   private def verifySize(size: Option[String]): Unit = {
     size.foreach { v =>
       if (v.forall(_.isDigit) && parseLong(v) < 1024) {
diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStep.scala
 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStep.scala
index 5cc61c746b0e..eea4604010b2 100644
--- 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStep.scala
+++ 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStep.scala
@@ -65,14 +65,14 @@ private[spark] class MountVolumesFeatureStep(conf: 
KubernetesConf)
         .withMountPath(spec.mountPath)
         .withReadOnly(spec.mountReadOnly)
         .withSubPath(spec.mountSubPath)
+        .withSubPathExpr(spec.mountSubPathExpr)
         .withName(spec.volumeName)
         .build()
 
       val volumeBuilder = spec.volumeConf match {
-        case KubernetesHostPathVolumeConf(hostPath) =>
-          /* "" means that no checks will be performed before mounting the 
hostPath volume */
+        case KubernetesHostPathVolumeConf(hostPath, volumeType) =>
           new VolumeBuilder()
-            .withHostPath(new HostPathVolumeSource(hostPath, ""))
+            .withHostPath(new HostPathVolumeSource(hostPath, volumeType))
 
         case KubernetesPVCVolumeConf(claimNameTemplate, storageClass, size, 
labels) =>
           val claimName = conf match {
diff --git 
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesTestConf.scala
 
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesTestConf.scala
index 7e0a65bcdda9..e0ddcd3d416f 100644
--- 
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesTestConf.scala
+++ 
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesTestConf.scala
@@ -113,9 +113,10 @@ object KubernetesTestConf {
 
     volumes.foreach { case spec =>
       val (vtype, configs) = spec.volumeConf match {
-        case KubernetesHostPathVolumeConf(path) =>
-          (KUBERNETES_VOLUMES_HOSTPATH_TYPE,
-            Map(KUBERNETES_VOLUMES_OPTIONS_PATH_KEY -> path))
+        case KubernetesHostPathVolumeConf(hostPath, volumeType) =>
+          (KUBERNETES_VOLUMES_HOSTPATH_TYPE, Map(
+            KUBERNETES_VOLUMES_OPTIONS_PATH_KEY -> hostPath,
+            KUBERNETES_VOLUMES_OPTIONS_TYPE_KEY -> volumeType))
 
         case KubernetesPVCVolumeConf(claimName, storageClass, sizeLimit, 
labels) =>
           val sconf = storageClass
@@ -145,6 +146,10 @@ object KubernetesTestConf {
         conf.set(key(vtype, spec.volumeName, 
KUBERNETES_VOLUMES_MOUNT_SUBPATH_KEY),
           spec.mountSubPath)
       }
+      if (spec.mountSubPathExpr.nonEmpty) {
+        conf.set(key(vtype, spec.volumeName, 
KUBERNETES_VOLUMES_MOUNT_SUBPATHEXPR_KEY),
+          spec.mountSubPathExpr)
+      }
       conf.set(key(vtype, spec.volumeName, 
KUBERNETES_VOLUMES_MOUNT_READONLY_KEY),
         spec.mountReadOnly.toString)
       configs.foreach { case (k, v) =>
diff --git 
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtilsSuite.scala
 
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtilsSuite.scala
index 5c103739d308..1e62db725fb6 100644
--- 
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtilsSuite.scala
+++ 
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtilsSuite.scala
@@ -30,7 +30,20 @@ class KubernetesVolumeUtilsSuite extends SparkFunSuite {
     assert(volumeSpec.mountPath === "/path")
     assert(volumeSpec.mountReadOnly)
     assert(volumeSpec.volumeConf.asInstanceOf[KubernetesHostPathVolumeConf] ===
-      KubernetesHostPathVolumeConf("/hostPath"))
+      KubernetesHostPathVolumeConf("/hostPath", ""))
+  }
+
+  test("Parses hostPath volume type correctly") {
+    val sparkConf = new SparkConf(false)
+    sparkConf.set("test.hostPath.volumeName.mount.path", "/path")
+    sparkConf.set("test.hostPath.volumeName.options.path", "/hostPath")
+    sparkConf.set("test.hostPath.volumeName.options.type", "Type")
+
+    val volumeSpec = KubernetesVolumeUtils.parseVolumesWithPrefix(sparkConf, 
"test.").head
+    assert(volumeSpec.volumeName === "volumeName")
+    assert(volumeSpec.mountPath === "/path")
+    assert(volumeSpec.volumeConf.asInstanceOf[KubernetesHostPathVolumeConf] ===
+      KubernetesHostPathVolumeConf("/hostPath", "Type"))
   }
 
   test("Parses subPath correctly") {
@@ -43,6 +56,33 @@ class KubernetesVolumeUtilsSuite extends SparkFunSuite {
     assert(volumeSpec.volumeName === "volumeName")
     assert(volumeSpec.mountPath === "/path")
     assert(volumeSpec.mountSubPath === "subPath")
+    assert(volumeSpec.mountSubPathExpr === "")
+  }
+
+  test("Parses subPathExpr correctly") {
+    val sparkConf = new SparkConf(false)
+    sparkConf.set("test.emptyDir.volumeName.mount.path", "/path")
+    sparkConf.set("test.emptyDir.volumeName.mount.readOnly", "true")
+    sparkConf.set("test.emptyDir.volumeName.mount.subPathExpr", "subPathExpr")
+
+    val volumeSpec = KubernetesVolumeUtils.parseVolumesWithPrefix(sparkConf, 
"test.").head
+    assert(volumeSpec.volumeName === "volumeName")
+    assert(volumeSpec.mountPath === "/path")
+    assert(volumeSpec.mountSubPath === "")
+    assert(volumeSpec.mountSubPathExpr === "subPathExpr")
+  }
+
+  test("Rejects mutually exclusive subPath and subPathExpr") {
+    val sparkConf = new SparkConf(false)
+    sparkConf.set("test.emptyDir.volumeName.mount.path", "/path")
+    sparkConf.set("test.emptyDir.volumeName.mount.subPath", "subPath")
+    sparkConf.set("test.emptyDir.volumeName.mount.subPathExpr", "subPathExpr")
+
+    val msg = intercept[IllegalArgumentException] {
+      KubernetesVolumeUtils.parseVolumesWithPrefix(sparkConf, "test.").head
+    }.getMessage
+    assert(msg === "These config options are mutually exclusive: " +
+      "emptyDir.volumeName.mount.subPath, 
emptyDir.volumeName.mount.subPathExpr")
   }
 
   test("Parses persistentVolumeClaim volumes correctly") {
diff --git 
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStepSuite.scala
 
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStepSuite.scala
index eaadad163f06..3a9561051a89 100644
--- 
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStepSuite.scala
+++ 
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStepSuite.scala
@@ -137,8 +137,9 @@ class LocalDirsFeatureStepSuite extends SparkFunSuite {
       "spark-local-dir-test",
       "/tmp",
       "",
+      "",
       false,
-      KubernetesHostPathVolumeConf("/hostPath/tmp")
+      KubernetesHostPathVolumeConf("/hostPath/tmp", "")
     )
     val kubernetesConf = KubernetesTestConf.createDriverConf(volumes = 
Seq(volumeConf))
     val mountVolumeStep = new MountVolumesFeatureStep(kubernetesConf)
diff --git 
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStepSuite.scala
 
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStepSuite.scala
index 6a68898c5f61..c94a7a6ec26a 100644
--- 
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStepSuite.scala
+++ 
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStepSuite.scala
@@ -27,8 +27,9 @@ class MountVolumesFeatureStepSuite extends SparkFunSuite {
       "testVolume",
       "/tmp",
       "",
+      "",
       false,
-      KubernetesHostPathVolumeConf("/hostPath/tmp")
+      KubernetesHostPathVolumeConf("/hostPath/tmp", "type")
     )
     val kubernetesConf = KubernetesTestConf.createDriverConf(volumes = 
Seq(volumeConf))
     val step = new MountVolumesFeatureStep(kubernetesConf)
@@ -36,6 +37,7 @@ class MountVolumesFeatureStepSuite extends SparkFunSuite {
 
     assert(configuredPod.pod.getSpec.getVolumes.size() === 1)
     assert(configuredPod.pod.getSpec.getVolumes.get(0).getHostPath.getPath === 
"/hostPath/tmp")
+    assert(configuredPod.pod.getSpec.getVolumes.get(0).getHostPath.getType === 
"type")
     assert(configuredPod.container.getVolumeMounts.size() === 1)
     assert(configuredPod.container.getVolumeMounts.get(0).getMountPath === 
"/tmp")
     assert(configuredPod.container.getVolumeMounts.get(0).getName === 
"testVolume")
@@ -47,6 +49,7 @@ class MountVolumesFeatureStepSuite extends SparkFunSuite {
       "testVolume",
       "/tmp",
       "",
+      "",
       true,
       KubernetesPVCVolumeConf("pvcClaim")
     )
@@ -69,6 +72,7 @@ class MountVolumesFeatureStepSuite extends SparkFunSuite {
       "testVolume",
       "/tmp",
       "",
+      "",
       true,
       KubernetesPVCVolumeConf("pvc-spark-SPARK_EXECUTOR_ID")
     )
@@ -94,6 +98,7 @@ class MountVolumesFeatureStepSuite extends SparkFunSuite {
       "testVolume",
       "/tmp",
       "",
+      "",
       true,
       KubernetesPVCVolumeConf("pvc-spark-SPARK_EXECUTOR_ID", Some("fast"), 
Some("512M"))
     )
@@ -119,6 +124,7 @@ class MountVolumesFeatureStepSuite extends SparkFunSuite {
       "testVolume",
       "/tmp",
       "",
+      "",
       true,
       KubernetesPVCVolumeConf("OnDemand")
     )
@@ -136,6 +142,7 @@ class MountVolumesFeatureStepSuite extends SparkFunSuite {
       "testVolume",
       "/tmp",
       "",
+      "",
       true,
       KubernetesPVCVolumeConf(claimName = 
MountVolumesFeatureStep.PVC_ON_DEMAND,
         storageClass = Some("gp3"),
@@ -156,6 +163,7 @@ class MountVolumesFeatureStepSuite extends SparkFunSuite {
       "testVolume",
       "/tmp",
       "",
+      "",
       true,
       KubernetesPVCVolumeConf(claimName = 
MountVolumesFeatureStep.PVC_ON_DEMAND,
         storageClass = Some("gp3"),
@@ -177,6 +185,7 @@ class MountVolumesFeatureStepSuite extends SparkFunSuite {
       "checkpointVolume1",
       "/checkpoints1",
       "",
+      "",
       true,
       KubernetesPVCVolumeConf(claimName = "pvcClaim1",
         storageClass = Some("gp3"),
@@ -188,6 +197,7 @@ class MountVolumesFeatureStepSuite extends SparkFunSuite {
       "checkpointVolume2",
       "/checkpoints2",
       "",
+      "",
       true,
       KubernetesPVCVolumeConf(claimName = "pvcClaim2",
         storageClass = Some("gp3"),
@@ -209,6 +219,7 @@ class MountVolumesFeatureStepSuite extends SparkFunSuite {
       "testVolume",
       "/tmp",
       "",
+      "",
       true,
       KubernetesPVCVolumeConf(MountVolumesFeatureStep.PVC_ON_DEMAND)
     )
@@ -226,6 +237,7 @@ class MountVolumesFeatureStepSuite extends SparkFunSuite {
       "testVolume",
       "/tmp",
       "",
+      "",
       false,
       KubernetesEmptyDirVolumeConf(Some("Memory"), Some("6G"))
     )
@@ -249,6 +261,7 @@ class MountVolumesFeatureStepSuite extends SparkFunSuite {
       "testVolume",
       "/tmp",
       "",
+      "",
       false,
       KubernetesEmptyDirVolumeConf(None, None)
     )
@@ -271,6 +284,7 @@ class MountVolumesFeatureStepSuite extends SparkFunSuite {
       "testVolume",
       "/tmp",
       "",
+      "",
       false,
       KubernetesNFSVolumeConf("/share/name", "nfs.example.com")
     )
@@ -293,6 +307,7 @@ class MountVolumesFeatureStepSuite extends SparkFunSuite {
       "testVolume",
       "/tmp",
       "",
+      "",
       true,
       KubernetesNFSVolumeConf("/share/name", "nfs.example.com")
     )
@@ -315,13 +330,15 @@ class MountVolumesFeatureStepSuite extends SparkFunSuite {
       "hpVolume",
       "/tmp",
       "",
+      "",
       false,
-      KubernetesHostPathVolumeConf("/hostPath/tmp")
+      KubernetesHostPathVolumeConf("/hostPath/tmp", "")
     )
     val pvcVolumeConf = KubernetesVolumeSpec(
       "checkpointVolume",
       "/checkpoints",
       "",
+      "",
       true,
       KubernetesPVCVolumeConf("pvcClaim")
     )
@@ -339,13 +356,15 @@ class MountVolumesFeatureStepSuite extends SparkFunSuite {
       "hpVolume",
       "/data",
       "",
+      "",
       false,
-      KubernetesHostPathVolumeConf("/hostPath/tmp")
+      KubernetesHostPathVolumeConf("/hostPath/tmp", "")
     )
     val pvcVolumeConf = KubernetesVolumeSpec(
       "checkpointVolume",
       "/data",
       "",
+      "",
       true,
       KubernetesPVCVolumeConf("pvcClaim")
     )
@@ -364,6 +383,7 @@ class MountVolumesFeatureStepSuite extends SparkFunSuite {
       "testVolume",
       "/tmp",
       "foo",
+      "",
       false,
       KubernetesEmptyDirVolumeConf(None, None)
     )
@@ -378,11 +398,32 @@ class MountVolumesFeatureStepSuite extends SparkFunSuite {
     assert(emptyDirMount.getSubPath === "foo")
   }
 
+  test("Mounts subpathexpr on emptyDir") {
+    val volumeConf = KubernetesVolumeSpec(
+      "testVolume",
+      "/tmp",
+      "",
+      "foo",
+      false,
+      KubernetesEmptyDirVolumeConf(None, None)
+    )
+    val kubernetesConf = KubernetesTestConf.createDriverConf(volumes = 
Seq(volumeConf))
+    val step = new MountVolumesFeatureStep(kubernetesConf)
+    val configuredPod = step.configurePod(SparkPod.initialPod())
+
+    assert(configuredPod.pod.getSpec.getVolumes.size() === 1)
+    val emptyDirMount = configuredPod.container.getVolumeMounts.get(0)
+    assert(emptyDirMount.getMountPath === "/tmp")
+    assert(emptyDirMount.getName === "testVolume")
+    assert(emptyDirMount.getSubPathExpr === "foo")
+  }
+
   test("Mounts subpath on persistentVolumeClaims") {
     val volumeConf = KubernetesVolumeSpec(
       "testVolume",
       "/tmp",
       "bar",
+      "",
       true,
       KubernetesPVCVolumeConf("pvcClaim")
     )
@@ -400,12 +441,36 @@ class MountVolumesFeatureStepSuite extends SparkFunSuite {
     assert(pvcMount.getSubPath === "bar")
   }
 
+  test("Mounts subpathexpr on persistentVolumeClaims") {
+    val volumeConf = KubernetesVolumeSpec(
+      "testVolume",
+      "/tmp",
+      "",
+      "bar",
+      true,
+      KubernetesPVCVolumeConf("pvcClaim")
+    )
+    val kubernetesConf = KubernetesTestConf.createDriverConf(volumes = 
Seq(volumeConf))
+    val step = new MountVolumesFeatureStep(kubernetesConf)
+    val configuredPod = step.configurePod(SparkPod.initialPod())
+
+    assert(configuredPod.pod.getSpec.getVolumes.size() === 1)
+    val pvcClaim = 
configuredPod.pod.getSpec.getVolumes.get(0).getPersistentVolumeClaim
+    assert(pvcClaim.getClaimName === "pvcClaim")
+    assert(configuredPod.container.getVolumeMounts.size() === 1)
+    val pvcMount = configuredPod.container.getVolumeMounts.get(0)
+    assert(pvcMount.getMountPath === "/tmp")
+    assert(pvcMount.getName === "testVolume")
+    assert(pvcMount.getSubPathExpr === "bar")
+  }
+
   test("Mounts multiple subpaths") {
     val volumeConf = KubernetesEmptyDirVolumeConf(None, None)
     val emptyDirSpec = KubernetesVolumeSpec(
       "testEmptyDir",
       "/tmp/foo",
       "foo",
+      "",
       true,
       KubernetesEmptyDirVolumeConf(None, None)
     )
@@ -413,6 +478,7 @@ class MountVolumesFeatureStepSuite extends SparkFunSuite {
       "testPVC",
       "/tmp/bar",
       "bar",
+      "",
       true,
       KubernetesEmptyDirVolumeConf(None, None)
     )


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to