This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new d68048b06a04 [SPARK-49833][K8S] Support user-defined annotations for
OnDemand PVCs
d68048b06a04 is described below
commit d68048b06a046cc67ff431fdd8a687b0a1f43603
Author: prathit06 <[email protected]>
AuthorDate: Mon Sep 30 14:26:01 2024 -0700
[SPARK-49833][K8S] Support user-defined annotations for OnDemand PVCs
### What changes were proposed in this pull request?
Currently, for on-demand PVCs we cannot add user-defined annotations;
user-defined annotations can greatly help to add tags in the underlying storage.
For example, if we add the `k8s-pvc-tagger/tags` annotation & provide a map like
{"env":"dev"}, the same tags are reflected on the underlying storage (e.g. AWS
EBS)
### Why are the changes needed?
Changes are needed so users can set custom annotations on PVCs
### Does this PR introduce _any_ user-facing change?
It does not break any existing behaviour but adds a new feature/improvement
that enables adding custom annotations to on-demand PVCs
### How was this patch tested?
This was tested in an internal/production Kubernetes (k8s) cluster
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #48299 from prathit06/ondemand-pvc-annotations.
Authored-by: prathit06 <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
docs/running-on-kubernetes.md | 18 +++++
.../scala/org/apache/spark/deploy/k8s/Config.scala | 1 +
.../spark/deploy/k8s/KubernetesVolumeSpec.scala | 3 +-
.../spark/deploy/k8s/KubernetesVolumeUtils.scala | 14 +++-
.../k8s/features/MountVolumesFeatureStep.scala | 7 +-
.../spark/deploy/k8s/KubernetesTestConf.scala | 8 ++-
.../deploy/k8s/KubernetesVolumeUtilsSuite.scala | 42 +++++++++++-
.../features/MountVolumesFeatureStepSuite.scala | 77 ++++++++++++++++++++++
8 files changed, 160 insertions(+), 10 deletions(-)
diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md
index d8be32e04771..f8b935fd77f5 100644
--- a/docs/running-on-kubernetes.md
+++ b/docs/running-on-kubernetes.md
@@ -1191,6 +1191,15 @@ See the [configuration page](configuration.html) for
information on Spark config
</td>
<td>4.0.0</td>
</tr>
+<tr>
+
<td><code>spark.kubernetes.driver.volumes.[VolumeType].[VolumeName].annotation.[AnnotationName]</code></td>
+ <td>(none)</td>
+ <td>
+ Configure <a
href="https://kubernetes.io/docs/concepts/storage/volumes/">Kubernetes
Volume</a> annotations passed to the Kubernetes with
<code>AnnotationName</code> as key having specified value, must conform with
Kubernetes annotations format. For example,
+
<code>spark.kubernetes.driver.volumes.persistentVolumeClaim.checkpointpvc.annotation.foo=bar</code>.
+ </td>
+ <td>4.0.0</td>
+</tr>
<tr>
<td><code>spark.kubernetes.executor.volumes.[VolumeType].[VolumeName].mount.path</code></td>
<td>(none)</td>
@@ -1236,6 +1245,15 @@ See the [configuration page](configuration.html) for
information on Spark config
</td>
<td>4.0.0</td>
</tr>
+<tr>
+
<td><code>spark.kubernetes.executor.volumes.[VolumeType].[VolumeName].annotation.[AnnotationName]</code></td>
+ <td>(none)</td>
+ <td>
+ Configure <a
href="https://kubernetes.io/docs/concepts/storage/volumes/">Kubernetes
Volume</a> annotations passed to the Kubernetes with
<code>AnnotationName</code> as key having specified value, must conform with
Kubernetes annotations format. For example,
+
<code>spark.kubernetes.executor.volumes.persistentVolumeClaim.checkpointpvc.annotation.foo=bar</code>.
+ </td>
+ <td>4.0.0</td>
+</tr>
<tr>
<td><code>spark.kubernetes.local.dirs.tmpfs</code></td>
<td><code>false</code></td>
diff --git
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
index 9c50f8ddb00c..db7fc85976c2 100644
---
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
+++
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
@@ -779,6 +779,7 @@ private[spark] object Config extends Logging {
val KUBERNETES_VOLUMES_OPTIONS_SIZE_LIMIT_KEY = "options.sizeLimit"
val KUBERNETES_VOLUMES_OPTIONS_SERVER_KEY = "options.server"
val KUBERNETES_VOLUMES_LABEL_KEY = "label."
+ val KUBERNETES_VOLUMES_ANNOTATION_KEY = "annotation."
val KUBERNETES_DRIVER_ENV_PREFIX = "spark.kubernetes.driverEnv."
val KUBERNETES_DNS_SUBDOMAIN_NAME_MAX_LENGTH = 253
diff --git
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeSpec.scala
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeSpec.scala
index b4fe414e3cde..b7113a562fa0 100644
---
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeSpec.scala
+++
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeSpec.scala
@@ -25,7 +25,8 @@ private[spark] case class KubernetesPVCVolumeConf(
claimName: String,
storageClass: Option[String] = None,
size: Option[String] = None,
- labels: Option[Map[String, String]] = None)
+ labels: Option[Map[String, String]] = None,
+ annotations: Option[Map[String, String]] = None)
extends KubernetesVolumeSpecificConf
private[spark] case class KubernetesEmptyDirVolumeConf(
diff --git
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtils.scala
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtils.scala
index 88bb998d88b7..95821a909f35 100644
---
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtils.scala
+++
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtils.scala
@@ -47,6 +47,7 @@ object KubernetesVolumeUtils {
val subPathKey =
s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_MOUNT_SUBPATH_KEY"
val subPathExprKey =
s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_MOUNT_SUBPATHEXPR_KEY"
val labelKey = s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_LABEL_KEY"
+ val annotationKey =
s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_ANNOTATION_KEY"
verifyMutuallyExclusiveOptionKeys(properties, subPathKey, subPathExprKey)
val volumeLabelsMap = properties
@@ -54,6 +55,11 @@ object KubernetesVolumeUtils {
.map {
case (k, v) => k.replaceAll(labelKey, "") -> v
}
+ val volumeAnnotationsMap = properties
+ .filter(_._1.startsWith(annotationKey))
+ .map {
+ case (k, v) => k.replaceAll(annotationKey, "") -> v
+ }
KubernetesVolumeSpec(
volumeName = volumeName,
@@ -62,7 +68,7 @@ object KubernetesVolumeUtils {
mountSubPathExpr = properties.getOrElse(subPathExprKey, ""),
mountReadOnly = properties.get(readOnlyKey).exists(_.toBoolean),
volumeConf = parseVolumeSpecificConf(properties,
- volumeType, volumeName, Option(volumeLabelsMap)))
+ volumeType, volumeName, Option(volumeLabelsMap),
Option(volumeAnnotationsMap)))
}.toSeq
}
@@ -86,7 +92,8 @@ object KubernetesVolumeUtils {
options: Map[String, String],
volumeType: String,
volumeName: String,
- labels: Option[Map[String, String]]): KubernetesVolumeSpecificConf = {
+ labels: Option[Map[String, String]],
+ annotations: Option[Map[String, String]]): KubernetesVolumeSpecificConf
= {
volumeType match {
case KUBERNETES_VOLUMES_HOSTPATH_TYPE =>
val pathKey =
s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_OPTIONS_PATH_KEY"
@@ -107,7 +114,8 @@ object KubernetesVolumeUtils {
options(claimNameKey),
options.get(storageClassKey),
options.get(sizeLimitKey),
- labels)
+ labels,
+ annotations)
case KUBERNETES_VOLUMES_EMPTYDIR_TYPE =>
val mediumKey =
s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_OPTIONS_MEDIUM_KEY"
diff --git
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStep.scala
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStep.scala
index eea4604010b2..3d89696f19fc 100644
---
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStep.scala
+++
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStep.scala
@@ -74,7 +74,7 @@ private[spark] class MountVolumesFeatureStep(conf:
KubernetesConf)
new VolumeBuilder()
.withHostPath(new HostPathVolumeSource(hostPath, volumeType))
- case KubernetesPVCVolumeConf(claimNameTemplate, storageClass, size,
labels) =>
+ case KubernetesPVCVolumeConf(claimNameTemplate, storageClass, size,
labels, annotations) =>
val claimName = conf match {
case c: KubernetesExecutorConf =>
claimNameTemplate
@@ -91,12 +91,17 @@ private[spark] class MountVolumesFeatureStep(conf:
KubernetesConf)
case Some(customLabelsMap) => (customLabelsMap ++
defaultVolumeLabels).asJava
case None => defaultVolumeLabels.asJava
}
+ val volumeAnnotations = annotations match {
+ case Some(value) => value.asJava
+ case None => Map[String, String]().asJava
+ }
additionalResources.append(new PersistentVolumeClaimBuilder()
.withKind(PVC)
.withApiVersion("v1")
.withNewMetadata()
.withName(claimName)
.addToLabels(volumeLabels)
+ .addToAnnotations(volumeAnnotations)
.endMetadata()
.withNewSpec()
.withStorageClassName(storageClass.get)
diff --git
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesTestConf.scala
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesTestConf.scala
index e0ddcd3d416f..e5ed79718d73 100644
---
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesTestConf.scala
+++
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesTestConf.scala
@@ -118,7 +118,7 @@ object KubernetesTestConf {
KUBERNETES_VOLUMES_OPTIONS_PATH_KEY -> hostPath,
KUBERNETES_VOLUMES_OPTIONS_TYPE_KEY -> volumeType))
- case KubernetesPVCVolumeConf(claimName, storageClass, sizeLimit,
labels) =>
+ case KubernetesPVCVolumeConf(claimName, storageClass, sizeLimit,
labels, annotations) =>
val sconf = storageClass
.map { s => (KUBERNETES_VOLUMES_OPTIONS_CLAIM_STORAGE_CLASS_KEY,
s) }.toMap
val lconf = sizeLimit.map { l =>
(KUBERNETES_VOLUMES_OPTIONS_SIZE_LIMIT_KEY, l) }.toMap
@@ -126,9 +126,13 @@ object KubernetesTestConf {
case Some(value) => value.map { case(k, v) => s"label.$k" -> v }
case None => Map()
}
+ val aannotations = annotations match {
+ case Some(value) => value.map { case (k, v) => s"annotation.$k" ->
v }
+ case None => Map()
+ }
(KUBERNETES_VOLUMES_PVC_TYPE,
Map(KUBERNETES_VOLUMES_OPTIONS_CLAIM_NAME_KEY -> claimName) ++
- sconf ++ lconf ++ llabels)
+ sconf ++ lconf ++ llabels ++ aannotations)
case KubernetesEmptyDirVolumeConf(medium, sizeLimit) =>
val mconf = medium.map { m =>
(KUBERNETES_VOLUMES_OPTIONS_MEDIUM_KEY, m) }.toMap
diff --git
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtilsSuite.scala
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtilsSuite.scala
index 1e62db725fb6..3c57cba9a7ff 100644
---
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtilsSuite.scala
+++
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtilsSuite.scala
@@ -96,7 +96,7 @@ class KubernetesVolumeUtilsSuite extends SparkFunSuite {
assert(volumeSpec.mountPath === "/path")
assert(volumeSpec.mountReadOnly)
assert(volumeSpec.volumeConf.asInstanceOf[KubernetesPVCVolumeConf] ===
- KubernetesPVCVolumeConf("claimName", labels = Some(Map())))
+ KubernetesPVCVolumeConf("claimName", labels = Some(Map()), annotations =
Some(Map())))
}
test("SPARK-49598: Parses persistentVolumeClaim volumes correctly with
labels") {
@@ -113,7 +113,8 @@ class KubernetesVolumeUtilsSuite extends SparkFunSuite {
assert(volumeSpec.mountReadOnly)
assert(volumeSpec.volumeConf.asInstanceOf[KubernetesPVCVolumeConf] ===
KubernetesPVCVolumeConf(claimName = "claimName",
- labels = Some(Map("env" -> "test", "foo" -> "bar"))))
+ labels = Some(Map("env" -> "test", "foo" -> "bar")),
+ annotations = Some(Map())))
}
test("SPARK-49598: Parses persistentVolumeClaim volumes & puts " +
@@ -128,7 +129,8 @@ class KubernetesVolumeUtilsSuite extends SparkFunSuite {
assert(volumeSpec.mountPath === "/path")
assert(volumeSpec.mountReadOnly)
assert(volumeSpec.volumeConf.asInstanceOf[KubernetesPVCVolumeConf] ===
- KubernetesPVCVolumeConf(claimName = "claimName", labels = Some(Map())))
+ KubernetesPVCVolumeConf(claimName = "claimName", labels = Some(Map()),
+ annotations = Some(Map())))
}
test("Parses emptyDir volumes correctly") {
@@ -280,4 +282,38 @@ class KubernetesVolumeUtilsSuite extends SparkFunSuite {
}.getMessage
assert(m.contains("smaller than 1KiB. Missing units?"))
}
+
+ test("SPARK-49833: Parses persistentVolumeClaim volumes correctly with
annotations") {
+ val sparkConf = new SparkConf(false)
+ sparkConf.set("test.persistentVolumeClaim.volumeName.mount.path", "/path")
+ sparkConf.set("test.persistentVolumeClaim.volumeName.mount.readOnly",
"true")
+ sparkConf.set("test.persistentVolumeClaim.volumeName.options.claimName",
"claimName")
+ sparkConf.set("test.persistentVolumeClaim.volumeName.annotation.key1",
"value1")
+ sparkConf.set("test.persistentVolumeClaim.volumeName.annotation.key2",
"value2")
+
+ val volumeSpec = KubernetesVolumeUtils.parseVolumesWithPrefix(sparkConf,
"test.").head
+ assert(volumeSpec.volumeName === "volumeName")
+ assert(volumeSpec.mountPath === "/path")
+ assert(volumeSpec.mountReadOnly)
+ assert(volumeSpec.volumeConf.asInstanceOf[KubernetesPVCVolumeConf] ===
+ KubernetesPVCVolumeConf(claimName = "claimName",
+ labels = Some(Map()),
+ annotations = Some(Map("key1" -> "value1", "key2" -> "value2"))))
+ }
+
+ test("SPARK-49833: Parses persistentVolumeClaim volumes & puts " +
+ "annotations as empty Map if not provided") {
+ val sparkConf = new SparkConf(false)
+ sparkConf.set("test.persistentVolumeClaim.volumeName.mount.path", "/path")
+ sparkConf.set("test.persistentVolumeClaim.volumeName.mount.readOnly",
"true")
+ sparkConf.set("test.persistentVolumeClaim.volumeName.options.claimName",
"claimName")
+
+ val volumeSpec = KubernetesVolumeUtils.parseVolumesWithPrefix(sparkConf,
"test.").head
+ assert(volumeSpec.volumeName === "volumeName")
+ assert(volumeSpec.mountPath === "/path")
+ assert(volumeSpec.mountReadOnly)
+ assert(volumeSpec.volumeConf.asInstanceOf[KubernetesPVCVolumeConf] ===
+ KubernetesPVCVolumeConf(claimName = "claimName", labels = Some(Map()),
+ annotations = Some(Map())))
+ }
}
diff --git
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStepSuite.scala
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStepSuite.scala
index c94a7a6ec26a..293773ddb9ec 100644
---
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStepSuite.scala
+++
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStepSuite.scala
@@ -496,4 +496,81 @@ class MountVolumesFeatureStepSuite extends SparkFunSuite {
assert(mounts(1).getMountPath === "/tmp/bar")
assert(mounts(1).getSubPath === "bar")
}
+
+ test("SPARK-49833: Create and mounts persistentVolumeClaims in driver with
annotations") {
+ val volumeConf = KubernetesVolumeSpec(
+ "testVolume",
+ "/tmp",
+ "",
+ "",
+ true,
+ KubernetesPVCVolumeConf(claimName =
MountVolumesFeatureStep.PVC_ON_DEMAND,
+ storageClass = Some("gp3"),
+ size = Some("1Mi"),
+ annotations = Some(Map("env" -> "test")))
+ )
+
+ val kubernetesConf = KubernetesTestConf.createDriverConf(volumes =
Seq(volumeConf))
+ val step = new MountVolumesFeatureStep(kubernetesConf)
+ val configuredPod = step.configurePod(SparkPod.initialPod())
+ assert(configuredPod.pod.getSpec.getVolumes.size() === 1)
+ val pvcClaim =
configuredPod.pod.getSpec.getVolumes.get(0).getPersistentVolumeClaim
+ assert(pvcClaim.getClaimName.endsWith("-driver-pvc-0"))
+ }
+
+ test("SPARK-49833: Create and mounts persistentVolumeClaims in executors
with annotations") {
+ val volumeConf = KubernetesVolumeSpec(
+ "testVolume",
+ "/tmp",
+ "",
+ "",
+ true,
+ KubernetesPVCVolumeConf(claimName =
MountVolumesFeatureStep.PVC_ON_DEMAND,
+ storageClass = Some("gp3"),
+ size = Some("1Mi"),
+ annotations = Some(Map("env" -> "exec-test")))
+ )
+
+ val executorConf = KubernetesTestConf.createExecutorConf(volumes =
Seq(volumeConf))
+ val executorStep = new MountVolumesFeatureStep(executorConf)
+ val executorPod = executorStep.configurePod(SparkPod.initialPod())
+
+ assert(executorPod.pod.getSpec.getVolumes.size() === 1)
+ val executorPVC =
executorPod.pod.getSpec.getVolumes.get(0).getPersistentVolumeClaim
+ assert(executorPVC.getClaimName.endsWith("-exec-1-pvc-0"))
+ }
+
+ test("SPARK-49833: Mount multiple volumes to executor with annotations") {
+ val pvcVolumeConf1 = KubernetesVolumeSpec(
+ "checkpointVolume1",
+ "/checkpoints1",
+ "",
+ "",
+ true,
+ KubernetesPVCVolumeConf(claimName = "pvcClaim1",
+ storageClass = Some("gp3"),
+ size = Some("1Mi"),
+ annotations = Some(Map("env1" -> "exec-test-1")))
+ )
+
+ val pvcVolumeConf2 = KubernetesVolumeSpec(
+ "checkpointVolume2",
+ "/checkpoints2",
+ "",
+ "",
+ true,
+ KubernetesPVCVolumeConf(claimName = "pvcClaim2",
+ storageClass = Some("gp3"),
+ size = Some("1Mi"),
+ annotations = Some(Map("env2" -> "exec-test-2")))
+ )
+
+ val kubernetesConf = KubernetesTestConf.createExecutorConf(
+ volumes = Seq(pvcVolumeConf1, pvcVolumeConf2))
+ val step = new MountVolumesFeatureStep(kubernetesConf)
+ val configuredPod = step.configurePod(SparkPod.initialPod())
+
+ assert(configuredPod.pod.getSpec.getVolumes.size() === 2)
+ assert(configuredPod.container.getVolumeMounts.size() === 2)
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]