This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 3d62dd72a58f [SPARK-47730][K8S] Support `APP_ID` and `EXECUTOR_ID` placeholders in labels
3d62dd72a58f is described below

commit 3d62dd72a58f5a19e9a371acc09604ab9ceb9e68
Author: Xi Chen <jshmche...@gmail.com>
AuthorDate: Sun Apr 28 18:30:06 2024 -0700

    [SPARK-47730][K8S] Support `APP_ID` and `EXECUTOR_ID` placeholders in labels

    ### What changes were proposed in this pull request?

    Currently, only pod annotations support the `APP_ID` and `EXECUTOR_ID` placeholders. This commit adds the same capability to pod labels.

    ### Why are the changes needed?

    The use case is to support customized labels for availability-zone-based topology pod affinity. We want to use the Spark application ID as the label value, so that Spark executor pods are scheduled in the same availability zone as the Spark driver pod.

    Although we could use the internal Spark label `spark-app-selector` directly, that is not good practice when combined with YuniKorn Gang Scheduling. When Gang Scheduling is enabled, the YuniKorn placeholder pods must use the same affinity as the real Spark pods, which would force us to add the internal `spark-app-selector` label to the placeholder pods. That is undesirable because the placeholder pods could then be mistaken for Spark pods by monitoring systems. We therefore propose supporting the `APP_ID` and `EXECUTOR_ID` placeholders in Spark pod labels as well, for flexibility.

    ### Does this PR introduce _any_ user-facing change?

    No, because the pattern strings are very specific.

    ### How was this patch tested?

    Unit tests.

    ### Was this patch authored or co-authored using generative AI tooling?

    No

    Closes #46149 from jshmchenxi/SPARK-47730/support-app-placeholder-in-labels.

    Authored-by: Xi Chen <jshmche...@gmail.com>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../scala/org/apache/spark/deploy/k8s/KubernetesConf.scala  | 10 ++++++----
 .../org/apache/spark/deploy/k8s/KubernetesConfSuite.scala   | 13 ++++++++++---
 .../deploy/k8s/features/BasicDriverFeatureStepSuite.scala   | 11 +++++++----
 .../spark/deploy/k8s/integrationtest/BasicTestsSuite.scala  |  6 ++++--
 .../spark/deploy/k8s/integrationtest/KubernetesSuite.scala  |  6 ++++--
 5 files changed, 31 insertions(+), 15 deletions(-)
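As a usage illustration (not part of the patch): with this change, a label value containing a placeholder is expanded per pod. The label key `az-affinity-label` below is hypothetical; the `spark.kubernetes.driver.label.*` and `spark.kubernetes.executor.label.*` config prefixes are the ones exercised in the tests in this diff.

    import org.apache.spark.SparkConf

    // {{APP_ID}} expands to the Spark application ID, so driver and executor
    // pods carry an identical, application-unique label that a zone-based
    // pod affinity rule can match on.
    val conf = new SparkConf()
      .set("spark.kubernetes.driver.label.az-affinity-label", "{{APP_ID}}")
      .set("spark.kubernetes.executor.label.az-affinity-label", "{{APP_ID}}")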
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
index a1ef04f4e311..b55f9317d10b 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
@@ -100,8 +100,9 @@ private[spark] class KubernetesDriverConf(
       SPARK_APP_ID_LABEL -> appId,
       SPARK_APP_NAME_LABEL -> KubernetesConf.getAppNameLabel(appName),
       SPARK_ROLE_LABEL -> SPARK_POD_DRIVER_ROLE)
-    val driverCustomLabels = KubernetesUtils.parsePrefixedKeyValuePairs(
-      sparkConf, KUBERNETES_DRIVER_LABEL_PREFIX)
+    val driverCustomLabels =
+      KubernetesUtils.parsePrefixedKeyValuePairs(sparkConf, KUBERNETES_DRIVER_LABEL_PREFIX)
+        .map { case(k, v) => (k, Utils.substituteAppNExecIds(v, appId, "")) }
 
     presetLabels.keys.foreach { key =>
       require(
@@ -173,8 +174,9 @@ private[spark] class KubernetesExecutorConf(
       SPARK_ROLE_LABEL -> SPARK_POD_EXECUTOR_ROLE,
       SPARK_RESOURCE_PROFILE_ID_LABEL -> resourceProfileId.toString)
 
-    val executorCustomLabels = KubernetesUtils.parsePrefixedKeyValuePairs(
-      sparkConf, KUBERNETES_EXECUTOR_LABEL_PREFIX)
+    val executorCustomLabels =
+      KubernetesUtils.parsePrefixedKeyValuePairs(sparkConf, KUBERNETES_EXECUTOR_LABEL_PREFIX)
+        .map { case(k, v) => (k, Utils.substituteAppNExecIds(v, appId, executorId)) }
 
     presetLabels.keys.foreach { key =>
       require(
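(Aside, for illustration: the substitution above is delegated to `Utils.substituteAppNExecIds`, the same helper the annotation path already uses. A rough sketch of the behavior relied on here, assuming simple string replacement rather than the actual Spark implementation:

    // Sketch only: expands the two supported placeholders in a config value.
    // For driver labels, execId is passed as "" (see the call sites above).
    def substituteAppNExecIds(value: String, appId: String, execId: String): String =
      value.replace("{{APP_ID}}", appId).replace("{{EXECUTOR_ID}}", execId)

The remaining files in the diff update the unit and integration tests accordingly.)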
"customDriverEnv2Value") @@ -121,10 +123,11 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite { assert(driverPodMetadata.getName === "spark-driver-pod") // Check custom and preset labels are as expected + val labels = driverPodMetadata.getLabels CUSTOM_DRIVER_LABELS.foreach { case (k, v) => - assert(driverPodMetadata.getLabels.get(k) === v) + assert(labels.get(k) === Utils.substituteAppNExecIds(v, KubernetesTestConf.APP_ID, "")) } - assert(driverPodMetadata.getLabels === kubernetesConf.labels.asJava) + assert(labels === kubernetesConf.labels.asJava) val annotations = driverPodMetadata.getAnnotations.asScala DRIVER_ANNOTATIONS.foreach { case (k, v) => diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/BasicTestsSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/BasicTestsSuite.scala index d6911aadfa23..0dafe30c364a 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/BasicTestsSuite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/BasicTestsSuite.scala @@ -102,16 +102,18 @@ private[spark] trait BasicTestsSuite { k8sSuite: KubernetesSuite => sparkAppConf .set("spark.kubernetes.driver.label.label1", "label1-value") .set("spark.kubernetes.driver.label.label2", "label2-value") + .set("spark.kubernetes.driver.label.customAppIdLabelKey", "{{APP_ID}}") .set("spark.kubernetes.driver.annotation.annotation1", "annotation1-value") .set("spark.kubernetes.driver.annotation.annotation2", "annotation2-value") - .set("spark.kubernetes.driver.annotation.yunikorn.apache.org/app-id", "{{APP_ID}}") + .set("spark.kubernetes.driver.annotation.customAppIdAnnotation", "{{APP_ID}}") .set("spark.kubernetes.driverEnv.ENV1", "VALUE1") .set("spark.kubernetes.driverEnv.ENV2", "VALUE2") .set("spark.kubernetes.executor.label.label1", "label1-value") .set("spark.kubernetes.executor.label.label2", "label2-value") + .set("spark.kubernetes.executor.label.customAppIdLabelKey", "{{APP_ID}}") .set("spark.kubernetes.executor.annotation.annotation1", "annotation1-value") .set("spark.kubernetes.executor.annotation.annotation2", "annotation2-value") - .set("spark.kubernetes.executor.annotation.yunikorn.apache.org/app-id", "{{APP_ID}}") + .set("spark.kubernetes.executor.annotation.customAppIdAnnotation", "{{APP_ID}}") .set("spark.executorEnv.ENV1", "VALUE1") .set("spark.executorEnv.ENV2", "VALUE2") diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala index 868461fd5b9e..0b0b30e5e04f 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala @@ -589,7 +589,8 @@ class KubernetesSuite extends SparkFunSuite assert(pod.getMetadata.getLabels.get("label2") === "label2-value") assert(pod.getMetadata.getAnnotations.get("annotation1") === "annotation1-value") assert(pod.getMetadata.getAnnotations.get("annotation2") === "annotation2-value") - val appId = pod.getMetadata.getAnnotations.get("yunikorn.apache.org/app-id") + val appIdLabel = 
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
index 868461fd5b9e..0b0b30e5e04f 100644
--- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
@@ -589,7 +589,8 @@ class KubernetesSuite extends SparkFunSuite
     assert(pod.getMetadata.getLabels.get("label2") === "label2-value")
     assert(pod.getMetadata.getAnnotations.get("annotation1") === "annotation1-value")
     assert(pod.getMetadata.getAnnotations.get("annotation2") === "annotation2-value")
-    val appId = pod.getMetadata.getAnnotations.get("yunikorn.apache.org/app-id")
+    val appIdLabel = pod.getMetadata.getLabels.get("customAppIdLabelKey")
+    val appIdAnnotation = pod.getMetadata.getAnnotations.get("customAppIdAnnotation")
 
     val container = pod.getSpec.getContainers.get(0)
     val envVars = container
@@ -601,7 +602,8 @@ class KubernetesSuite extends SparkFunSuite
       .toMap
     assert(envVars("ENV1") === "VALUE1")
     assert(envVars("ENV2") === "VALUE2")
-    assert(appId === envVars(ENV_APPLICATION_ID))
+    assert(appIdLabel === envVars(ENV_APPLICATION_ID))
+    assert(appIdAnnotation === envVars(ENV_APPLICATION_ID))
   }
 
   private def deleteDriverPod(): Unit = {

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org