This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new cae4e2a20915 [SPARK-55370][K8S] Improve `annotateExecutorDeletionCost` to use `patch` instead of `edit` API
cae4e2a20915 is described below
commit cae4e2a20915ec6eed283b1200ea5bc5f28b0d7d
Author: Dongjoon Hyun <[email protected]>
AuthorDate: Thu Feb 5 11:02:33 2026 -0800
[SPARK-55370][K8S] Improve `annotateExecutorDeletionCost` to use `patch` instead of `edit` API
### What changes were proposed in this pull request?
This PR aims to improve `annotateExecutorDeletionCost` to use the `patch` API instead of the `edit` API.
### Why are the changes needed?
**Network Efficiency**
- `edit` requires fetching the entire resource and sending the full updated
resource back.
- `patch` only transmits the specific changes, making it much more network-efficient, as the request sketch below shows.
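For example, the whole strategic-merge-patch request is essentially the annotation delta. A rough sketch (the namespace, pod name, and cost value are illustrative):

```
PATCH /api/v1/namespaces/default/pods/spark-exec-1 HTTP/1.1
Content-Type: application/strategic-merge-patch+json

{"metadata":{"annotations":{"controller.kubernetes.io/pod-deletion-cost":"1"}}}
```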
**Concurrency & Conflict Resolution**
- `edit` typically follows a Get -> Modify -> Update (PUT) pattern. This pattern creates a race condition: if another client modifies the resource in between, the update fails with a 409 Conflict due to a mismatched resourceVersion.
- `patch` sends only the changes (delta) to the server, where the merge is handled server-side. This significantly reduces the risk of conflicts, especially for simple operations like adding an annotation; see the sketch after this list.
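A minimal fabric8 sketch of the two patterns (the client construction, namespace, pod name, and cost value are illustrative assumptions):

```scala
import io.fabric8.kubernetes.api.model.{Pod, PodBuilder}
import io.fabric8.kubernetes.client.KubernetesClientBuilder
import io.fabric8.kubernetes.client.dsl.base.{PatchContext, PatchType}

val client = new KubernetesClientBuilder().build()
val podResource = client.pods().inNamespace("default").withName("spark-exec-1")

// edit: GET the pod, apply the function, PUT the full object back.
// A concurrent writer invalidates resourceVersion and causes a 409 Conflict.
podResource.edit({ p: Pod =>
  new PodBuilder(p).editOrNewMetadata()
    .addToAnnotations("controller.kubernetes.io/pod-deletion-cost", "1")
    .endMetadata()
    .build()
})

// patch: send only the delta; the API server merges it server-side.
podResource.patch(
  PatchContext.of(PatchType.STRATEGIC_MERGE),
  new PodBuilder()
    .withNewMetadata()
    .addToAnnotations("controller.kubernetes.io/pod-deletion-cost", "1")
    .endMetadata()
    .build())
```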
### Does this PR introduce _any_ user-facing change?
No behavior change because the annotation feature is not released yet.
### How was this patch tested?
Pass the CIs with the newly updated test case.
### Was this patch authored or co-authored using generative AI tooling?
Generated-by: `Gemini 3 Pro (High)` on `Antigravity`
Closes #54158 from dongjoon-hyun/SPARK-55370.
Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../cluster/k8s/KubernetesClusterSchedulerBackend.scala | 13 ++++++++-----
.../k8s/KubernetesClusterSchedulerBackendSuite.scala | 13 ++++---------
2 files changed, 12 insertions(+), 14 deletions(-)
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
index 6003cabd56a5..b6d16a56b4f5 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
@@ -25,6 +25,7 @@ import scala.concurrent.Future
import io.fabric8.kubernetes.api.model.Pod
import io.fabric8.kubernetes.api.model.PodBuilder
import io.fabric8.kubernetes.client.KubernetesClient
+import io.fabric8.kubernetes.client.dsl.base.{PatchContext, PatchType}
import org.apache.spark.SparkContext
import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesUtils}
@@ -73,6 +74,8 @@ private[spark] class KubernetesClusterSchedulerBackend(
private val namespace = conf.get(KUBERNETES_NAMESPACE)
+ private val PATCH_CONTEXT = PatchContext.of(PatchType.STRATEGIC_MERGE)
+
// KEP 2255: When a Deployment or Replicaset is scaled down, the pods will be deleted in the
// order of the value of this annotation, ascending.
private val podDeletionCostAnnotation = "controller.kubernetes.io/pod-deletion-cost"
@@ -212,11 +215,11 @@ private[spark] class KubernetesClusterSchedulerBackend(
.withLabelIn(SPARK_EXECUTOR_ID_LABEL, execIds: _*)
.resources()
.forEach { podResource =>
- podResource.edit({ p: Pod =>
- new PodBuilder(p).editOrNewMetadata()
- .addToAnnotations(podDeletionCostAnnotation, cost.toString)
- .endMetadata()
- .build()})
+ podResource.patch(PATCH_CONTEXT, new PodBuilder()
+ .withNewMetadata()
+ .addToAnnotations(podDeletionCostAnnotation, cost.toString)
+ .endMetadata()
+ .build())
}
}
}
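Note: the patch body above is built from an empty `PodBuilder` rather than from the live pod, so only the metadata delta is serialized and sent. A quick, illustrative way to inspect the serialized form (the cost value is assumed):

```scala
import io.fabric8.kubernetes.api.model.PodBuilder
import io.fabric8.kubernetes.client.utils.Serialization

val delta = new PodBuilder()
  .withNewMetadata()
  .addToAnnotations("controller.kubernetes.io/pod-deletion-cost", "1")
  .endMetadata()
  .build()

// Unset fields are omitted, so besides the model's apiVersion/kind defaults the
// body is {"metadata":{"annotations":{"controller.kubernetes.io/pod-deletion-cost":"1"}}}
println(Serialization.asJson(delta))
```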
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala
index 8b1f2c3e2287..85358c5f1bfb 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala
@@ -252,11 +252,8 @@ class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAn
.endMetadata()
.build()
- val editCaptor = ArgumentCaptor.forClass(classOf[UnaryOperator[Pod]])
- when(podResource.edit(any(classOf[UnaryOperator[Pod]]))).thenAnswer { invocation =>
- val fn = invocation.getArgument[UnaryOperator[Pod]](0)
- fn.apply(basePod)
- }
+ val patchCaptor = ArgumentCaptor.forClass(classOf[Pod])
+ when(podResource.patch(any(), any(classOf[Pod]))).thenReturn(basePod)
when(labeledPods.resources())
.thenAnswer(_ => java.util.stream.Stream.of[PodResource](podResource))
@@ -269,10 +266,8 @@ class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAn
method.invoke(schedulerBackendUnderTest, Seq("3"))
schedulerExecutorService.runUntilIdle()
- verify(podResource, atLeastOnce()).edit(editCaptor.capture())
- val appliedPods = editCaptor.getAllValues.asScala
- .scanLeft(basePod)((pod, fn) => fn.apply(pod))
- .tail
+ verify(podResource, atLeastOnce()).patch(any(), patchCaptor.capture())
+ val appliedPods = patchCaptor.getAllValues.asScala
val annotated = appliedPods
.find(_.getMetadata.getAnnotations.asScala
.contains("controller.kubernetes.io/pod-deletion-cost"))
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]