This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 3ab18cc  [SPARK-38383][K8S] Support `APP_ID` and `EXECUTOR_ID` 
placeholder in annotations
3ab18cc is described below

commit 3ab18cc0be295676e073842ecb7e0e51d11fbd75
Author: Dongjoon Hyun <[email protected]>
AuthorDate: Tue Mar 1 20:23:18 2022 -0800

    [SPARK-38383][K8S] Support `APP_ID` and `EXECUTOR_ID` placeholder in 
annotations
    
    ### What changes were proposed in this pull request?
    
    This PR aims to support `APP_ID` and `EXECUTOR_ID` placeholders in K8s 
annotations in the same way we did for `EXECUTOR_JAVA_OPTIONS`.
    
    ### Why are the changes needed?
    
    Although Apache Spark provides `spark-app-id` already, some custom 
schedulers are not able to recognize it.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No, because the pattern strings are very specific.
    
    ### How was this patch tested?
    
    Pass the CIs and K8s IT.
    
    This passed like the following on `Docker Desktop K8s`.
    ```
    $ build/sbt -Psparkr -Pkubernetes -Pkubernetes-integration-tests 
-Dtest.exclude.tags=minikube 
-Dspark.kubernetes.test.deployMode=docker-for-desktop 
"kubernetes-integration-tests/test"
    [info] KubernetesSuite:
    [info] - Run SparkPi with no resources (8 seconds, 789 milliseconds)
    [info] - Run SparkPi with no resources & statefulset allocation (8 seconds, 
903 milliseconds)
    [info] - Run SparkPi with a very long application name. (8 seconds, 586 
milliseconds)
    [info] - Use SparkLauncher.NO_RESOURCE (8 seconds, 409 milliseconds)
    [info] - Run SparkPi with a master URL without a scheme. (8 seconds, 586 
milliseconds)
    [info] - Run SparkPi with an argument. (8 seconds, 708 milliseconds)
    [info] - Run SparkPi with custom labels, annotations, and environment 
variables. (8 seconds, 626 milliseconds)
    [info] - All pods have the same service account by default (8 seconds, 595 
milliseconds)
    [info] - Run extraJVMOptions check on driver (4 seconds, 324 milliseconds)
    [info] - Run SparkRemoteFileTest using a remote data file (8 seconds, 424 
milliseconds)
    [info] - Verify logging configuration is picked from the provided 
SPARK_CONF_DIR/log4j2.properties (13 seconds, 42 milliseconds)
    [info] - Run SparkPi with env and mount secrets. (16 seconds, 600 
milliseconds)
    [info] - Run PySpark on simple pi.py example (11 seconds, 479 milliseconds)
    [info] - Run PySpark to test a pyfiles example (10 seconds, 669 
milliseconds)
    [info] - Run PySpark with memory customization (8 seconds, 604 milliseconds)
    [info] - Run in client mode. (7 seconds, 349 milliseconds)
    [info] - Start pod creation from template (8 seconds, 779 milliseconds)
    [info] - Test basic decommissioning (42 seconds, 970 milliseconds)
    [info] - Test basic decommissioning with shuffle cleanup (42 seconds, 650 
milliseconds)
    [info] - Test decommissioning with dynamic allocation & shuffle cleanups (2 
minutes, 41 seconds)
    [info] - Test decommissioning timeouts (43 seconds, 340 milliseconds)
    [info] - SPARK-37576: Rolling decommissioning (1 minute, 6 seconds)
    [info] - Run SparkR on simple dataframe.R example (11 seconds, 645 
milliseconds)
    ```
    
    Closes #35704 from dongjoon-hyun/SPARK-38383.
    
    Authored-by: Dongjoon Hyun <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 .../spark/deploy/k8s/features/BasicDriverFeatureStep.scala  |  3 ++-
 .../deploy/k8s/features/BasicExecutorFeatureStep.scala      |  5 ++++-
 .../deploy/k8s/features/BasicDriverFeatureStepSuite.scala   | 13 +++++++++----
 .../spark/deploy/k8s/integrationtest/BasicTestsSuite.scala  |  2 ++
 .../spark/deploy/k8s/integrationtest/KubernetesSuite.scala  |  3 +++
 5 files changed, 20 insertions(+), 6 deletions(-)

diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala
 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala
index f2104d4..3b2b561 100644
--- 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala
+++ 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala
@@ -142,7 +142,8 @@ private[spark] class BasicDriverFeatureStep(conf: 
KubernetesDriverConf)
       .editOrNewMetadata()
         .withName(driverPodName)
         .addToLabels(conf.labels.asJava)
-        .addToAnnotations(conf.annotations.asJava)
+        .addToAnnotations(conf.annotations.map { case (k, v) =>
+          (k, Utils.substituteAppNExecIds(v, conf.appId, "")) }.asJava)
         .endMetadata()
       .editOrNewSpec()
         .withRestartPolicy("Never")
diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
index c608472..a762519 100644
--- 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
+++ 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
@@ -272,11 +272,14 @@ private[spark] class BasicExecutorFeatureStep(
       case "statefulset" => "Always"
       case _ => "Never"
     }
+    val annotations = kubernetesConf.annotations.map { case (k, v) =>
+      (k, Utils.substituteAppNExecIds(v, kubernetesConf.appId, 
kubernetesConf.executorId))
+    }
     val executorPodBuilder = new PodBuilder(pod.pod)
       .editOrNewMetadata()
         .withName(name)
         .addToLabels(kubernetesConf.labels.asJava)
-        .addToAnnotations(kubernetesConf.annotations.asJava)
+        .addToAnnotations(annotations.asJava)
         .addToOwnerReferences(ownerReference.toSeq: _*)
         .endMetadata()
       .editOrNewSpec()
