This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 3ab18cc [SPARK-38383][K8S] Support `APP_ID` and `EXECUTOR_ID`
placeholder in annotations
3ab18cc is described below
commit 3ab18cc0be295676e073842ecb7e0e51d11fbd75
Author: Dongjoon Hyun <[email protected]>
AuthorDate: Tue Mar 1 20:23:18 2022 -0800
[SPARK-38383][K8S] Support `APP_ID` and `EXECUTOR_ID` placeholder in
annotations
### What changes were proposed in this pull request?
This PR aims to support the `APP_ID` and `EXECUTOR_ID` placeholders in K8s
annotations, in the same way we did for `EXECUTOR_JAVA_OPTIONS`.
### Why are the changes needed?
Although Apache Spark already provides the `spark-app-id` label, some custom
schedulers are not able to recognize it.
### Does this PR introduce _any_ user-facing change?
No, because the pattern strings are very specific.
### How was this patch tested?
Pass the CIs and K8s IT.
This passed like the following on `Docker Desktop K8s`.
```
$ build/sbt -Psparkr -Pkubernetes -Pkubernetes-integration-tests
-Dtest.exclude.tags=minikube
-Dspark.kubernetes.test.deployMode=docker-for-desktop
"kubernetes-integration-tests/test"
[info] KubernetesSuite:
[info] - Run SparkPi with no resources (8 seconds, 789 milliseconds)
[info] - Run SparkPi with no resources & statefulset allocation (8 seconds,
903 milliseconds)
[info] - Run SparkPi with a very long application name. (8 seconds, 586
milliseconds)
[info] - Use SparkLauncher.NO_RESOURCE (8 seconds, 409 milliseconds)
[info] - Run SparkPi with a master URL without a scheme. (8 seconds, 586
milliseconds)
[info] - Run SparkPi with an argument. (8 seconds, 708 milliseconds)
[info] - Run SparkPi with custom labels, annotations, and environment
variables. (8 seconds, 626 milliseconds)
[info] - All pods have the same service account by default (8 seconds, 595
milliseconds)
[info] - Run extraJVMOptions check on driver (4 seconds, 324 milliseconds)
[info] - Run SparkRemoteFileTest using a remote data file (8 seconds, 424
milliseconds)
[info] - Verify logging configuration is picked from the provided
SPARK_CONF_DIR/log4j2.properties (13 seconds, 42 milliseconds)
[info] - Run SparkPi with env and mount secrets. (16 seconds, 600
milliseconds)
[info] - Run PySpark on simple pi.py example (11 seconds, 479 milliseconds)
[info] - Run PySpark to test a pyfiles example (10 seconds, 669
milliseconds)
[info] - Run PySpark with memory customization (8 seconds, 604 milliseconds)
[info] - Run in client mode. (7 seconds, 349 milliseconds)
[info] - Start pod creation from template (8 seconds, 779 milliseconds)
[info] - Test basic decommissioning (42 seconds, 970 milliseconds)
[info] - Test basic decommissioning with shuffle cleanup (42 seconds, 650
milliseconds)
[info] - Test decommissioning with dynamic allocation & shuffle cleanups (2
minutes, 41 seconds)
[info] - Test decommissioning timeouts (43 seconds, 340 milliseconds)
[info] - SPARK-37576: Rolling decommissioning (1 minute, 6 seconds)
[info] - Run SparkR on simple dataframe.R example (11 seconds, 645
milliseconds)
```
Closes #35704 from dongjoon-hyun/SPARK-38383.
Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../spark/deploy/k8s/features/BasicDriverFeatureStep.scala | 3 ++-
.../deploy/k8s/features/BasicExecutorFeatureStep.scala | 5 ++++-
.../deploy/k8s/features/BasicDriverFeatureStepSuite.scala | 13 +++++++++----
.../spark/deploy/k8s/integrationtest/BasicTestsSuite.scala | 2 ++
.../spark/deploy/k8s/integrationtest/KubernetesSuite.scala | 3 +++
5 files changed, 20 insertions(+), 6 deletions(-)
diff --git
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala
index f2104d4..3b2b561 100644
---
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala
+++
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala
@@ -142,7 +142,8 @@ private[spark] class BasicDriverFeatureStep(conf:
KubernetesDriverConf)
.editOrNewMetadata()
.withName(driverPodName)
.addToLabels(conf.labels.asJava)
- .addToAnnotations(conf.annotations.asJava)
+ .addToAnnotations(conf.annotations.map { case (k, v) =>
+ (k, Utils.substituteAppNExecIds(v, conf.appId, "")) }.asJava)
.endMetadata()
.editOrNewSpec()
.withRestartPolicy("Never")
diff --git
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
index c608472..a762519 100644
---
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
+++
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
@@ -272,11 +272,14 @@ private[spark] class BasicExecutorFeatureStep(
case "statefulset" => "Always"
case _ => "Never"
}
+ val annotations = kubernetesConf.annotations.map { case (k, v) =>
+ (k, Utils.substituteAppNExecIds(v, kubernetesConf.appId,
kubernetesConf.executorId))
+ }
val executorPodBuilder = new PodBuilder(pod.pod)
.editOrNewMetadata()
.withName(name)
.addToLabels(kubernetesConf.labels.asJava)
- .addToAnnotations(kubernetesConf.annotations.asJava)
+ .addToAnnotations(annotations.asJava)
.addToOwnerReferences(ownerReference.toSeq: _*)
.endMetadata()
.editOrNewSpec()
diff --git
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala
index 83444e5..0b54599 100644
---
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala
+++
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala
@@ -21,7 +21,7 @@ import scala.collection.JavaConverters._
import io.fabric8.kubernetes.api.model.{ContainerPort, ContainerPortBuilder,
LocalObjectReferenceBuilder, Quantity}
import org.apache.spark.{SparkConf, SparkFunSuite}
-import org.apache.spark.deploy.k8s.{KubernetesTestConf, SparkPod}
+import org.apache.spark.deploy.k8s.{KubernetesDriverConf, KubernetesTestConf,
SparkPod}
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._
import
org.apache.spark.deploy.k8s.features.KubernetesFeaturesTestUtils.TestResourceInformation
@@ -36,7 +36,9 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
private val CUSTOM_DRIVER_LABELS = Map("labelkey" -> "labelvalue")
private val CONTAINER_IMAGE_PULL_POLICY = "IfNotPresent"
- private val DRIVER_ANNOTATIONS = Map("customAnnotation" ->
"customAnnotationValue")
+ private val DRIVER_ANNOTATIONS = Map(
+ "customAnnotation" -> "customAnnotationValue",
+ "yunikorn.apache.org/app-id" -> "{{APPID}}")
private val DRIVER_ENVS = Map(
"customDriverEnv1" -> "customDriverEnv2",
"customDriverEnv2" -> "customDriverEnv2")
@@ -62,7 +64,7 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
sparkConf.set(testRInfo.rId.amountConf, testRInfo.count)
sparkConf.set(testRInfo.rId.vendorConf, testRInfo.vendor)
}
- val kubernetesConf = KubernetesTestConf.createDriverConf(
+ val kubernetesConf: KubernetesDriverConf =
KubernetesTestConf.createDriverConf(
sparkConf = sparkConf,
labels = CUSTOM_DRIVER_LABELS,
environment = DRIVER_ENVS,
@@ -123,7 +125,10 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
}
assert(driverPodMetadata.getLabels === kubernetesConf.labels.asJava)
- assert(driverPodMetadata.getAnnotations.asScala === DRIVER_ANNOTATIONS)
+ val annotations = driverPodMetadata.getAnnotations.asScala
+ DRIVER_ANNOTATIONS.foreach { case (k, v) =>
+ assert(annotations(k) === Utils.substituteAppNExecIds(v,
KubernetesTestConf.APP_ID, ""))
+ }
assert(configuredPod.pod.getSpec.getRestartPolicy === "Never")
val expectedSparkConf = Map(
KUBERNETES_DRIVER_POD_NAME.key -> "spark-driver-pod",
diff --git
a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/BasicTestsSuite.scala
b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/BasicTestsSuite.scala
index 0e79f6c5..a79442a 100644
---
a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/BasicTestsSuite.scala
+++
b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/BasicTestsSuite.scala
@@ -82,12 +82,14 @@ private[spark] trait BasicTestsSuite { k8sSuite:
KubernetesSuite =>
.set("spark.kubernetes.driver.label.label2", "label2-value")
.set("spark.kubernetes.driver.annotation.annotation1",
"annotation1-value")
.set("spark.kubernetes.driver.annotation.annotation2",
"annotation2-value")
+ .set("spark.kubernetes.driver.annotation.yunikorn.apache.org/app-id",
"{{APP_ID}}")
.set("spark.kubernetes.driverEnv.ENV1", "VALUE1")
.set("spark.kubernetes.driverEnv.ENV2", "VALUE2")
.set("spark.kubernetes.executor.label.label1", "label1-value")
.set("spark.kubernetes.executor.label.label2", "label2-value")
.set("spark.kubernetes.executor.annotation.annotation1",
"annotation1-value")
.set("spark.kubernetes.executor.annotation.annotation2",
"annotation2-value")
+ .set("spark.kubernetes.executor.annotation.yunikorn.apache.org/app-id",
"{{APP_ID}}")
.set("spark.executorEnv.ENV1", "VALUE1")
.set("spark.executorEnv.ENV2", "VALUE2")
diff --git
a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
index ca7eae1..15ce487 100644
---
a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
+++
b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
@@ -35,6 +35,7 @@ import org.scalatest.matchers.should.Matchers._
import org.scalatest.time.{Minutes, Seconds, Span}
import org.apache.spark.SparkFunSuite
+import org.apache.spark.deploy.k8s.Constants.ENV_APPLICATION_ID
import org.apache.spark.deploy.k8s.integrationtest.TestConstants._
import
org.apache.spark.deploy.k8s.integrationtest.backend.{IntegrationTestBackend,
IntegrationTestBackendFactory}
import org.apache.spark.internal.Logging
@@ -563,6 +564,7 @@ class KubernetesSuite extends SparkFunSuite
assert(pod.getMetadata.getLabels.get("label2") === "label2-value")
assert(pod.getMetadata.getAnnotations.get("annotation1") ===
"annotation1-value")
assert(pod.getMetadata.getAnnotations.get("annotation2") ===
"annotation2-value")
+ val appId =
pod.getMetadata.getAnnotations.get("yunikorn.apache.org/app-id")
val container = pod.getSpec.getContainers.get(0)
val envVars = container
@@ -574,6 +576,7 @@ class KubernetesSuite extends SparkFunSuite
.toMap
assert(envVars("ENV1") === "VALUE1")
assert(envVars("ENV2") === "VALUE2")
+ assert(appId === envVars(ENV_APPLICATION_ID))
}
private def deleteDriverPod(): Unit = {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]