This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 0c61ccb8c264 [SPARK-55484][K8S] Simplify
`KubernetesClusterSchedulerBackend` by reducing private class variables
0c61ccb8c264 is described below
commit 0c61ccb8c26464f5f0a5afb5aa1757d2d8a07e56
Author: Dongjoon Hyun <[email protected]>
AuthorDate: Wed Feb 11 10:20:18 2026 -0800
[SPARK-55484][K8S] Simplify `KubernetesClusterSchedulerBackend` by reducing
private class variables
### What changes were proposed in this pull request?
This PR aims to simplify `KubernetesClusterSchedulerBackend` by reducing
private class variables.
### Why are the changes needed?
The following private class variables are used only once.
```scala
private val appId = KubernetesConf.getKubernetesAppId()
private val shouldDeleteDriverService =
conf.get(KUBERNETES_DRIVER_SERVICE_DELETE_ON_TERMINATION)
private val shouldDeleteExecutors = conf.get(KUBERNETES_DELETE_EXECUTORS)
private val defaultProfile =
scheduler.sc.resourceProfileManager.defaultResourceProfile
```
Instead, the following new private variable is added to avoid
re-computation.
```scala
private val minRegisteredExecutors = initialExecutors * minRegisteredRatio
```
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Pass the CIs.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #54269 from dongjoon-hyun/SPARK-55484.
Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../cluster/k8s/KubernetesClusterSchedulerBackend.scala | 16 ++++++----------
1 file changed, 6 insertions(+), 10 deletions(-)
diff --git
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
index 381cd0f788c4..fa5e959276f5 100644
---
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
+++
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
@@ -55,7 +55,6 @@ private[spark] class KubernetesClusterSchedulerBackend(
watchEvents: ExecutorPodsWatchSnapshotSource,
pollEvents: ExecutorPodsPollingSnapshotSource)
extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv) {
- private val appId = KubernetesConf.getKubernetesAppId()
protected override val minRegisteredRatio =
if (conf.get(SCHEDULER_MIN_REGISTERED_RESOURCES_RATIO).isEmpty) {
@@ -66,11 +65,7 @@ private[spark] class KubernetesClusterSchedulerBackend(
private val initialExecutors =
SchedulerBackendUtils.getInitialTargetExecutorNumber(conf)
- private val shouldDeleteDriverService =
conf.get(KUBERNETES_DRIVER_SERVICE_DELETE_ON_TERMINATION)
-
- private val shouldDeleteExecutors = conf.get(KUBERNETES_DELETE_EXECUTORS)
-
- private val defaultProfile =
scheduler.sc.resourceProfileManager.defaultResourceProfile
+ private val minRegisteredExecutors = initialExecutors * minRegisteredRatio
private val namespace = conf.get(KUBERNETES_NAMESPACE)
@@ -103,13 +98,14 @@ private[spark] class KubernetesClusterSchedulerBackend(
* @return The application ID
*/
override def applicationId(): String = {
- conf.getOption("spark.app.id").getOrElse(appId)
+
conf.getOption("spark.app.id").getOrElse(KubernetesConf.getKubernetesAppId())
}
override def start(): Unit = {
super.start()
// Must be called before setting the executors
podAllocator.start(applicationId(), this)
+ val defaultProfile =
scheduler.sc.resourceProfileManager.defaultResourceProfile
val initExecs = Map(defaultProfile -> initialExecutors)
podAllocator.setTotalExpectedExecutors(initExecs)
lifecycleManager.start(this)
@@ -139,7 +135,7 @@ private[spark] class KubernetesClusterSchedulerBackend(
pollEvents.stop()
}
- if (shouldDeleteDriverService) {
+ if (conf.get(KUBERNETES_DRIVER_SERVICE_DELETE_ON_TERMINATION)) {
Utils.tryLogNonFatalError {
kubernetesClient
.services()
@@ -159,7 +155,7 @@ private[spark] class KubernetesClusterSchedulerBackend(
}
}
- if (shouldDeleteExecutors) {
+ if (conf.get(KUBERNETES_DELETE_EXECUTORS)) {
podAllocator.stop(applicationId())
@@ -191,7 +187,7 @@ private[spark] class KubernetesClusterSchedulerBackend(
}
override def sufficientResourcesRegistered(): Boolean = {
- totalRegisteredExecutors.get() >= initialExecutors * minRegisteredRatio
+ totalRegisteredExecutors.get() >= minRegisteredExecutors
}
override def getExecutorIds(): Seq[String] = synchronized {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]