This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new b976e7b9e058 [SPARK-55603][K8S] Improve `removeExecutorFromK8s` to use 
`patch` instead of `edit` API
b976e7b9e058 is described below

commit b976e7b9e058f9e36309008e4d9b14fc085f9773
Author: Dongjoon Hyun <[email protected]>
AuthorDate: Wed Feb 18 18:59:49 2026 -0800

    [SPARK-55603][K8S] Improve `removeExecutorFromK8s` to use `patch` instead 
of `edit` API
    
    ### What changes were proposed in this pull request?
    
    This PR aims to improve `removeExecutorFromK8s` to use `patch` instead of 
`edit` API.
    
    ### Why are the changes needed?
    
    **Network Efficiency**
    
    - `edit` requires fetching the entire resource and sending the full updated 
resource back.
    - `patch` only transmits the specific changes, making it much more 
network-efficient.
    
    **Concurrency & Conflict Resolution**
    
    - `edit` typically follows a Get -> Modify -> Update (PUT) pattern. Using 
this pattern creates a race condition where, if another client modifies the 
resource in between, a 409 Conflict error occurs due to a mismatched 
resourceVersion.
    - `patch` sends only the changes (delta) to the server, where the merge is 
handled server-side. This significantly reduces the risk of conflicts, 
especially for simple operations like adding an annotation.
    
    ### Does this PR introduce _any_ user-facing change?
    
    This will reduce the overhead of the K8s control plane and the chance of 
409 errors.
    
    ### How was this patch tested?
    
    Pass the CIs with the newly updated test case.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    Generated-by: `Gemini 3 Pro (High)` on `Antigravity`
    
    Closes #54376 from dongjoon-hyun/SPARK-55603.
    
    Authored-by: Dongjoon Hyun <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 .../cluster/k8s/ExecutorPodsLifecycleManager.scala       | 16 ++++++++--------
 .../cluster/k8s/ExecutorPodsLifecycleManagerSuite.scala  | 16 ++++++----------
 2 files changed, 14 insertions(+), 18 deletions(-)

diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala
 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala
index 3a508add6ccf..c57a014dcfa6 100644
--- 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala
+++ 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala
@@ -17,7 +17,6 @@
 package org.apache.spark.scheduler.cluster.k8s
 
 import java.util.concurrent.TimeUnit
-import java.util.function.UnaryOperator
 
 import scala.collection.mutable
 import scala.jdk.CollectionConverters._
@@ -25,6 +24,7 @@ import scala.jdk.CollectionConverters._
 import com.google.common.cache.CacheBuilder
 import io.fabric8.kubernetes.api.model.{Pod, PodBuilder}
 import io.fabric8.kubernetes.client.KubernetesClient
+import io.fabric8.kubernetes.client.dsl.base.{PatchContext, PatchType}
 
 import org.apache.spark.SparkConf
 import org.apache.spark.deploy.ExecutorFailureTracker
@@ -64,6 +64,8 @@ private[spark] class ExecutorPodsLifecycleManager(
 
   private val namespace = conf.get(KUBERNETES_NAMESPACE)
 
+  private val PATCH_CONTEXT = PatchContext.of(PatchType.STRATEGIC_MERGE)
+
   private val sparkContainerName = 
conf.get(KUBERNETES_EXECUTOR_PODTEMPLATE_CONTAINER_NAME)
     .getOrElse(DEFAULT_EXECUTOR_CONTAINER_NAME)
 
@@ -231,7 +233,11 @@ private[spark] class ExecutorPodsLifecycleManager(
           .pods()
           .inNamespace(namespace)
           .withName(updatedPod.getMetadata.getName)
-          .edit(executorInactivationFn)
+          .patch(PATCH_CONTEXT, new PodBuilder()
+            .editOrNewMetadata()
+              .addToLabels(SPARK_EXECUTOR_INACTIVE_LABEL, "true")
+            .endMetadata()
+            .build())
 
         inactivatedPods += execId
       }
@@ -321,10 +327,4 @@ private object ExecutorPodsLifecycleManager {
     }
     s"${code}${humanStr}"
   }
-
-  def executorInactivationFn: UnaryOperator[Pod] = (p: Pod) => new 
PodBuilder(p)
-    .editOrNewMetadata()
-    .addToLabels(SPARK_EXECUTOR_INACTIVE_LABEL, "true")
-    .endMetadata()
-    .build()
 }
diff --git 
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManagerSuite.scala
 
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManagerSuite.scala
index cdbcae050ceb..a0398f4f9514 100644
--- 
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManagerSuite.scala
+++ 
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManagerSuite.scala
@@ -16,14 +16,13 @@
  */
 package org.apache.spark.scheduler.cluster.k8s
 
-import java.util.function.UnaryOperator
-
 import scala.collection.mutable
 
 import io.fabric8.kubernetes.api.model.{Pod, PodBuilder}
 import io.fabric8.kubernetes.client.KubernetesClient
 import io.fabric8.kubernetes.client.dsl.PodResource
-import org.mockito.{Mock, MockitoAnnotations}
+import io.fabric8.kubernetes.client.dsl.base.PatchContext
+import org.mockito.{ArgumentCaptor, Mock, MockitoAnnotations}
 import org.mockito.ArgumentMatchers.any
 import org.mockito.ArgumentMatchers.anyString
 import org.mockito.Mockito.{mock, never, times, verify, when}
@@ -191,12 +190,6 @@ class ExecutorPodsLifecycleManagerSuite extends 
SparkFunSuite with BeforeAndAfte
     verify(schedulerBackend).doRemoveExecutor("1", expectedLossReason)
   }
 
-  test("SPARK-40458: test executor inactivation function") {
-    val failedPod = failedExecutorWithoutDeletion(1)
-    val inactivated = 
ExecutorPodsLifecycleManager.executorInactivationFn(failedPod)
-    
assert(inactivated.getMetadata().getLabels().get(SPARK_EXECUTOR_INACTIVE_LABEL) 
=== "true")
-  }
-
   test("Keep executor pods in k8s if configured.") {
     val failedPod = failedExecutorWithoutDeletion(1)
     eventHandlerUnderTest.conf.set(Config.KUBERNETES_DELETE_EXECUTORS, false)
@@ -206,8 +199,11 @@ class ExecutorPodsLifecycleManagerSuite extends 
SparkFunSuite with BeforeAndAfte
     val expectedLossReason = ExecutorExited(1, exitCausedByApp = true, msg)
     verify(schedulerBackend).doRemoveExecutor("1", expectedLossReason)
     verify(namedExecutorPods(failedPod.getMetadata.getName), never()).delete()
+
+    val patchCaptor = ArgumentCaptor.forClass(classOf[Pod])
     verify(namedExecutorPods(failedPod.getMetadata.getName))
-      .edit(any[UnaryOperator[Pod]]())
+      .patch(any[PatchContext], patchCaptor.capture())
+    
assert(patchCaptor.getValue.getMetadata.getLabels.get(SPARK_EXECUTOR_INACTIVE_LABEL)
 === "true")
   }
 
   test("SPARK-49804: Use the exit code of executor container always") {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to