This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 1f2e7b87db76 [SPARK-49731][K8S] Support K8s volume `mount.subPathExpr` and `hostPath` volume `type`
1f2e7b87db76 is described below
commit 1f2e7b87db76ef60eded8a6db09f6690238471ce
Author: Enrico Minack <[email protected]>
AuthorDate: Wed Sep 25 07:53:12 2024 -0700
[SPARK-49731][K8S] Support K8s volume `mount.subPathExpr` and `hostPath` volume `type`
### What changes were proposed in this pull request?
Add the following config options:
- `spark.kubernetes.executor.volumes.[VolumeType].[VolumeName].mount.subPathExpr`
- `spark.kubernetes.executor.volumes.hostPath.[VolumeName].options.type`
### Why are the changes needed?
K8s Spec:
- https://kubernetes.io/docs/concepts/storage/volumes/#hostpath-volume-types
- https://kubernetes.io/docs/concepts/storage/volumes/#using-subpath-expanded-environment
These are natural extensions of the existing options:
- `spark.kubernetes.executor.volumes.[VolumeType].[VolumeName].mount.subPath`
- `spark.kubernetes.executor.volumes.hostPath.[VolumeName].options.path`
### Does this PR introduce _any_ user-facing change?
Yes, it adds the config options listed above.
### How was this patch tested?
Unit tests
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #48181 from EnricoMi/k8s-volume-options.
Authored-by: Enrico Minack <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../scala/org/apache/spark/deploy/k8s/Config.scala | 2 +
.../spark/deploy/k8s/KubernetesVolumeSpec.scala | 3 +-
.../spark/deploy/k8s/KubernetesVolumeUtils.scala | 18 +++++-
.../k8s/features/MountVolumesFeatureStep.scala | 6 +-
.../spark/deploy/k8s/KubernetesTestConf.scala | 11 +++-
.../deploy/k8s/KubernetesVolumeUtilsSuite.scala | 42 ++++++++++++-
.../k8s/features/LocalDirsFeatureStepSuite.scala | 3 +-
.../features/MountVolumesFeatureStepSuite.scala | 72 +++++++++++++++++++++-
8 files changed, 144 insertions(+), 13 deletions(-)
diff --git
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
index 3a4d68c19014..9c50f8ddb00c 100644
---
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
+++
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
@@ -769,8 +769,10 @@ private[spark] object Config extends Logging {
val KUBERNETES_VOLUMES_NFS_TYPE = "nfs"
val KUBERNETES_VOLUMES_MOUNT_PATH_KEY = "mount.path"
val KUBERNETES_VOLUMES_MOUNT_SUBPATH_KEY = "mount.subPath"
+ val KUBERNETES_VOLUMES_MOUNT_SUBPATHEXPR_KEY = "mount.subPathExpr"
val KUBERNETES_VOLUMES_MOUNT_READONLY_KEY = "mount.readOnly"
val KUBERNETES_VOLUMES_OPTIONS_PATH_KEY = "options.path"
+ val KUBERNETES_VOLUMES_OPTIONS_TYPE_KEY = "options.type"
val KUBERNETES_VOLUMES_OPTIONS_CLAIM_NAME_KEY = "options.claimName"
val KUBERNETES_VOLUMES_OPTIONS_CLAIM_STORAGE_CLASS_KEY =
"options.storageClass"
val KUBERNETES_VOLUMES_OPTIONS_MEDIUM_KEY = "options.medium"
diff --git
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeSpec.scala
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeSpec.scala
index 9dfd40a773eb..b4fe414e3cde 100644
---
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeSpec.scala
+++
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeSpec.scala
@@ -18,7 +18,7 @@ package org.apache.spark.deploy.k8s
private[spark] sealed trait KubernetesVolumeSpecificConf
-private[spark] case class KubernetesHostPathVolumeConf(hostPath: String)
+private[spark] case class KubernetesHostPathVolumeConf(hostPath: String,
volumeType: String)
extends KubernetesVolumeSpecificConf
private[spark] case class KubernetesPVCVolumeConf(
@@ -42,5 +42,6 @@ private[spark] case class KubernetesVolumeSpec(
volumeName: String,
mountPath: String,
mountSubPath: String,
+ mountSubPathExpr: String,
mountReadOnly: Boolean,
volumeConf: KubernetesVolumeSpecificConf)
diff --git
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtils.scala
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtils.scala
index 6463512c0114..88bb998d88b7 100644
---
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtils.scala
+++
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtils.scala
@@ -45,7 +45,9 @@ object KubernetesVolumeUtils {
val pathKey =
s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_MOUNT_PATH_KEY"
val readOnlyKey =
s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_MOUNT_READONLY_KEY"
val subPathKey =
s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_MOUNT_SUBPATH_KEY"
+ val subPathExprKey =
s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_MOUNT_SUBPATHEXPR_KEY"
val labelKey = s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_LABEL_KEY"
+ verifyMutuallyExclusiveOptionKeys(properties, subPathKey, subPathExprKey)
val volumeLabelsMap = properties
.filter(_._1.startsWith(labelKey))
@@ -57,6 +59,7 @@ object KubernetesVolumeUtils {
volumeName = volumeName,
mountPath = properties(pathKey),
mountSubPath = properties.getOrElse(subPathKey, ""),
+ mountSubPathExpr = properties.getOrElse(subPathExprKey, ""),
mountReadOnly = properties.get(readOnlyKey).exists(_.toBoolean),
volumeConf = parseVolumeSpecificConf(properties,
volumeType, volumeName, Option(volumeLabelsMap)))
@@ -87,8 +90,11 @@ object KubernetesVolumeUtils {
volumeType match {
case KUBERNETES_VOLUMES_HOSTPATH_TYPE =>
val pathKey =
s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_OPTIONS_PATH_KEY"
+ val typeKey =
s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_OPTIONS_TYPE_KEY"
verifyOptionKey(options, pathKey, KUBERNETES_VOLUMES_HOSTPATH_TYPE)
- KubernetesHostPathVolumeConf(options(pathKey))
+ // "" means that no checks will be performed before mounting the
hostPath volume
+ // backward compatibility default
+ KubernetesHostPathVolumeConf(options(pathKey),
options.getOrElse(typeKey, ""))
case KUBERNETES_VOLUMES_PVC_TYPE =>
val claimNameKey =
s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_OPTIONS_CLAIM_NAME_KEY"
@@ -129,6 +135,16 @@ object KubernetesVolumeUtils {
}
}
+ private def verifyMutuallyExclusiveOptionKeys(
+ options: Map[String, String],
+ keys: String*): Unit = {
+ val givenKeys = keys.filter(options.contains)
+ if (givenKeys.length > 1) {
+ throw new IllegalArgumentException("These config options are mutually
exclusive: " +
+ s"${givenKeys.mkString(", ")}")
+ }
+ }
+
private def verifySize(size: Option[String]): Unit = {
size.foreach { v =>
if (v.forall(_.isDigit) && parseLong(v) < 1024) {
diff --git
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStep.scala
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStep.scala
index 5cc61c746b0e..eea4604010b2 100644
---
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStep.scala
+++
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStep.scala
@@ -65,14 +65,14 @@ private[spark] class MountVolumesFeatureStep(conf:
KubernetesConf)
.withMountPath(spec.mountPath)
.withReadOnly(spec.mountReadOnly)
.withSubPath(spec.mountSubPath)
+ .withSubPathExpr(spec.mountSubPathExpr)
.withName(spec.volumeName)
.build()
val volumeBuilder = spec.volumeConf match {
- case KubernetesHostPathVolumeConf(hostPath) =>
- /* "" means that no checks will be performed before mounting the
hostPath volume */
+ case KubernetesHostPathVolumeConf(hostPath, volumeType) =>
new VolumeBuilder()
- .withHostPath(new HostPathVolumeSource(hostPath, ""))
+ .withHostPath(new HostPathVolumeSource(hostPath, volumeType))
case KubernetesPVCVolumeConf(claimNameTemplate, storageClass, size,
labels) =>
val claimName = conf match {
diff --git
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesTestConf.scala
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesTestConf.scala
index 7e0a65bcdda9..e0ddcd3d416f 100644
---
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesTestConf.scala
+++
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesTestConf.scala
@@ -113,9 +113,10 @@ object KubernetesTestConf {
volumes.foreach { case spec =>
val (vtype, configs) = spec.volumeConf match {
- case KubernetesHostPathVolumeConf(path) =>
- (KUBERNETES_VOLUMES_HOSTPATH_TYPE,
- Map(KUBERNETES_VOLUMES_OPTIONS_PATH_KEY -> path))
+ case KubernetesHostPathVolumeConf(hostPath, volumeType) =>
+ (KUBERNETES_VOLUMES_HOSTPATH_TYPE, Map(
+ KUBERNETES_VOLUMES_OPTIONS_PATH_KEY -> hostPath,
+ KUBERNETES_VOLUMES_OPTIONS_TYPE_KEY -> volumeType))
case KubernetesPVCVolumeConf(claimName, storageClass, sizeLimit,
labels) =>
val sconf = storageClass
@@ -145,6 +146,10 @@ object KubernetesTestConf {
conf.set(key(vtype, spec.volumeName,
KUBERNETES_VOLUMES_MOUNT_SUBPATH_KEY),
spec.mountSubPath)
}
+ if (spec.mountSubPathExpr.nonEmpty) {
+ conf.set(key(vtype, spec.volumeName,
KUBERNETES_VOLUMES_MOUNT_SUBPATHEXPR_KEY),
+ spec.mountSubPathExpr)
+ }
conf.set(key(vtype, spec.volumeName,
KUBERNETES_VOLUMES_MOUNT_READONLY_KEY),
spec.mountReadOnly.toString)
configs.foreach { case (k, v) =>
diff --git
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtilsSuite.scala
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtilsSuite.scala
index 5c103739d308..1e62db725fb6 100644
---
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtilsSuite.scala
+++
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtilsSuite.scala
@@ -30,7 +30,20 @@ class KubernetesVolumeUtilsSuite extends SparkFunSuite {
assert(volumeSpec.mountPath === "/path")
assert(volumeSpec.mountReadOnly)
assert(volumeSpec.volumeConf.asInstanceOf[KubernetesHostPathVolumeConf] ===
- KubernetesHostPathVolumeConf("/hostPath"))
+ KubernetesHostPathVolumeConf("/hostPath", ""))
+ }
+
+ test("Parses hostPath volume type correctly") {
+ val sparkConf = new SparkConf(false)
+ sparkConf.set("test.hostPath.volumeName.mount.path", "/path")
+ sparkConf.set("test.hostPath.volumeName.options.path", "/hostPath")
+ sparkConf.set("test.hostPath.volumeName.options.type", "Type")
+
+ val volumeSpec = KubernetesVolumeUtils.parseVolumesWithPrefix(sparkConf,
"test.").head
+ assert(volumeSpec.volumeName === "volumeName")
+ assert(volumeSpec.mountPath === "/path")
+ assert(volumeSpec.volumeConf.asInstanceOf[KubernetesHostPathVolumeConf] ===
+ KubernetesHostPathVolumeConf("/hostPath", "Type"))
}
test("Parses subPath correctly") {
@@ -43,6 +56,33 @@ class KubernetesVolumeUtilsSuite extends SparkFunSuite {
assert(volumeSpec.volumeName === "volumeName")
assert(volumeSpec.mountPath === "/path")
assert(volumeSpec.mountSubPath === "subPath")
+ assert(volumeSpec.mountSubPathExpr === "")
+ }
+
+ test("Parses subPathExpr correctly") {
+ val sparkConf = new SparkConf(false)
+ sparkConf.set("test.emptyDir.volumeName.mount.path", "/path")
+ sparkConf.set("test.emptyDir.volumeName.mount.readOnly", "true")
+ sparkConf.set("test.emptyDir.volumeName.mount.subPathExpr", "subPathExpr")
+
+ val volumeSpec = KubernetesVolumeUtils.parseVolumesWithPrefix(sparkConf,
"test.").head
+ assert(volumeSpec.volumeName === "volumeName")
+ assert(volumeSpec.mountPath === "/path")
+ assert(volumeSpec.mountSubPath === "")
+ assert(volumeSpec.mountSubPathExpr === "subPathExpr")
+ }
+
+ test("Rejects mutually exclusive subPath and subPathExpr") {
+ val sparkConf = new SparkConf(false)
+ sparkConf.set("test.emptyDir.volumeName.mount.path", "/path")
+ sparkConf.set("test.emptyDir.volumeName.mount.subPath", "subPath")
+ sparkConf.set("test.emptyDir.volumeName.mount.subPathExpr", "subPathExpr")
+
+ val msg = intercept[IllegalArgumentException] {
+ KubernetesVolumeUtils.parseVolumesWithPrefix(sparkConf, "test.").head
+ }.getMessage
+ assert(msg === "These config options are mutually exclusive: " +
+ "emptyDir.volumeName.mount.subPath,
emptyDir.volumeName.mount.subPathExpr")
}
test("Parses persistentVolumeClaim volumes correctly") {
diff --git
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStepSuite.scala
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStepSuite.scala
index eaadad163f06..3a9561051a89 100644
---
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStepSuite.scala
+++
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStepSuite.scala
@@ -137,8 +137,9 @@ class LocalDirsFeatureStepSuite extends SparkFunSuite {
"spark-local-dir-test",
"/tmp",
"",
+ "",
false,
- KubernetesHostPathVolumeConf("/hostPath/tmp")
+ KubernetesHostPathVolumeConf("/hostPath/tmp", "")
)
val kubernetesConf = KubernetesTestConf.createDriverConf(volumes =
Seq(volumeConf))
val mountVolumeStep = new MountVolumesFeatureStep(kubernetesConf)
diff --git
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStepSuite.scala
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStepSuite.scala
index 6a68898c5f61..c94a7a6ec26a 100644
---
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStepSuite.scala
+++
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStepSuite.scala
@@ -27,8 +27,9 @@ class MountVolumesFeatureStepSuite extends SparkFunSuite {
"testVolume",
"/tmp",
"",
+ "",
false,
- KubernetesHostPathVolumeConf("/hostPath/tmp")
+ KubernetesHostPathVolumeConf("/hostPath/tmp", "type")
)
val kubernetesConf = KubernetesTestConf.createDriverConf(volumes =
Seq(volumeConf))
val step = new MountVolumesFeatureStep(kubernetesConf)
@@ -36,6 +37,7 @@ class MountVolumesFeatureStepSuite extends SparkFunSuite {
assert(configuredPod.pod.getSpec.getVolumes.size() === 1)
assert(configuredPod.pod.getSpec.getVolumes.get(0).getHostPath.getPath ===
"/hostPath/tmp")
+ assert(configuredPod.pod.getSpec.getVolumes.get(0).getHostPath.getType ===
"type")
assert(configuredPod.container.getVolumeMounts.size() === 1)
assert(configuredPod.container.getVolumeMounts.get(0).getMountPath ===
"/tmp")
assert(configuredPod.container.getVolumeMounts.get(0).getName ===
"testVolume")
@@ -47,6 +49,7 @@ class MountVolumesFeatureStepSuite extends SparkFunSuite {
"testVolume",
"/tmp",
"",
+ "",
true,
KubernetesPVCVolumeConf("pvcClaim")
)
@@ -69,6 +72,7 @@ class MountVolumesFeatureStepSuite extends SparkFunSuite {
"testVolume",
"/tmp",
"",
+ "",
true,
KubernetesPVCVolumeConf("pvc-spark-SPARK_EXECUTOR_ID")
)
@@ -94,6 +98,7 @@ class MountVolumesFeatureStepSuite extends SparkFunSuite {
"testVolume",
"/tmp",
"",
+ "",
true,
KubernetesPVCVolumeConf("pvc-spark-SPARK_EXECUTOR_ID", Some("fast"),
Some("512M"))
)
@@ -119,6 +124,7 @@ class MountVolumesFeatureStepSuite extends SparkFunSuite {
"testVolume",
"/tmp",
"",
+ "",
true,
KubernetesPVCVolumeConf("OnDemand")
)
@@ -136,6 +142,7 @@ class MountVolumesFeatureStepSuite extends SparkFunSuite {
"testVolume",
"/tmp",
"",
+ "",
true,
KubernetesPVCVolumeConf(claimName =
MountVolumesFeatureStep.PVC_ON_DEMAND,
storageClass = Some("gp3"),
@@ -156,6 +163,7 @@ class MountVolumesFeatureStepSuite extends SparkFunSuite {
"testVolume",
"/tmp",
"",
+ "",
true,
KubernetesPVCVolumeConf(claimName =
MountVolumesFeatureStep.PVC_ON_DEMAND,
storageClass = Some("gp3"),
@@ -177,6 +185,7 @@ class MountVolumesFeatureStepSuite extends SparkFunSuite {
"checkpointVolume1",
"/checkpoints1",
"",
+ "",
true,
KubernetesPVCVolumeConf(claimName = "pvcClaim1",
storageClass = Some("gp3"),
@@ -188,6 +197,7 @@ class MountVolumesFeatureStepSuite extends SparkFunSuite {
"checkpointVolume2",
"/checkpoints2",
"",
+ "",
true,
KubernetesPVCVolumeConf(claimName = "pvcClaim2",
storageClass = Some("gp3"),
@@ -209,6 +219,7 @@ class MountVolumesFeatureStepSuite extends SparkFunSuite {
"testVolume",
"/tmp",
"",
+ "",
true,
KubernetesPVCVolumeConf(MountVolumesFeatureStep.PVC_ON_DEMAND)
)
@@ -226,6 +237,7 @@ class MountVolumesFeatureStepSuite extends SparkFunSuite {
"testVolume",
"/tmp",
"",
+ "",
false,
KubernetesEmptyDirVolumeConf(Some("Memory"), Some("6G"))
)
@@ -249,6 +261,7 @@ class MountVolumesFeatureStepSuite extends SparkFunSuite {
"testVolume",
"/tmp",
"",
+ "",
false,
KubernetesEmptyDirVolumeConf(None, None)
)
@@ -271,6 +284,7 @@ class MountVolumesFeatureStepSuite extends SparkFunSuite {
"testVolume",
"/tmp",
"",
+ "",
false,
KubernetesNFSVolumeConf("/share/name", "nfs.example.com")
)
@@ -293,6 +307,7 @@ class MountVolumesFeatureStepSuite extends SparkFunSuite {
"testVolume",
"/tmp",
"",
+ "",
true,
KubernetesNFSVolumeConf("/share/name", "nfs.example.com")
)
@@ -315,13 +330,15 @@ class MountVolumesFeatureStepSuite extends SparkFunSuite {
"hpVolume",
"/tmp",
"",
+ "",
false,
- KubernetesHostPathVolumeConf("/hostPath/tmp")
+ KubernetesHostPathVolumeConf("/hostPath/tmp", "")
)
val pvcVolumeConf = KubernetesVolumeSpec(
"checkpointVolume",
"/checkpoints",
"",
+ "",
true,
KubernetesPVCVolumeConf("pvcClaim")
)
@@ -339,13 +356,15 @@ class MountVolumesFeatureStepSuite extends SparkFunSuite {
"hpVolume",
"/data",
"",
+ "",
false,
- KubernetesHostPathVolumeConf("/hostPath/tmp")
+ KubernetesHostPathVolumeConf("/hostPath/tmp", "")
)
val pvcVolumeConf = KubernetesVolumeSpec(
"checkpointVolume",
"/data",
"",
+ "",
true,
KubernetesPVCVolumeConf("pvcClaim")
)
@@ -364,6 +383,7 @@ class MountVolumesFeatureStepSuite extends SparkFunSuite {
"testVolume",
"/tmp",
"foo",
+ "",
false,
KubernetesEmptyDirVolumeConf(None, None)
)
@@ -378,11 +398,32 @@ class MountVolumesFeatureStepSuite extends SparkFunSuite {
assert(emptyDirMount.getSubPath === "foo")
}
+ test("Mounts subpathexpr on emptyDir") {
+ val volumeConf = KubernetesVolumeSpec(
+ "testVolume",
+ "/tmp",
+ "",
+ "foo",
+ false,
+ KubernetesEmptyDirVolumeConf(None, None)
+ )
+ val kubernetesConf = KubernetesTestConf.createDriverConf(volumes =
Seq(volumeConf))
+ val step = new MountVolumesFeatureStep(kubernetesConf)
+ val configuredPod = step.configurePod(SparkPod.initialPod())
+
+ assert(configuredPod.pod.getSpec.getVolumes.size() === 1)
+ val emptyDirMount = configuredPod.container.getVolumeMounts.get(0)
+ assert(emptyDirMount.getMountPath === "/tmp")
+ assert(emptyDirMount.getName === "testVolume")
+ assert(emptyDirMount.getSubPathExpr === "foo")
+ }
+
test("Mounts subpath on persistentVolumeClaims") {
val volumeConf = KubernetesVolumeSpec(
"testVolume",
"/tmp",
"bar",
+ "",
true,
KubernetesPVCVolumeConf("pvcClaim")
)
@@ -400,12 +441,36 @@ class MountVolumesFeatureStepSuite extends SparkFunSuite {
assert(pvcMount.getSubPath === "bar")
}
+ test("Mounts subpathexpr on persistentVolumeClaims") {
+ val volumeConf = KubernetesVolumeSpec(
+ "testVolume",
+ "/tmp",
+ "",
+ "bar",
+ true,
+ KubernetesPVCVolumeConf("pvcClaim")
+ )
+ val kubernetesConf = KubernetesTestConf.createDriverConf(volumes =
Seq(volumeConf))
+ val step = new MountVolumesFeatureStep(kubernetesConf)
+ val configuredPod = step.configurePod(SparkPod.initialPod())
+
+ assert(configuredPod.pod.getSpec.getVolumes.size() === 1)
+ val pvcClaim =
configuredPod.pod.getSpec.getVolumes.get(0).getPersistentVolumeClaim
+ assert(pvcClaim.getClaimName === "pvcClaim")
+ assert(configuredPod.container.getVolumeMounts.size() === 1)
+ val pvcMount = configuredPod.container.getVolumeMounts.get(0)
+ assert(pvcMount.getMountPath === "/tmp")
+ assert(pvcMount.getName === "testVolume")
+ assert(pvcMount.getSubPathExpr === "bar")
+ }
+
test("Mounts multiple subpaths") {
val volumeConf = KubernetesEmptyDirVolumeConf(None, None)
val emptyDirSpec = KubernetesVolumeSpec(
"testEmptyDir",
"/tmp/foo",
"foo",
+ "",
true,
KubernetesEmptyDirVolumeConf(None, None)
)
@@ -413,6 +478,7 @@ class MountVolumesFeatureStepSuite extends SparkFunSuite {
"testPVC",
"/tmp/bar",
"bar",
+ "",
true,
KubernetesEmptyDirVolumeConf(None, None)
)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]