This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 3d62dd72a58f [SPARK-47730][K8S] Support `APP_ID` and `EXECUTOR_ID` placeholders in labels
3d62dd72a58f is described below

commit 3d62dd72a58f5a19e9a371acc09604ab9ceb9e68
Author: Xi Chen <jshmche...@gmail.com>
AuthorDate: Sun Apr 28 18:30:06 2024 -0700

    [SPARK-47730][K8S] Support `APP_ID` and `EXECUTOR_ID` placeholders in labels
    
    ### What changes were proposed in this pull request?
    
    Currently, only pod annotations support the `APP_ID` and `EXECUTOR_ID`
    placeholders. This commit adds the same capability to pod labels.
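
    For example, with this change a custom label value can carry a
    placeholder and have it resolved to the real IDs when the pods are
    built. A minimal sketch, assuming hypothetical label keys (`my-app-id`
    and `my-exec-id` are illustrative, not part of this patch):

    ```scala
    import org.apache.spark.SparkConf

    // Hypothetical label keys; the placeholder values are substituted
    // when the driver and executor pods are constructed.
    val conf = new SparkConf()
      .set("spark.kubernetes.driver.label.my-app-id", "{{APP_ID}}")
      .set("spark.kubernetes.executor.label.my-app-id", "{{APP_ID}}")
      .set("spark.kubernetes.executor.label.my-exec-id", "{{EXECUTOR_ID}}")
    ```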
    
    ### Why are the changes needed?
    
    The use case is to support custom labels for availability-zone-based
    topology pod affinity. We want to use the Spark application ID as the
    custom label value, so that Spark executor pods can be scheduled in the
    same availability zone as the Spark driver pod.
    
    Although the internal Spark label `spark-app-selector` could be used
    directly, that is not good practice when combined with YuniKorn Gang
    Scheduling. When Gang Scheduling is enabled, the YuniKorn placeholder
    pods must use the same affinity as the real Spark pods, which would
    force us to add the internal `spark-app-selector` label to the
    placeholder pods. This is undesirable because the placeholder pods
    could then be mistaken for Spark pods by the monitoring system.
    
    We therefore propose supporting the `APP_ID` and `EXECUTOR_ID`
    placeholders in Spark pod labels as well, for flexibility.
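
    The substitution reuses the helper that annotation values already go
    through. Conceptually it behaves like the simplified sketch below (a
    stand-in for `Utils.substituteAppNExecIds`, assuming plain string
    replacement; the driver side passes an empty executor ID):

    ```scala
    // Simplified sketch of the placeholder substitution applied to each
    // custom label value; not the exact Spark implementation.
    def substituteAppNExecIds(value: String, appId: String, execId: String): String =
      value.replace("{{APP_ID}}", appId).replace("{{EXECUTOR_ID}}", execId)
    ```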
    
    ### Does this PR introduce _any_ user-facing change?
    
    No, because the placeholder pattern strings (`{{APP_ID}}` and
    `{{EXECUTOR_ID}}`) are very specific and unlikely to appear in existing
    label values.
    
    ### How was this patch tested?
    
    Unit tests.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #46149 from jshmchenxi/SPARK-47730/support-app-placeholder-in-labels.
    
    Authored-by: Xi Chen <jshmche...@gmail.com>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../scala/org/apache/spark/deploy/k8s/KubernetesConf.scala  | 10 ++++++----
 .../org/apache/spark/deploy/k8s/KubernetesConfSuite.scala   | 13 ++++++++++---
 .../deploy/k8s/features/BasicDriverFeatureStepSuite.scala   | 11 +++++++----
 .../spark/deploy/k8s/integrationtest/BasicTestsSuite.scala  |  6 ++++--
 .../spark/deploy/k8s/integrationtest/KubernetesSuite.scala  |  6 ++++--
 5 files changed, 31 insertions(+), 15 deletions(-)

diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
index a1ef04f4e311..b55f9317d10b 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
@@ -100,8 +100,9 @@ private[spark] class KubernetesDriverConf(
       SPARK_APP_ID_LABEL -> appId,
       SPARK_APP_NAME_LABEL -> KubernetesConf.getAppNameLabel(appName),
       SPARK_ROLE_LABEL -> SPARK_POD_DRIVER_ROLE)
-    val driverCustomLabels = KubernetesUtils.parsePrefixedKeyValuePairs(
-      sparkConf, KUBERNETES_DRIVER_LABEL_PREFIX)
+    val driverCustomLabels =
+      KubernetesUtils.parsePrefixedKeyValuePairs(sparkConf, KUBERNETES_DRIVER_LABEL_PREFIX)
+        .map { case(k, v) => (k, Utils.substituteAppNExecIds(v, appId, "")) }
 
     presetLabels.keys.foreach { key =>
       require(
@@ -173,8 +174,9 @@ private[spark] class KubernetesExecutorConf(
       SPARK_ROLE_LABEL -> SPARK_POD_EXECUTOR_ROLE,
       SPARK_RESOURCE_PROFILE_ID_LABEL -> resourceProfileId.toString)
 
-    val executorCustomLabels = KubernetesUtils.parsePrefixedKeyValuePairs(
-      sparkConf, KUBERNETES_EXECUTOR_LABEL_PREFIX)
+    val executorCustomLabels =
+      KubernetesUtils.parsePrefixedKeyValuePairs(sparkConf, KUBERNETES_EXECUTOR_LABEL_PREFIX)
+        .map { case(k, v) => (k, Utils.substituteAppNExecIds(v, appId, executorId)) }
 
     presetLabels.keys.foreach { key =>
       require(
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesConfSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesConfSuite.scala
index 9963db016ad9..3c53e9b74f92 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesConfSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesConfSuite.scala
@@ -40,7 +40,9 @@ class KubernetesConfSuite extends SparkFunSuite {
     "execNodeSelectorKey2" -> "execNodeSelectorValue2")
   private val CUSTOM_LABELS = Map(
     "customLabel1Key" -> "customLabel1Value",
-    "customLabel2Key" -> "customLabel2Value")
+    "customLabel2Key" -> "customLabel2Value",
+    "customLabel3Key" -> "{{APP_ID}}",
+    "customLabel4Key" -> "{{EXECUTOR_ID}}")
   private val CUSTOM_ANNOTATIONS = Map(
     "customAnnotation1Key" -> "customAnnotation1Value",
     "customAnnotation2Key" -> "customAnnotation2Value",
@@ -95,7 +97,9 @@ class KubernetesConfSuite extends SparkFunSuite {
       SPARK_APP_ID_LABEL -> KubernetesTestConf.APP_ID,
       SPARK_APP_NAME_LABEL -> KubernetesConf.getAppNameLabel(conf.appName),
       SPARK_ROLE_LABEL -> SPARK_POD_DRIVER_ROLE) ++
-      CUSTOM_LABELS)
+      CUSTOM_LABELS.map {
+        case (k, v) => (k, Utils.substituteAppNExecIds(v, conf.appId, ""))
+      })
     assert(conf.annotations === CUSTOM_ANNOTATIONS.map {
       case (k, v) => (k, Utils.substituteAppNExecIds(v, conf.appId, ""))
     })
@@ -165,7 +169,10 @@ class KubernetesConfSuite extends SparkFunSuite {
       SPARK_APP_ID_LABEL -> KubernetesTestConf.APP_ID,
       SPARK_APP_NAME_LABEL -> KubernetesConf.getAppNameLabel(conf.appName),
       SPARK_ROLE_LABEL -> SPARK_POD_EXECUTOR_ROLE,
-      SPARK_RESOURCE_PROFILE_ID_LABEL -> DEFAULT_RESOURCE_PROFILE_ID.toString) ++ CUSTOM_LABELS)
+      SPARK_RESOURCE_PROFILE_ID_LABEL -> DEFAULT_RESOURCE_PROFILE_ID.toString) ++
+      CUSTOM_LABELS.map {
+        case (k, v) => (k, Utils.substituteAppNExecIds(v, conf.appId, EXECUTOR_ID))
+      })
     assert(conf.annotations === CUSTOM_ANNOTATIONS.map {
      case (k, v) => (k, Utils.substituteAppNExecIds(v, conf.appId, EXECUTOR_ID))
     })
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala
index f102851e6c3b..bf022ac63015 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala
@@ -35,11 +35,13 @@ import org.apache.spark.util.Utils
 
 class BasicDriverFeatureStepSuite extends SparkFunSuite {
 
-  private val CUSTOM_DRIVER_LABELS = Map("labelkey" -> "labelvalue")
+  private val CUSTOM_DRIVER_LABELS = Map(
+    "labelkey" -> "labelvalue",
+    "customAppIdLabelKey" -> "{{APP_ID}}")
   private val CONTAINER_IMAGE_PULL_POLICY = "IfNotPresent"
   private val DRIVER_ANNOTATIONS = Map(
     "customAnnotation" -> "customAnnotationValue",
-    "yunikorn.apache.org/app-id" -> "{{APPID}}")
+    "customAppIdAnnotation" -> "{{APP_ID}}")
   private val DRIVER_ENVS = Map(
     "customDriverEnv1" -> "customDriverEnv1Value",
     "customDriverEnv2" -> "customDriverEnv2Value")
@@ -121,10 +123,11 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
     assert(driverPodMetadata.getName === "spark-driver-pod")
 
     // Check custom and preset labels are as expected
+    val labels = driverPodMetadata.getLabels
     CUSTOM_DRIVER_LABELS.foreach { case (k, v) =>
-      assert(driverPodMetadata.getLabels.get(k) === v)
+      assert(labels.get(k) === Utils.substituteAppNExecIds(v, KubernetesTestConf.APP_ID, ""))
     }
-    assert(driverPodMetadata.getLabels === kubernetesConf.labels.asJava)
+    assert(labels === kubernetesConf.labels.asJava)
 
     val annotations = driverPodMetadata.getAnnotations.asScala
     DRIVER_ANNOTATIONS.foreach { case (k, v) =>
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/BasicTestsSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/BasicTestsSuite.scala
index d6911aadfa23..0dafe30c364a 100644
--- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/BasicTestsSuite.scala
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/BasicTestsSuite.scala
@@ -102,16 +102,18 @@ private[spark] trait BasicTestsSuite { k8sSuite: KubernetesSuite =>
     sparkAppConf
       .set("spark.kubernetes.driver.label.label1", "label1-value")
       .set("spark.kubernetes.driver.label.label2", "label2-value")
+      .set("spark.kubernetes.driver.label.customAppIdLabelKey", "{{APP_ID}}")
       .set("spark.kubernetes.driver.annotation.annotation1", 
"annotation1-value")
       .set("spark.kubernetes.driver.annotation.annotation2", 
"annotation2-value")
-      .set("spark.kubernetes.driver.annotation.yunikorn.apache.org/app-id", 
"{{APP_ID}}")
+      .set("spark.kubernetes.driver.annotation.customAppIdAnnotation", 
"{{APP_ID}}")
       .set("spark.kubernetes.driverEnv.ENV1", "VALUE1")
       .set("spark.kubernetes.driverEnv.ENV2", "VALUE2")
       .set("spark.kubernetes.executor.label.label1", "label1-value")
       .set("spark.kubernetes.executor.label.label2", "label2-value")
+      .set("spark.kubernetes.executor.label.customAppIdLabelKey", "{{APP_ID}}")
       .set("spark.kubernetes.executor.annotation.annotation1", 
"annotation1-value")
       .set("spark.kubernetes.executor.annotation.annotation2", 
"annotation2-value")
-      .set("spark.kubernetes.executor.annotation.yunikorn.apache.org/app-id", 
"{{APP_ID}}")
+      .set("spark.kubernetes.executor.annotation.customAppIdAnnotation", 
"{{APP_ID}}")
       .set("spark.executorEnv.ENV1", "VALUE1")
       .set("spark.executorEnv.ENV2", "VALUE2")
 
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
index 868461fd5b9e..0b0b30e5e04f 100644
--- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
@@ -589,7 +589,8 @@ class KubernetesSuite extends SparkFunSuite
     assert(pod.getMetadata.getLabels.get("label2") === "label2-value")
     assert(pod.getMetadata.getAnnotations.get("annotation1") === 
"annotation1-value")
     assert(pod.getMetadata.getAnnotations.get("annotation2") === 
"annotation2-value")
-    val appId = 
pod.getMetadata.getAnnotations.get("yunikorn.apache.org/app-id")
+    val appIdLabel = pod.getMetadata.getLabels.get("customAppIdLabelKey")
+    val appIdAnnotation = 
pod.getMetadata.getAnnotations.get("customAppIdAnnotation")
 
     val container = pod.getSpec.getContainers.get(0)
     val envVars = container
@@ -601,7 +602,8 @@ class KubernetesSuite extends SparkFunSuite
       .toMap
     assert(envVars("ENV1") === "VALUE1")
     assert(envVars("ENV2") === "VALUE2")
-    assert(appId === envVars(ENV_APPLICATION_ID))
+    assert(appIdLabel === envVars(ENV_APPLICATION_ID))
+    assert(appIdAnnotation === envVars(ENV_APPLICATION_ID))
   }
 
   private def deleteDriverPod(): Unit = {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
