This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 5d0556bae2c3 [SPARK-52902][K8S] Support `SPARK_VERSION` placeholder in container image names
5d0556bae2c3 is described below

commit 5d0556bae2c337a27573d35e4aaed8111fc37f38
Author:     Cheng Pan <cheng...@apache.org>
AuthorDate: Mon Jul 21 08:54:00 2025 -0700

    [SPARK-52902][K8S] Support `SPARK_VERSION` placeholder in container image names

    ### What changes were proposed in this pull request?

    This PR allows users to use `{{SPARK_VERSION}}` in the following three configs:

    ```
    spark.kubernetes.container.image
    spark.kubernetes.driver.container.image
    spark.kubernetes.executor.container.image
    ```

    ### Why are the changes needed?

    Simplify Spark on K8s configuration. As an administrator of the Spark platform in our corp, I need to refresh `spark-defaults.conf` on each version upgrade, for example, from

    ```
    spark.kubernetes.container.image=foo.com/spark:4.0.0-1-corp
    ```

    to

    ```
    spark.kubernetes.container.image=foo.com/spark:4.0.0-2-corp
    ```

    If I miss updating `spark-defaults.conf`, the Spark client and driver may use different jars in client mode, causing potential issues.

    After this patch, I can write it using a pattern like

    ```
    spark.kubernetes.container.image=foo.com/spark:{{SPARK_VERSION}}
    ```

    ### Does this PR introduce _any_ user-facing change?

    This is a new feature.

    ### How was this patch tested?

    A unit test is added.

    ### Was this patch authored or co-authored using generative AI tooling?

    No.

    Closes #51592 from pan3793/SPARK-52902.

    Authored-by: Cheng Pan <cheng...@apache.org>
    Signed-off-by: Dongjoon Hyun <dongj...@apache.org>
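To make the substitution concrete before the diff, here is a minimal runnable Scala sketch; the `PlaceholderDemo` object and the image tag are illustrative, and the replace logic mirrors the `Utils.substituteSparkVersion` helper added below.

```scala
import org.apache.spark.SPARK_VERSION

// Hypothetical demo object; the substitution logic mirrors the
// Utils.substituteSparkVersion helper introduced in this commit.
object PlaceholderDemo {
  // Replace every {{SPARK_VERSION}} occurrence with the running Spark version.
  def substituteSparkVersion(opt: String): String =
    opt.replace("{{SPARK_VERSION}}", SPARK_VERSION)

  def main(args: Array[String]): Unit = {
    // Prints e.g. "foo.com/spark:4.1.0-corp" when SPARK_VERSION is "4.1.0"
    // (registry host and "-corp" suffix are illustrative).
    println(substituteSparkVersion("foo.com/spark:{{SPARK_VERSION}}-corp"))
  }
}
```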
---
 core/src/main/scala/org/apache/spark/util/Utils.scala     |  9 ++++++++-
 docs/running-on-kubernetes.md                             |  9 ++++++---
 .../main/scala/org/apache/spark/deploy/k8s/Config.scala   |  9 ++++++---
 .../org/apache/spark/deploy/k8s/KubernetesConf.scala      | 15 ++++++++++++++-
 .../deploy/k8s/features/BasicDriverFeatureStep.scala      |  5 +----
 .../deploy/k8s/features/BasicExecutorFeatureStep.scala    |  4 +---
 .../org/apache/spark/deploy/k8s/KubernetesConfSuite.scala | 10 ++++++++++
 7 files changed, 46 insertions(+), 15 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index ad9d30da2b4a..f30a7bba9e88 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -66,7 +66,7 @@ import org.apache.logging.log4j.core.config.LoggerConfig
 import org.eclipse.jetty.util.MultiException
 import org.slf4j.Logger
 
-import org.apache.spark._
+import org.apache.spark.{SPARK_VERSION, _}
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.{Logging, MDC, MessageWithContext}
 import org.apache.spark.internal.LogKeys
@@ -2909,6 +2909,13 @@ private[spark] object Utils
     opt.replace("{{APP_ID}}", appId)
   }
 
+  /**
+   * Replaces all the {{SPARK_VERSION}} occurrences with the Spark version.
+   */
+  def substituteSparkVersion(opt: String): String = {
+    opt.replace("{{SPARK_VERSION}}", SPARK_VERSION)
+  }
+
   def createSecret(conf: SparkConf): String = {
     val bits = conf.get(AUTH_SECRET_BIT_LENGTH)
     val rnd = new SecureRandom()

diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md
index 8ad9078a6a9a..e2bf48e06278 100644
--- a/docs/running-on-kubernetes.md
+++ b/docs/running-on-kubernetes.md
@@ -643,7 +643,8 @@ See the [configuration page](configuration.html) for information on Spark config
   Container image to use for the Spark application.
   This is usually of the form <code>example.com/repo/spark:v1.0.0</code>.
   This configuration is required and must be provided by the user, unless explicit
-  images are provided for each different container type.
+  images are provided for each different container type. Note that <code>{{SPARK_VERSION}}</code>
+  is a built-in variable that will be substituted with the current Spark version.
  </td>
  <td>2.3.0</td>
 </tr>
@@ -651,7 +652,8 @@ See the [configuration page](configuration.html) for information on Spark config
 <td><code>spark.kubernetes.driver.container.image</code></td>
 <td><code>(value of spark.kubernetes.container.image)</code></td>
 <td>
-  Custom container image to use for the driver.
+  Custom container image to use for the driver. Note that <code>{{SPARK_VERSION}}</code>
+  is a built-in variable that will be substituted with the current Spark version.
 </td>
 <td>2.3.0</td>
</tr>
@@ -659,7 +661,8 @@ See the [configuration page](configuration.html) for information on Spark config
 <td><code>spark.kubernetes.executor.container.image</code></td>
 <td><code>(value of spark.kubernetes.container.image)</code></td>
 <td>
-  Custom container image to use for executors.
+  Custom container image to use for executors. Note that <code>{{SPARK_VERSION}}</code>
+  is a built-in variable that will be substituted with the current Spark version.
 </td>
 <td>2.3.0</td>
</tr>

diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
index 37edc81991ac..5ef03b9eba4e 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
@@ -131,20 +131,23 @@ private[spark] object Config extends Logging {
     ConfigBuilder("spark.kubernetes.container.image")
       .doc("Container image to use for Spark containers. Individual container types " +
         "(e.g. driver or executor) can also be configured to use different images if desired, " +
-        "by setting the container type-specific image name.")
+        "by setting the container type-specific image name. Note that `{{SPARK_VERSION}}` is " +
+        "a built-in variable that will be substituted with the current Spark version.")
       .version("2.3.0")
       .stringConf
       .createOptional
 
   val DRIVER_CONTAINER_IMAGE =
     ConfigBuilder("spark.kubernetes.driver.container.image")
-      .doc("Container image to use for the driver.")
+      .doc("Container image to use for the driver. Note that `{{SPARK_VERSION}}` is " +
+        "a built-in variable that will be substituted with the current Spark version.")
       .version("2.3.0")
       .fallbackConf(CONTAINER_IMAGE)
 
   val EXECUTOR_CONTAINER_IMAGE =
     ConfigBuilder("spark.kubernetes.executor.container.image")
-      .doc("Container image to use for the executors.")
+      .doc("Container image to use for the executors. Note that `{{SPARK_VERSION}}` is " +
+        "a built-in variable that will be substituted with the current Spark version.")
       .version("2.3.0")
       .fallbackConf(CONTAINER_IMAGE)

diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
index deb178eb90e1..9fe0697fa0d5 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
@@ -21,7 +21,7 @@ import java.util.{Locale, UUID}
 import io.fabric8.kubernetes.api.model.{LocalObjectReference, LocalObjectReferenceBuilder, Pod}
 import org.apache.commons.lang3.StringUtils
 
-import org.apache.spark.{SPARK_VERSION, SparkConf}
+import org.apache.spark.{SPARK_VERSION, SparkConf, SparkException}
 import org.apache.spark.annotation.{DeveloperApi, Since, Unstable}
 import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants._
@@ -46,6 +46,7 @@ private[spark] abstract class KubernetesConf(val sparkConf: SparkConf) {
   def volumes: Seq[KubernetesVolumeSpec]
   def schedulerName: Option[String]
   def appId: String
+  def image: String
 
   def appName: String = get("spark.app.name", "spark")
 
@@ -173,6 +174,12 @@ class KubernetesDriverConf(
   override def schedulerName: Option[String] = {
     Option(get(KUBERNETES_DRIVER_SCHEDULER_NAME).getOrElse(get(KUBERNETES_SCHEDULER_NAME).orNull))
   }
+
+  override def image: String = {
+    get(DRIVER_CONTAINER_IMAGE).map(Utils.substituteSparkVersion).getOrElse {
+      throw new SparkException("Must specify the driver container image")
+    }
+  }
 }
 
 private[spark] class KubernetesExecutorConf(
@@ -237,6 +244,12 @@ private[spark] class KubernetesExecutorConf(
     Option(get(KUBERNETES_EXECUTOR_SCHEDULER_NAME).getOrElse(get(KUBERNETES_SCHEDULER_NAME).orNull))
   }
 
+  override def image: String = {
+    get(EXECUTOR_CONTAINER_IMAGE).map(Utils.substituteSparkVersion).getOrElse {
+      throw new SparkException("Must specify the executor container image")
+    }
+  }
+
   private def checkExecutorEnvKey(key: String): Boolean = {
     // Pattern for matching an executorEnv key, which meets certain naming rules.
     val executorEnvRegex = "[-._a-zA-Z][-._a-zA-Z0-9]*".r

diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala
index 598034544477..eaf39914ea02 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala
@@ -21,7 +21,6 @@ import scala.jdk.CollectionConverters._
 
 import io.fabric8.kubernetes.api.model._
 
-import org.apache.spark.SparkException
 import org.apache.spark.deploy.k8s._
 import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants._
@@ -37,9 +36,7 @@ private[spark] class BasicDriverFeatureStep(conf: KubernetesDriverConf)
     .get(KUBERNETES_DRIVER_POD_NAME)
     .getOrElse(s"${conf.resourceNamePrefix}-driver")
 
-  private val driverContainerImage = conf
-    .get(DRIVER_CONTAINER_IMAGE)
-    .getOrElse(throw new SparkException("Must specify the driver container image"))
+  private val driverContainerImage = conf.image
 
   // CPU settings
   private val driverCpuCores = conf.get(DRIVER_CORES)

diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
index 20050de69f89..fcbcd797a806 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
@@ -39,9 +39,7 @@ private[spark] class BasicExecutorFeatureStep(
   extends KubernetesFeatureConfigStep with Logging {
 
   // Consider moving some of these fields to KubernetesConf or KubernetesExecutorSpecificConf
-  private val executorContainerImage = kubernetesConf
-    .get(EXECUTOR_CONTAINER_IMAGE)
-    .getOrElse(throw new SparkException("Must specify the executor container image"))
+  private val executorContainerImage = kubernetesConf.image
   private val blockManagerPort = kubernetesConf
     .sparkConf
     .getInt(BLOCK_MANAGER_PORT.key, DEFAULT_BLOCKMANAGER_PORT)

diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesConfSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesConfSuite.scala
index 3c53e9b74f92..61469f8acb7a 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesConfSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesConfSuite.scala
@@ -276,4 +276,14 @@ class KubernetesConfSuite extends SparkFunSuite {
       assert(KubernetesConf.getResourceNamePrefix(appName).matches("[a-z]([-a-z0-9]*[a-z0-9])?"))
     }
   }
+
+  test("SPARK-52902: K8s image configs support {{SPARK_VERSION}} placeholder") {
+    val sparkConf = new SparkConf(false)
+    sparkConf.set(CONTAINER_IMAGE, "apache/spark:{{SPARK_VERSION}}")
+    sparkConf.set(EXECUTOR_CONTAINER_IMAGE, Some("foo.com/spark:{{SPARK_VERSION}}-corp"))
+    val driverUnsetConf = KubernetesTestConf.createDriverConf(sparkConf)
+    val execUnsetConf = KubernetesTestConf.createExecutorConf(sparkConf)
+    assert(driverUnsetConf.image === s"apache/spark:$SPARK_VERSION")
+    assert(execUnsetConf.image === s"foo.com/spark:$SPARK_VERSION-corp")
+  }
 }
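As a usage-level sketch of the behavior after this commit, assuming illustrative image names: the `ImagePlaceholderUsage` object and its `resolve` helper are hypothetical stand-ins for the new `KubernetesConf.image` accessors, showing a shared image plus an executor-specific override as in the test above.

```scala
import org.apache.spark.{SPARK_VERSION, SparkConf}

// Hedged sketch: the operator configures placeholders once; the new
// KubernetesConf.image accessors resolve them to concrete tags at submit time.
object ImagePlaceholderUsage {
  // Stand-in for the substitution applied by the accessors added in this patch.
  private def resolve(image: String): String =
    image.replace("{{SPARK_VERSION}}", SPARK_VERSION)

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf(false)
      .set("spark.kubernetes.container.image", "apache/spark:{{SPARK_VERSION}}")
      .set("spark.kubernetes.executor.container.image", "foo.com/spark:{{SPARK_VERSION}}-corp")

    // The driver has no specific override here, so it would use the shared
    // image; the executor uses its own override.
    println(resolve(conf.get("spark.kubernetes.container.image")))          // apache/spark:<version>
    println(resolve(conf.get("spark.kubernetes.executor.container.image"))) // foo.com/spark:<version>-corp
  }
}
```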
s"foo.com/spark:$SPARK_VERSION-corp") + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org