This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 10ee643c674 [SPARK-43504][K8S] Mounts the hadoop config map on the
executor pod
10ee643c674 is described below
commit 10ee643c6746bbf110be6d680fe1c3559e522720
Author: fwang12 <[email protected]>
AuthorDate: Wed May 31 21:30:05 2023 -0700
[SPARK-43504][K8S] Mounts the hadoop config map on the executor pod
### What changes were proposed in this pull request?
In this pr, for spark on k8s, the hadoop config map will be mounted in
executor side as well.
Before, the hadoop config map is only mounted in driver side.
### Why are the changes needed?
Since [SPARK-25815](https://issues.apache.org/jira/browse/SPARK-25815)
[,](https://github.com/apache/spark/pull/22911,) the hadoop config map will not
be mounted in executor side.
Per the https://github.com/apache/spark/pull/22911 description:
> The main two things that don't need to happen in executors anymore are:
> 1. adding the Hadoop config to the executor pods: this is not needed
> since the Spark driver will serialize the Hadoop config and send
> it to executors when running tasks.
But in fact, the executor still need the hadoop configuration.

As shown in above picture, the driver can resolve `hdfs://zeus`, but the
executor can not.
so we still need to mount the hadoop config map in executor side.
### Does this PR introduce _any_ user-facing change?
Yes, users do not need to take workarounds to make executors load the
hadoop configuration.
Such as:
- including hadoop conf in executor image
- placing hadoop conf files under `SPARK_CONF_DIR`.
### How was this patch tested?
UT.
Closes #41181 from turboFei/exec_hadoop_conf.
Authored-by: fwang12 <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../k8s/features/HadoopConfDriverFeatureStep.scala | 8 +++
.../features/HadoopConfExecutorFeatureStep.scala | 63 +++++++++++++++++
.../cluster/k8s/KubernetesExecutorBuilder.scala | 1 +
.../HadoopConfExecutorFeatureStepSuite.scala | 82 ++++++++++++++++++++++
4 files changed, 154 insertions(+)
diff --git
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/HadoopConfDriverFeatureStep.scala
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/HadoopConfDriverFeatureStep.scala
index 069a57d3dc4..45a5b8d7dae 100644
---
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/HadoopConfDriverFeatureStep.scala
+++
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/HadoopConfDriverFeatureStep.scala
@@ -104,6 +104,14 @@ private[spark] class HadoopConfDriverFeatureStep(conf:
KubernetesConf)
}
}
+ override def getAdditionalPodSystemProperties(): Map[String, String] = {
+ if (hasHadoopConf) {
+ Map(HADOOP_CONFIG_MAP_NAME ->
existingConfMap.getOrElse(newConfigMapName))
+ } else {
+ Map.empty
+ }
+ }
+
override def getAdditionalKubernetesResources(): Seq[HasMetadata] = {
if (confDir.isDefined) {
val fileMap = confFiles.map { file =>
diff --git
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/HadoopConfExecutorFeatureStep.scala
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/HadoopConfExecutorFeatureStep.scala
new file mode 100644
index 00000000000..8a2773c1ac3
--- /dev/null
+++
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/HadoopConfExecutorFeatureStep.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.k8s.features
+
+import io.fabric8.kubernetes.api.model.{ContainerBuilder, PodBuilder,
VolumeBuilder}
+
+import org.apache.spark.deploy.k8s.{KubernetesConf, SparkPod}
+import org.apache.spark.deploy.k8s.Constants._
+
+/**
+ * Mounts the Hadoop configuration on the executor pod.
+ */
+private[spark] class HadoopConfExecutorFeatureStep(conf: KubernetesConf)
+ extends KubernetesFeatureConfigStep {
+
+ private val hadoopConfigMapName = conf.getOption(HADOOP_CONFIG_MAP_NAME)
+
+ override def configurePod(original: SparkPod): SparkPod = {
+ original.transform { case pod if hadoopConfigMapName.isDefined =>
+ val confVolume = new VolumeBuilder()
+ .withName(HADOOP_CONF_VOLUME)
+ .withNewConfigMap()
+ .withName(hadoopConfigMapName.get)
+ .endConfigMap()
+ .build()
+
+ val podWithConf = new PodBuilder(pod.pod)
+ .editSpec()
+ .addNewVolumeLike(confVolume)
+ .endVolume()
+ .endSpec()
+ .build()
+
+ val containerWithMount = new ContainerBuilder(pod.container)
+ .addNewVolumeMount()
+ .withName(HADOOP_CONF_VOLUME)
+ .withMountPath(HADOOP_CONF_DIR_PATH)
+ .endVolumeMount()
+ .addNewEnv()
+ .withName(ENV_HADOOP_CONF_DIR)
+ .withValue(HADOOP_CONF_DIR_PATH)
+ .endEnv()
+ .build()
+
+ SparkPod(podWithConf, containerWithMount)
+ }
+ }
+}
diff --git
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala
index 67aad00f985..a85e42662b8 100644
---
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala
+++
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala
@@ -71,6 +71,7 @@ private[spark] class KubernetesExecutorBuilder {
new MountSecretsFeatureStep(conf),
new EnvSecretsFeatureStep(conf),
new MountVolumesFeatureStep(conf),
+ new HadoopConfExecutorFeatureStep(conf),
new LocalDirsFeatureStep(conf)) ++ userFeatures
val spec = KubernetesExecutorSpec(
diff --git
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/HadoopConfExecutorFeatureStepSuite.scala
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/HadoopConfExecutorFeatureStepSuite.scala
new file mode 100644
index 00000000000..a60227814eb
--- /dev/null
+++
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/HadoopConfExecutorFeatureStepSuite.scala
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.k8s.features
+
+import java.io.File
+import java.nio.charset.StandardCharsets.UTF_8
+
+import com.google.common.io.Files
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.k8s.{Constants, KubernetesTestConf,
SecretVolumeUtils, SparkPod}
+import org.apache.spark.deploy.k8s.Constants._
+import
org.apache.spark.deploy.k8s.features.KubernetesFeaturesTestUtils.containerHasEnvVar
+import org.apache.spark.util.{SparkConfWithEnv, Utils}
+
+class HadoopConfExecutorFeatureStepSuite extends SparkFunSuite {
+ import SecretVolumeUtils._
+
+ test("SPARK-43504: mounts the hadoop config map on the executor pod") {
+ val confDir = Utils.createTempDir()
+ val confFiles = Set("core-site.xml", "hdfs-site.xml")
+
+ confFiles.foreach { f =>
+ Files.write("some data", new File(confDir, f), UTF_8)
+ }
+
+ Seq(
+ Map(ENV_HADOOP_CONF_DIR -> confDir.getAbsolutePath()),
+ Map.empty[String, String]).foreach { env =>
+ val hasHadoopConf = env.contains(ENV_HADOOP_CONF_DIR)
+
+ val driverSparkConf = new SparkConfWithEnv(env)
+ val executorSparkConf = new SparkConf(false)
+
+ val driverConf = KubernetesTestConf.createDriverConf(sparkConf =
driverSparkConf)
+ val driverStep = new HadoopConfDriverFeatureStep(driverConf)
+
+ val additionalPodSystemProperties =
driverStep.getAdditionalPodSystemProperties()
+ if (hasHadoopConf) {
+
assert(additionalPodSystemProperties.contains(Constants.HADOOP_CONFIG_MAP_NAME))
+ additionalPodSystemProperties.foreach { case (key, value) =>
+ executorSparkConf.set(key, value)
+ }
+ } else {
+ assert(additionalPodSystemProperties.isEmpty)
+ }
+
+ val executorConf = KubernetesTestConf.createExecutorConf(sparkConf =
executorSparkConf)
+ val executorStep = new HadoopConfExecutorFeatureStep(executorConf)
+ val executorPod = executorStep.configurePod(SparkPod.initialPod())
+
+ checkPod(executorPod, hasHadoopConf)
+ }
+ }
+
+ private def checkPod(pod: SparkPod, hasHadoopConf: Boolean): Unit = {
+ if (hasHadoopConf) {
+ assert(podHasVolume(pod.pod, HADOOP_CONF_VOLUME))
+ assert(containerHasVolume(pod.container, HADOOP_CONF_VOLUME,
HADOOP_CONF_DIR_PATH))
+ assert(containerHasEnvVar(pod.container, ENV_HADOOP_CONF_DIR))
+ } else {
+ assert(!podHasVolume(pod.pod, HADOOP_CONF_VOLUME))
+ assert(!containerHasVolume(pod.container, HADOOP_CONF_VOLUME,
HADOOP_CONF_DIR_PATH))
+ assert(!containerHasEnvVar(pod.container, ENV_HADOOP_CONF_DIR))
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]