This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new cae4e2a20915 [SPARK-55370][K8S] Improve `annotateExecutorDeletionCost` 
to use `patch` instead of `edit` API
cae4e2a20915 is described below

commit cae4e2a20915ec6eed283b1200ea5bc5f28b0d7d
Author: Dongjoon Hyun <[email protected]>
AuthorDate: Thu Feb 5 11:02:33 2026 -0800

    [SPARK-55370][K8S] Improve `annotateExecutorDeletionCost` to use `patch` 
instead of `edit` API
    
    ### What changes were proposed in this pull request?
    
    This PR aims to improve `annotateExecutorDeletionCost` to use `patch` 
instead of `edit` API.
    
    ### Why are the changes needed?
    
    **Network Efficiency**
    
    - `edit` requires fetching the entire resource and sending the full updated 
resource back.
    - `patch` only transmits the specific changes, making it much more 
network-efficient.
    
    **Concurrency & Conflict Resolution**
    
    - `edit` typically follows a Get -> Modify -> Update (PUT) pattern. Using 
this pattern creates a race condition where, if another client modifies the 
resource in between, a 409 Conflict error occurs due to a mismatched 
resourceVersion.
    - `patch` sends only the changes (delta) to the server, where the merge is 
handled server-side. This significantly reduces the risk of conflicts, 
especially for simple operations like adding an annotation.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No behavior change because the annotation feature is not released yet.
    
    ### How was this patch tested?
    
    Pass the CIs with newly updated test case.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    Generated-by: `Gemini 3 Pro (High)` on `Antigravity`
    
    Closes #54158 from dongjoon-hyun/SPARK-55370.
    
    Authored-by: Dongjoon Hyun <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 .../cluster/k8s/KubernetesClusterSchedulerBackend.scala     | 13 ++++++++-----
 .../k8s/KubernetesClusterSchedulerBackendSuite.scala        | 13 ++++---------
 2 files changed, 12 insertions(+), 14 deletions(-)

diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
index 6003cabd56a5..b6d16a56b4f5 100644
--- 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
+++ 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
@@ -25,6 +25,7 @@ import scala.concurrent.Future
 import io.fabric8.kubernetes.api.model.Pod
 import io.fabric8.kubernetes.api.model.PodBuilder
 import io.fabric8.kubernetes.client.KubernetesClient
+import io.fabric8.kubernetes.client.dsl.base.{PatchContext, PatchType}
 
 import org.apache.spark.SparkContext
 import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesUtils}
@@ -73,6 +74,8 @@ private[spark] class KubernetesClusterSchedulerBackend(
 
   private val namespace = conf.get(KUBERNETES_NAMESPACE)
 
+  private val PATCH_CONTEXT = PatchContext.of(PatchType.STRATEGIC_MERGE)
+
   // KEP 2255: When a Deployment or Replicaset is scaled down, the pods will 
be deleted in the
   // order of the value of this annotation, ascending.
   private val podDeletionCostAnnotation = 
"controller.kubernetes.io/pod-deletion-cost"
@@ -212,11 +215,11 @@ private[spark] class KubernetesClusterSchedulerBackend(
             .withLabelIn(SPARK_EXECUTOR_ID_LABEL, execIds: _*)
             .resources()
             .forEach { podResource =>
-              podResource.edit({ p: Pod =>
-                new PodBuilder(p).editOrNewMetadata()
-                  .addToAnnotations(podDeletionCostAnnotation, cost.toString)
-                  .endMetadata()
-                  .build()})
+              podResource.patch(PATCH_CONTEXT, new PodBuilder()
+                .withNewMetadata()
+                .addToAnnotations(podDeletionCostAnnotation, cost.toString)
+                .endMetadata()
+                .build())
             }
         }
       }
diff --git 
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala
 
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala
index 8b1f2c3e2287..85358c5f1bfb 100644
--- 
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala
+++ 
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala
@@ -252,11 +252,8 @@ class KubernetesClusterSchedulerBackendSuite extends 
SparkFunSuite with BeforeAn
         .endMetadata()
       .build()
 
-    val editCaptor = ArgumentCaptor.forClass(classOf[UnaryOperator[Pod]])
-    when(podResource.edit(any(classOf[UnaryOperator[Pod]]))).thenAnswer { 
invocation =>
-      val fn = invocation.getArgument[UnaryOperator[Pod]](0)
-      fn.apply(basePod)
-    }
+    val patchCaptor = ArgumentCaptor.forClass(classOf[Pod])
+    when(podResource.patch(any(), any(classOf[Pod]))).thenReturn(basePod)
 
     when(labeledPods.resources())
       .thenAnswer(_ => java.util.stream.Stream.of[PodResource](podResource))
@@ -269,10 +266,8 @@ class KubernetesClusterSchedulerBackendSuite extends 
SparkFunSuite with BeforeAn
     method.invoke(schedulerBackendUnderTest, Seq("3"))
     schedulerExecutorService.runUntilIdle()
 
-    verify(podResource, atLeastOnce()).edit(editCaptor.capture())
-    val appliedPods = editCaptor.getAllValues.asScala
-      .scanLeft(basePod)((pod, fn) => fn.apply(pod))
-      .tail
+    verify(podResource, atLeastOnce()).patch(any(), patchCaptor.capture())
+    val appliedPods = patchCaptor.getAllValues.asScala
     val annotated = appliedPods
       .find(_.getMetadata.getAnnotations.asScala
         .contains("controller.kubernetes.io/pod-deletion-cost"))


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to