diff --git 
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala
 
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala
index 83444e5..0b54599 100644
--- 
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala
+++ 
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala
@@ -21,7 +21,7 @@ import scala.collection.JavaConverters._
 import io.fabric8.kubernetes.api.model.{ContainerPort, ContainerPortBuilder, 
LocalObjectReferenceBuilder, Quantity}
 
 import org.apache.spark.{SparkConf, SparkFunSuite}
-import org.apache.spark.deploy.k8s.{KubernetesTestConf, SparkPod}
+import org.apache.spark.deploy.k8s.{KubernetesDriverConf, KubernetesTestConf, 
SparkPod}
 import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants._
 import 
org.apache.spark.deploy.k8s.features.KubernetesFeaturesTestUtils.TestResourceInformation
@@ -36,7 +36,9 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
 
   private val CUSTOM_DRIVER_LABELS = Map("labelkey" -> "labelvalue")
   private val CONTAINER_IMAGE_PULL_POLICY = "IfNotPresent"
-  private val DRIVER_ANNOTATIONS = Map("customAnnotation" -> 
"customAnnotationValue")
+  private val DRIVER_ANNOTATIONS = Map(
+    "customAnnotation" -> "customAnnotationValue",
+    "yunikorn.apache.org/app-id" -> "{{APPID}}")
   private val DRIVER_ENVS = Map(
     "customDriverEnv1" -> "customDriverEnv2",
     "customDriverEnv2" -> "customDriverEnv2")
@@ -62,7 +64,7 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
       sparkConf.set(testRInfo.rId.amountConf, testRInfo.count)
       sparkConf.set(testRInfo.rId.vendorConf, testRInfo.vendor)
     }
-    val kubernetesConf = KubernetesTestConf.createDriverConf(
+    val kubernetesConf: KubernetesDriverConf = 
KubernetesTestConf.createDriverConf(
       sparkConf = sparkConf,
       labels = CUSTOM_DRIVER_LABELS,
       environment = DRIVER_ENVS,
@@ -123,7 +125,10 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
     }
     assert(driverPodMetadata.getLabels === kubernetesConf.labels.asJava)
 
-    assert(driverPodMetadata.getAnnotations.asScala === DRIVER_ANNOTATIONS)
+    val annotations = driverPodMetadata.getAnnotations.asScala
+    DRIVER_ANNOTATIONS.foreach { case (k, v) =>
+      assert(annotations(k) === Utils.substituteAppNExecIds(v, 
KubernetesTestConf.APP_ID, ""))
+    }
     assert(configuredPod.pod.getSpec.getRestartPolicy === "Never")
     val expectedSparkConf = Map(
       KUBERNETES_DRIVER_POD_NAME.key -> "spark-driver-pod",
diff --git 
a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/BasicTestsSuite.scala
 
b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/BasicTestsSuite.scala
index 0e79f6c5..a79442a 100644
--- 
a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/BasicTestsSuite.scala
+++ 
b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/BasicTestsSuite.scala
@@ -82,12 +82,14 @@ private[spark] trait BasicTestsSuite { k8sSuite: 
KubernetesSuite =>
       .set("spark.kubernetes.driver.label.label2", "label2-value")
       .set("spark.kubernetes.driver.annotation.annotation1", 
"annotation1-value")
       .set("spark.kubernetes.driver.annotation.annotation2", 
"annotation2-value")
+      .set("spark.kubernetes.driver.annotation.yunikorn.apache.org/app-id", 
"{{APP_ID}}")
       .set("spark.kubernetes.driverEnv.ENV1", "VALUE1")
       .set("spark.kubernetes.driverEnv.ENV2", "VALUE2")
       .set("spark.kubernetes.executor.label.label1", "label1-value")
       .set("spark.kubernetes.executor.label.label2", "label2-value")
       .set("spark.kubernetes.executor.annotation.annotation1", 
"annotation1-value")
       .set("spark.kubernetes.executor.annotation.annotation2", 
"annotation2-value")
+      .set("spark.kubernetes.executor.annotation.yunikorn.apache.org/app-id", 
"{{APP_ID}}")
       .set("spark.executorEnv.ENV1", "VALUE1")
       .set("spark.executorEnv.ENV2", "VALUE2")
 
diff --git 
a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
 
b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
index ca7eae1..15ce487 100644
--- 
a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
+++ 
b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
@@ -35,6 +35,7 @@ import org.scalatest.matchers.should.Matchers._
 import org.scalatest.time.{Minutes, Seconds, Span}
 
 import org.apache.spark.SparkFunSuite
+import org.apache.spark.deploy.k8s.Constants.ENV_APPLICATION_ID
 import org.apache.spark.deploy.k8s.integrationtest.TestConstants._
 import 
org.apache.spark.deploy.k8s.integrationtest.backend.{IntegrationTestBackend, 
IntegrationTestBackendFactory}
 import org.apache.spark.internal.Logging
@@ -563,6 +564,7 @@ class KubernetesSuite extends SparkFunSuite
     assert(pod.getMetadata.getLabels.get("label2") === "label2-value")
     assert(pod.getMetadata.getAnnotations.get("annotation1") === 
"annotation1-value")
     assert(pod.getMetadata.getAnnotations.get("annotation2") === 
"annotation2-value")
+    val appId = 
pod.getMetadata.getAnnotations.get("yunikorn.apache.org/app-id")
 
     val container = pod.getSpec.getContainers.get(0)
     val envVars = container
@@ -574,6 +576,7 @@ class KubernetesSuite extends SparkFunSuite
       .toMap
     assert(envVars("ENV1") === "VALUE1")
     assert(envVars("ENV2") === "VALUE2")
+    assert(appId === envVars(ENV_APPLICATION_ID))
   }
 
   private def deleteDriverPod(): Unit = {

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to