This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new b976e7b9e058 [SPARK-55603][K8S] Improve `removeExecutorFromK8s` to use
`patch` instead of `edit` API
b976e7b9e058 is described below
commit b976e7b9e058f9e36309008e4d9b14fc085f9773
Author: Dongjoon Hyun <[email protected]>
AuthorDate: Wed Feb 18 18:59:49 2026 -0800
[SPARK-55603][K8S] Improve `removeExecutorFromK8s` to use `patch` instead
of `edit` API
### What changes were proposed in this pull request?
This PR aims to improve `removeExecutorFromK8s` to use `patch` instead of
`edit` API.
### Why are the changes needed?
**Network Efficiency**
- `edit` requires fetching the entire resource and sending the full updated
resource back.
- `patch` only transmits the specific changes, making it much more
network-efficient.
**Concurrency & Conflict Resolution**
- `edit` typically follows a Get -> Modify -> Update (PUT) pattern. Using
this pattern creates a race condition where, if another client modifies the
resource in between, a 409 Conflict error occurs due to a mismatched
resourceVersion.
- `patch` sends only the changes (delta) to the server, where the merge is
handled server-side. This significantly reduces the risk of conflicts,
especially for simple operations like adding an annotation.
### Does this PR introduce _any_ user-facing change?
This will reduce the overhead of the K8s control plane and the chance of 409
errors.
### How was this patch tested?
Pass the CIs with the newly updated test case.
### Was this patch authored or co-authored using generative AI tooling?
Generated-by: `Gemini 3 Pro (High)` on `Antigravity`
Closes #54376 from dongjoon-hyun/SPARK-55603.
Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../cluster/k8s/ExecutorPodsLifecycleManager.scala | 16 ++++++++--------
.../cluster/k8s/ExecutorPodsLifecycleManagerSuite.scala | 16 ++++++----------
2 files changed, 14 insertions(+), 18 deletions(-)
diff --git
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala
index 3a508add6ccf..c57a014dcfa6 100644
---
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala
+++
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala
@@ -17,7 +17,6 @@
package org.apache.spark.scheduler.cluster.k8s
import java.util.concurrent.TimeUnit
-import java.util.function.UnaryOperator
import scala.collection.mutable
import scala.jdk.CollectionConverters._
@@ -25,6 +24,7 @@ import scala.jdk.CollectionConverters._
import com.google.common.cache.CacheBuilder
import io.fabric8.kubernetes.api.model.{Pod, PodBuilder}
import io.fabric8.kubernetes.client.KubernetesClient
+import io.fabric8.kubernetes.client.dsl.base.{PatchContext, PatchType}
import org.apache.spark.SparkConf
import org.apache.spark.deploy.ExecutorFailureTracker
@@ -64,6 +64,8 @@ private[spark] class ExecutorPodsLifecycleManager(
private val namespace = conf.get(KUBERNETES_NAMESPACE)
+ private val PATCH_CONTEXT = PatchContext.of(PatchType.STRATEGIC_MERGE)
+
private val sparkContainerName =
conf.get(KUBERNETES_EXECUTOR_PODTEMPLATE_CONTAINER_NAME)
.getOrElse(DEFAULT_EXECUTOR_CONTAINER_NAME)
@@ -231,7 +233,11 @@ private[spark] class ExecutorPodsLifecycleManager(
.pods()
.inNamespace(namespace)
.withName(updatedPod.getMetadata.getName)
- .edit(executorInactivationFn)
+ .patch(PATCH_CONTEXT, new PodBuilder()
+ .editOrNewMetadata()
+ .addToLabels(SPARK_EXECUTOR_INACTIVE_LABEL, "true")
+ .endMetadata()
+ .build())
inactivatedPods += execId
}
@@ -321,10 +327,4 @@ private object ExecutorPodsLifecycleManager {
}
s"${code}${humanStr}"
}
-
- def executorInactivationFn: UnaryOperator[Pod] = (p: Pod) => new
PodBuilder(p)
- .editOrNewMetadata()
- .addToLabels(SPARK_EXECUTOR_INACTIVE_LABEL, "true")
- .endMetadata()
- .build()
}
diff --git
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManagerSuite.scala
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManagerSuite.scala
index cdbcae050ceb..a0398f4f9514 100644
---
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManagerSuite.scala
+++
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManagerSuite.scala
@@ -16,14 +16,13 @@
*/
package org.apache.spark.scheduler.cluster.k8s
-import java.util.function.UnaryOperator
-
import scala.collection.mutable
import io.fabric8.kubernetes.api.model.{Pod, PodBuilder}
import io.fabric8.kubernetes.client.KubernetesClient
import io.fabric8.kubernetes.client.dsl.PodResource
-import org.mockito.{Mock, MockitoAnnotations}
+import io.fabric8.kubernetes.client.dsl.base.PatchContext
+import org.mockito.{ArgumentCaptor, Mock, MockitoAnnotations}
import org.mockito.ArgumentMatchers.any
import org.mockito.ArgumentMatchers.anyString
import org.mockito.Mockito.{mock, never, times, verify, when}
@@ -191,12 +190,6 @@ class ExecutorPodsLifecycleManagerSuite extends
SparkFunSuite with BeforeAndAfte
verify(schedulerBackend).doRemoveExecutor("1", expectedLossReason)
}
- test("SPARK-40458: test executor inactivation function") {
- val failedPod = failedExecutorWithoutDeletion(1)
- val inactivated =
ExecutorPodsLifecycleManager.executorInactivationFn(failedPod)
-
assert(inactivated.getMetadata().getLabels().get(SPARK_EXECUTOR_INACTIVE_LABEL)
=== "true")
- }
-
test("Keep executor pods in k8s if configured.") {
val failedPod = failedExecutorWithoutDeletion(1)
eventHandlerUnderTest.conf.set(Config.KUBERNETES_DELETE_EXECUTORS, false)
@@ -206,8 +199,11 @@ class ExecutorPodsLifecycleManagerSuite extends
SparkFunSuite with BeforeAndAfte
val expectedLossReason = ExecutorExited(1, exitCausedByApp = true, msg)
verify(schedulerBackend).doRemoveExecutor("1", expectedLossReason)
verify(namedExecutorPods(failedPod.getMetadata.getName), never()).delete()
+
+ val patchCaptor = ArgumentCaptor.forClass(classOf[Pod])
verify(namedExecutorPods(failedPod.getMetadata.getName))
- .edit(any[UnaryOperator[Pod]]())
+ .patch(any[PatchContext], patchCaptor.capture())
+
assert(patchCaptor.getValue.getMetadata.getLabels.get(SPARK_EXECUTOR_INACTIVE_LABEL)
=== "true")
}
test("SPARK-49804: Use the exit code of executor container always") {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]