This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 584b6dbfc99f [SPARK-49190][K8S] Add `SPARK_EXECUTOR_ATTRIBUTE_(APP|EXECUTOR)_ID` if `CUSTOM_EXECUTOR_LOG_URL` is defined 584b6dbfc99f is described below commit 584b6dbfc99f466f32d5e3341ee19f129fae1090 Author: Dongjoon Hyun <dh...@apple.com> AuthorDate: Fri Aug 9 22:14:06 2024 -0700 [SPARK-49190][K8S] Add `SPARK_EXECUTOR_ATTRIBUTE_(APP|EXECUTOR)_ID` if `CUSTOM_EXECUTOR_LOG_URL` is defined ### What changes were proposed in this pull request? This PR aims to add `SPARK_EXECUTOR_ATTRIBUTE_(APP|EXECUTOR)_ID` if `CUSTOM_EXECUTOR_LOG_URL` is defined. ### Why are the changes needed? Apache Spark has supported `spark.ui.custom.executor.log.url` in the K8s environment well. - #47681 This PR aims to help users use it more easily by providing the required `SPARK_EXECUTOR_ATTRIBUTE_APP_ID` and `SPARK_EXECUTOR_ATTRIBUTE_EXECUTOR_ID` automatically if `spark.ui.custom.executor.log.url` is defined. ### Does this PR introduce _any_ user-facing change? - No by default because `spark.ui.custom.executor.log.url` is not used. - When `spark.ui.custom.executor.log.url` is used, - For YARN users, there is no change because this is a K8s-only change. - For K8s users, this will reduce the existing steps by providing the same environment variables. In addition, the user variables always overwrite these built-in environment variables by design. ### How was this patch tested? No. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #47692 from dongjoon-hyun/SPARK-49190. 
Authored-by: Dongjoon Hyun <dh...@apple.com> Signed-off-by: Dongjoon Hyun <dh...@apple.com> --- docs/running-on-kubernetes.md | 2 -- .../main/scala/org/apache/spark/deploy/k8s/Constants.scala | 2 ++ .../spark/deploy/k8s/features/BasicExecutorFeatureStep.scala | 9 +++++++++ .../deploy/k8s/features/BasicExecutorFeatureStepSuite.scala | 12 ++++++++++++ 4 files changed, 23 insertions(+), 2 deletions(-) diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md index 3fb963ea1eef..043c08bee064 100644 --- a/docs/running-on-kubernetes.md +++ b/docs/running-on-kubernetes.md @@ -433,8 +433,6 @@ the cluster. When there exists a log collection system, you can expose it at Spark Driver `Executors` tab UI. For example, ``` -spark.executorEnv.SPARK_EXECUTOR_ATTRIBUTE_APP_ID='$(SPARK_APPLICATION_ID)' -spark.executorEnv.SPARK_EXECUTOR_ATTRIBUTE_EXECUTOR_ID='$(SPARK_EXECUTOR_ID)' spark.ui.custom.executor.log.url='https://log-server/log?appId={{APP_ID}}&execId={{EXECUTOR_ID}}' ``` diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala index ead3188aa649..aee07c096fe5 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala @@ -72,6 +72,8 @@ object Constants { val ENV_EXECUTOR_ID = "SPARK_EXECUTOR_ID" val ENV_EXECUTOR_POD_IP = "SPARK_EXECUTOR_POD_IP" val ENV_EXECUTOR_POD_NAME = "SPARK_EXECUTOR_POD_NAME" + val ENV_EXECUTOR_ATTRIBUTE_APP_ID = "SPARK_EXECUTOR_ATTRIBUTE_APP_ID" + val ENV_EXECUTOR_ATTRIBUTE_EXECUTOR_ID = "SPARK_EXECUTOR_ATTRIBUTE_EXECUTOR_ID" val ENV_JAVA_OPT_PREFIX = "SPARK_JAVA_OPT_" val ENV_CLASSPATH = "SPARK_CLASSPATH" val ENV_DRIVER_BIND_ADDRESS = "SPARK_DRIVER_BIND_ADDRESS" diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala index fa4a6f43215c..20050de69f89 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala @@ -143,6 +143,14 @@ private[spark] class BasicExecutorFeatureStep( (s"$ENV_JAVA_OPT_PREFIX$index", opt) }.toMap + val attributes = if (kubernetesConf.get(UI.CUSTOM_EXECUTOR_LOG_URL).isDefined) { + Map( + ENV_EXECUTOR_ATTRIBUTE_APP_ID -> kubernetesConf.appId, + ENV_EXECUTOR_ATTRIBUTE_EXECUTOR_ID -> kubernetesConf.executorId) + } else { + Map.empty[String, String] + } + KubernetesUtils.buildEnvVars( Seq( ENV_DRIVER_URL -> driverUrl, @@ -153,6 +161,7 @@ private[spark] class BasicExecutorFeatureStep( ENV_SPARK_CONF_DIR -> SPARK_CONF_DIR_INTERNAL, ENV_EXECUTOR_ID -> kubernetesConf.executorId, ENV_RESOURCE_PROFILE_ID -> resourceProfile.id.toString) + ++ attributes ++ kubernetesConf.environment ++ sparkAuthSecret ++ Seq(ENV_CLASSPATH -> kubernetesConf.get(EXECUTOR_CLASS_PATH).orNull) diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala index eaf39dd816dc..3ed6d50f689d 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala @@ -252,6 +252,18 @@ class BasicExecutorFeatureStepSuite extends SparkFunSuite with BeforeAndAfter { 
s"/p1/${KubernetesTestConf.APP_ID}/1,/p2/${KubernetesTestConf.APP_ID}/1")) } + test("SPARK-49190: Add SPARK_EXECUTOR_ATTRIBUTE_(APP|EXECUTOR)_ID if CUSTOM_EXECUTOR_LOG_URL" + + " is defined") { + val conf = baseConf.clone() + .set(UI.CUSTOM_EXECUTOR_LOG_URL, "https://custom-executor-log-server/") + val kconf = KubernetesTestConf.createExecutorConf(sparkConf = conf) + val step = new BasicExecutorFeatureStep(kconf, new SecurityManager(conf), defaultProfile) + val executor = step.configurePod(SparkPod.initialPod()) + checkEnv(executor, conf, Map( + ENV_EXECUTOR_ATTRIBUTE_APP_ID -> KubernetesTestConf.APP_ID, + ENV_EXECUTOR_ATTRIBUTE_EXECUTOR_ID -> KubernetesTestConf.EXECUTOR_ID)) + } + test("test executor pyspark memory") { baseConf.set("spark.kubernetes.resource.type", "python") baseConf.set(PYSPARK_EXECUTOR_MEMORY, 42L) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org