This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new d68048b06a04 [SPARK-49833][K8S] Support user-defined annotations for 
OnDemand PVCs
d68048b06a04 is described below

commit d68048b06a046cc67ff431fdd8a687b0a1f43603
Author: prathit06 <[email protected]>
AuthorDate: Mon Sep 30 14:26:01 2024 -0700

    [SPARK-49833][K8S] Support user-defined annotations for OnDemand PVCs
    
    ### What changes were proposed in this pull request?
    Currently, user-defined annotations cannot be added to on-demand PVCs. User-defined annotations can greatly help with adding tags to the underlying storage.
    For example, if we add the `k8s-pvc-tagger/tags` annotation and provide a map like {"env":"dev"}, the same tags are reflected on the underlying storage (e.g., AWS EBS).
    
    ### Why are the changes needed?
    These changes are needed so that users can set custom annotations on on-demand PVCs (for example, to propagate tags to the underlying storage).
    
    ### Does this PR introduce _any_ user-facing change?
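    It does not break any existing behavior. It adds a new feature that allows custom annotations to be added to on-demand PVCs via the new `spark.kubernetes.{driver,executor}.volumes.[VolumeType].[VolumeName].annotation.[AnnotationName]` configurations.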
    
    ### How was this patch tested?
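    This was tested in an internal/production Kubernetes cluster. Unit tests were also added to `KubernetesVolumeUtilsSuite` and `MountVolumesFeatureStepSuite`.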
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No
    
    Closes #48299 from prathit06/ondemand-pvc-annotations.
    
    Authored-by: prathit06 <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 docs/running-on-kubernetes.md                      | 18 +++++
 .../scala/org/apache/spark/deploy/k8s/Config.scala |  1 +
 .../spark/deploy/k8s/KubernetesVolumeSpec.scala    |  3 +-
 .../spark/deploy/k8s/KubernetesVolumeUtils.scala   | 14 +++-
 .../k8s/features/MountVolumesFeatureStep.scala     |  7 +-
 .../spark/deploy/k8s/KubernetesTestConf.scala      |  8 ++-
 .../deploy/k8s/KubernetesVolumeUtilsSuite.scala    | 42 +++++++++++-
 .../features/MountVolumesFeatureStepSuite.scala    | 77 ++++++++++++++++++++++
 8 files changed, 160 insertions(+), 10 deletions(-)

diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md
index d8be32e04771..f8b935fd77f5 100644
--- a/docs/running-on-kubernetes.md
+++ b/docs/running-on-kubernetes.md
@@ -1191,6 +1191,15 @@ See the [configuration page](configuration.html) for 
information on Spark config
   </td>
   <td>4.0.0</td>
 </tr>
+<tr>
+  
<td><code>spark.kubernetes.driver.volumes.[VolumeType].[VolumeName].annotation.[AnnotationName]</code></td>
+  <td>(none)</td>
+  <td>
+   Configure <a 
href="https://kubernetes.io/docs/concepts/storage/volumes/">Kubernetes 
Volume</a> annotations passed to the Kubernetes with 
<code>AnnotationName</code> as key having specified value, must conform with 
Kubernetes annotations format. For example,
+   
<code>spark.kubernetes.driver.volumes.persistentVolumeClaim.checkpointpvc.annotation.foo=bar</code>.
+  </td>
+  <td>4.0.0</td>
+</tr>
 <tr>
   
<td><code>spark.kubernetes.executor.volumes.[VolumeType].[VolumeName].mount.path</code></td>
   <td>(none)</td>
@@ -1236,6 +1245,15 @@ See the [configuration page](configuration.html) for 
information on Spark config
   </td>
   <td>4.0.0</td>
 </tr>
+<tr>
+  
<td><code>spark.kubernetes.executor.volumes.[VolumeType].[VolumeName].annotation.[AnnotationName]</code></td>
+  <td>(none)</td>
+  <td>
+   Configure <a 
href="https://kubernetes.io/docs/concepts/storage/volumes/">Kubernetes 
Volume</a> annotations passed to the Kubernetes with 
<code>AnnotationName</code> as key having specified value, must conform with 
Kubernetes annotations format. For example,
+   
<code>spark.kubernetes.executor.volumes.persistentVolumeClaim.checkpointpvc.annotation.foo=bar</code>.
+  </td>
+  <td>4.0.0</td>
+</tr>
 <tr>
   <td><code>spark.kubernetes.local.dirs.tmpfs</code></td>
   <td><code>false</code></td>
diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
index 9c50f8ddb00c..db7fc85976c2 100644
--- 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
+++ 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
@@ -779,6 +779,7 @@ private[spark] object Config extends Logging {
   val KUBERNETES_VOLUMES_OPTIONS_SIZE_LIMIT_KEY = "options.sizeLimit"
   val KUBERNETES_VOLUMES_OPTIONS_SERVER_KEY = "options.server"
   val KUBERNETES_VOLUMES_LABEL_KEY = "label."
+  val KUBERNETES_VOLUMES_ANNOTATION_KEY = "annotation."
   val KUBERNETES_DRIVER_ENV_PREFIX = "spark.kubernetes.driverEnv."
 
   val KUBERNETES_DNS_SUBDOMAIN_NAME_MAX_LENGTH = 253
diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeSpec.scala
 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeSpec.scala
index b4fe414e3cde..b7113a562fa0 100644
--- 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeSpec.scala
+++ 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeSpec.scala
@@ -25,7 +25,8 @@ private[spark] case class KubernetesPVCVolumeConf(
     claimName: String,
     storageClass: Option[String] = None,
     size: Option[String] = None,
-    labels: Option[Map[String, String]] = None)
+    labels: Option[Map[String, String]] = None,
+    annotations: Option[Map[String, String]] = None)
   extends KubernetesVolumeSpecificConf
 
 private[spark] case class KubernetesEmptyDirVolumeConf(
diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtils.scala
 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtils.scala
index 88bb998d88b7..95821a909f35 100644
--- 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtils.scala
+++ 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtils.scala
@@ -47,6 +47,7 @@ object KubernetesVolumeUtils {
       val subPathKey = 
s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_MOUNT_SUBPATH_KEY"
       val subPathExprKey = 
s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_MOUNT_SUBPATHEXPR_KEY"
       val labelKey = s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_LABEL_KEY"
+      val annotationKey = 
s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_ANNOTATION_KEY"
       verifyMutuallyExclusiveOptionKeys(properties, subPathKey, subPathExprKey)
 
       val volumeLabelsMap = properties
@@ -54,6 +55,11 @@ object KubernetesVolumeUtils {
         .map {
           case (k, v) => k.replaceAll(labelKey, "") -> v
         }
+      val volumeAnnotationsMap = properties
+        .filter(_._1.startsWith(annotationKey))
+        .map {
+          case (k, v) => k.replaceAll(annotationKey, "") -> v
+        }
 
       KubernetesVolumeSpec(
         volumeName = volumeName,
@@ -62,7 +68,7 @@ object KubernetesVolumeUtils {
         mountSubPathExpr = properties.getOrElse(subPathExprKey, ""),
         mountReadOnly = properties.get(readOnlyKey).exists(_.toBoolean),
         volumeConf = parseVolumeSpecificConf(properties,
-          volumeType, volumeName, Option(volumeLabelsMap)))
+          volumeType, volumeName, Option(volumeLabelsMap), 
Option(volumeAnnotationsMap)))
     }.toSeq
   }
 
@@ -86,7 +92,8 @@ object KubernetesVolumeUtils {
       options: Map[String, String],
       volumeType: String,
       volumeName: String,
-      labels: Option[Map[String, String]]): KubernetesVolumeSpecificConf = {
+      labels: Option[Map[String, String]],
+      annotations: Option[Map[String, String]]): KubernetesVolumeSpecificConf 
= {
     volumeType match {
       case KUBERNETES_VOLUMES_HOSTPATH_TYPE =>
         val pathKey = 
s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_OPTIONS_PATH_KEY"
@@ -107,7 +114,8 @@ object KubernetesVolumeUtils {
           options(claimNameKey),
           options.get(storageClassKey),
           options.get(sizeLimitKey),
-          labels)
+          labels,
+          annotations)
 
       case KUBERNETES_VOLUMES_EMPTYDIR_TYPE =>
         val mediumKey = 
s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_OPTIONS_MEDIUM_KEY"
diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStep.scala
 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStep.scala
index eea4604010b2..3d89696f19fc 100644
--- 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStep.scala
+++ 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStep.scala
@@ -74,7 +74,7 @@ private[spark] class MountVolumesFeatureStep(conf: 
KubernetesConf)
           new VolumeBuilder()
             .withHostPath(new HostPathVolumeSource(hostPath, volumeType))
 
-        case KubernetesPVCVolumeConf(claimNameTemplate, storageClass, size, 
labels) =>
+        case KubernetesPVCVolumeConf(claimNameTemplate, storageClass, size, 
labels, annotations) =>
           val claimName = conf match {
             case c: KubernetesExecutorConf =>
               claimNameTemplate
@@ -91,12 +91,17 @@ private[spark] class MountVolumesFeatureStep(conf: 
KubernetesConf)
               case Some(customLabelsMap) => (customLabelsMap ++ 
defaultVolumeLabels).asJava
               case None => defaultVolumeLabels.asJava
             }
+            val volumeAnnotations = annotations match {
+              case Some(value) => value.asJava
+              case None => Map[String, String]().asJava
+            }
             additionalResources.append(new PersistentVolumeClaimBuilder()
               .withKind(PVC)
               .withApiVersion("v1")
               .withNewMetadata()
                 .withName(claimName)
                 .addToLabels(volumeLabels)
+                .addToAnnotations(volumeAnnotations)
                 .endMetadata()
               .withNewSpec()
                 .withStorageClassName(storageClass.get)
diff --git 
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesTestConf.scala
 
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesTestConf.scala
index e0ddcd3d416f..e5ed79718d73 100644
--- 
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesTestConf.scala
+++ 
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesTestConf.scala
@@ -118,7 +118,7 @@ object KubernetesTestConf {
             KUBERNETES_VOLUMES_OPTIONS_PATH_KEY -> hostPath,
             KUBERNETES_VOLUMES_OPTIONS_TYPE_KEY -> volumeType))
 
-        case KubernetesPVCVolumeConf(claimName, storageClass, sizeLimit, 
labels) =>
+        case KubernetesPVCVolumeConf(claimName, storageClass, sizeLimit, 
labels, annotations) =>
           val sconf = storageClass
             .map { s => (KUBERNETES_VOLUMES_OPTIONS_CLAIM_STORAGE_CLASS_KEY, 
s) }.toMap
           val lconf = sizeLimit.map { l => 
(KUBERNETES_VOLUMES_OPTIONS_SIZE_LIMIT_KEY, l) }.toMap
@@ -126,9 +126,13 @@ object KubernetesTestConf {
             case Some(value) => value.map { case(k, v) => s"label.$k" -> v }
             case None => Map()
           }
+          val aannotations = annotations match {
+            case Some(value) => value.map { case (k, v) => s"annotation.$k" -> 
v }
+            case None => Map()
+          }
           (KUBERNETES_VOLUMES_PVC_TYPE,
             Map(KUBERNETES_VOLUMES_OPTIONS_CLAIM_NAME_KEY -> claimName) ++
-              sconf ++ lconf ++ llabels)
+              sconf ++ lconf ++ llabels ++ aannotations)
 
         case KubernetesEmptyDirVolumeConf(medium, sizeLimit) =>
           val mconf = medium.map { m => 
(KUBERNETES_VOLUMES_OPTIONS_MEDIUM_KEY, m) }.toMap
diff --git 
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtilsSuite.scala
 
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtilsSuite.scala
index 1e62db725fb6..3c57cba9a7ff 100644
--- 
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtilsSuite.scala
+++ 
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtilsSuite.scala
@@ -96,7 +96,7 @@ class KubernetesVolumeUtilsSuite extends SparkFunSuite {
     assert(volumeSpec.mountPath === "/path")
     assert(volumeSpec.mountReadOnly)
     assert(volumeSpec.volumeConf.asInstanceOf[KubernetesPVCVolumeConf] ===
-      KubernetesPVCVolumeConf("claimName", labels = Some(Map())))
+      KubernetesPVCVolumeConf("claimName", labels = Some(Map()), annotations = 
Some(Map())))
   }
 
   test("SPARK-49598: Parses persistentVolumeClaim volumes correctly with 
labels") {
@@ -113,7 +113,8 @@ class KubernetesVolumeUtilsSuite extends SparkFunSuite {
     assert(volumeSpec.mountReadOnly)
     assert(volumeSpec.volumeConf.asInstanceOf[KubernetesPVCVolumeConf] ===
       KubernetesPVCVolumeConf(claimName = "claimName",
-        labels = Some(Map("env" -> "test", "foo" -> "bar"))))
+        labels = Some(Map("env" -> "test", "foo" -> "bar")),
+        annotations = Some(Map())))
   }
 
   test("SPARK-49598: Parses persistentVolumeClaim volumes & puts " +
@@ -128,7 +129,8 @@ class KubernetesVolumeUtilsSuite extends SparkFunSuite {
     assert(volumeSpec.mountPath === "/path")
     assert(volumeSpec.mountReadOnly)
     assert(volumeSpec.volumeConf.asInstanceOf[KubernetesPVCVolumeConf] ===
-      KubernetesPVCVolumeConf(claimName = "claimName", labels = Some(Map())))
+      KubernetesPVCVolumeConf(claimName = "claimName", labels = Some(Map()),
+        annotations = Some(Map())))
   }
 
   test("Parses emptyDir volumes correctly") {
@@ -280,4 +282,38 @@ class KubernetesVolumeUtilsSuite extends SparkFunSuite {
     }.getMessage
     assert(m.contains("smaller than 1KiB. Missing units?"))
   }
+
+  test("SPARK-49833: Parses persistentVolumeClaim volumes correctly with 
annotations") {
+    val sparkConf = new SparkConf(false)
+    sparkConf.set("test.persistentVolumeClaim.volumeName.mount.path", "/path")
+    sparkConf.set("test.persistentVolumeClaim.volumeName.mount.readOnly", 
"true")
+    sparkConf.set("test.persistentVolumeClaim.volumeName.options.claimName", 
"claimName")
+    sparkConf.set("test.persistentVolumeClaim.volumeName.annotation.key1", 
"value1")
+    sparkConf.set("test.persistentVolumeClaim.volumeName.annotation.key2", 
"value2")
+
+    val volumeSpec = KubernetesVolumeUtils.parseVolumesWithPrefix(sparkConf, 
"test.").head
+    assert(volumeSpec.volumeName === "volumeName")
+    assert(volumeSpec.mountPath === "/path")
+    assert(volumeSpec.mountReadOnly)
+    assert(volumeSpec.volumeConf.asInstanceOf[KubernetesPVCVolumeConf] ===
+      KubernetesPVCVolumeConf(claimName = "claimName",
+        labels = Some(Map()),
+        annotations = Some(Map("key1" -> "value1", "key2" -> "value2"))))
+  }
+
+  test("SPARK-49833: Parses persistentVolumeClaim volumes & puts " +
+    "annotations as empty Map if not provided") {
+    val sparkConf = new SparkConf(false)
+    sparkConf.set("test.persistentVolumeClaim.volumeName.mount.path", "/path")
+    sparkConf.set("test.persistentVolumeClaim.volumeName.mount.readOnly", 
"true")
+    sparkConf.set("test.persistentVolumeClaim.volumeName.options.claimName", 
"claimName")
+
+    val volumeSpec = KubernetesVolumeUtils.parseVolumesWithPrefix(sparkConf, 
"test.").head
+    assert(volumeSpec.volumeName === "volumeName")
+    assert(volumeSpec.mountPath === "/path")
+    assert(volumeSpec.mountReadOnly)
+    assert(volumeSpec.volumeConf.asInstanceOf[KubernetesPVCVolumeConf] ===
+      KubernetesPVCVolumeConf(claimName = "claimName", labels = Some(Map()),
+        annotations = Some(Map())))
+  }
 }
diff --git 
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStepSuite.scala
 
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStepSuite.scala
index c94a7a6ec26a..293773ddb9ec 100644
--- 
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStepSuite.scala
+++ 
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStepSuite.scala
@@ -496,4 +496,81 @@ class MountVolumesFeatureStepSuite extends SparkFunSuite {
     assert(mounts(1).getMountPath === "/tmp/bar")
     assert(mounts(1).getSubPath === "bar")
   }
+
+  test("SPARK-49833: Create and mounts persistentVolumeClaims in driver with 
annotations") {
+    val volumeConf = KubernetesVolumeSpec(
+      "testVolume",
+      "/tmp",
+      "",
+      "",
+      true,
+      KubernetesPVCVolumeConf(claimName = 
MountVolumesFeatureStep.PVC_ON_DEMAND,
+        storageClass = Some("gp3"),
+        size = Some("1Mi"),
+        annotations = Some(Map("env" -> "test")))
+    )
+
+    val kubernetesConf = KubernetesTestConf.createDriverConf(volumes = 
Seq(volumeConf))
+    val step = new MountVolumesFeatureStep(kubernetesConf)
+    val configuredPod = step.configurePod(SparkPod.initialPod())
+    assert(configuredPod.pod.getSpec.getVolumes.size() === 1)
+    val pvcClaim = 
configuredPod.pod.getSpec.getVolumes.get(0).getPersistentVolumeClaim
+    assert(pvcClaim.getClaimName.endsWith("-driver-pvc-0"))
+  }
+
+  test("SPARK-49833: Create and mounts persistentVolumeClaims in executors 
with annotations") {
+    val volumeConf = KubernetesVolumeSpec(
+      "testVolume",
+      "/tmp",
+      "",
+      "",
+      true,
+      KubernetesPVCVolumeConf(claimName = 
MountVolumesFeatureStep.PVC_ON_DEMAND,
+        storageClass = Some("gp3"),
+        size = Some("1Mi"),
+        annotations = Some(Map("env" -> "exec-test")))
+    )
+
+    val executorConf = KubernetesTestConf.createExecutorConf(volumes = 
Seq(volumeConf))
+    val executorStep = new MountVolumesFeatureStep(executorConf)
+    val executorPod = executorStep.configurePod(SparkPod.initialPod())
+
+    assert(executorPod.pod.getSpec.getVolumes.size() === 1)
+    val executorPVC = 
executorPod.pod.getSpec.getVolumes.get(0).getPersistentVolumeClaim
+    assert(executorPVC.getClaimName.endsWith("-exec-1-pvc-0"))
+  }
+
+  test("SPARK-49833: Mount multiple volumes to executor with annotations") {
+    val pvcVolumeConf1 = KubernetesVolumeSpec(
+      "checkpointVolume1",
+      "/checkpoints1",
+      "",
+      "",
+      true,
+      KubernetesPVCVolumeConf(claimName = "pvcClaim1",
+        storageClass = Some("gp3"),
+        size = Some("1Mi"),
+        annotations = Some(Map("env1" -> "exec-test-1")))
+    )
+
+    val pvcVolumeConf2 = KubernetesVolumeSpec(
+      "checkpointVolume2",
+      "/checkpoints2",
+      "",
+      "",
+      true,
+      KubernetesPVCVolumeConf(claimName = "pvcClaim2",
+        storageClass = Some("gp3"),
+        size = Some("1Mi"),
+        annotations = Some(Map("env2" -> "exec-test-2")))
+    )
+
+    val kubernetesConf = KubernetesTestConf.createExecutorConf(
+      volumes = Seq(pvcVolumeConf1, pvcVolumeConf2))
+    val step = new MountVolumesFeatureStep(kubernetesConf)
+    val configuredPod = step.configurePod(SparkPod.initialPod())
+
+    assert(configuredPod.pod.getSpec.getVolumes.size() === 2)
+    assert(configuredPod.container.getVolumeMounts.size() === 2)
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to