This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 252f6a447d55 [SPARK-55410][K8S] Improve `SparkKubernetesDiagnosticsSetter` to use `patch` instead of `edit` API
252f6a447d55 is described below
commit 252f6a447d55ad221d651bd3c23cdf2ec456c05f
Author: Dongjoon Hyun <[email protected]>
AuthorDate: Sat Feb 7 21:00:41 2026 -0800
[SPARK-55410][K8S] Improve `SparkKubernetesDiagnosticsSetter` to use `patch` instead of `edit` API
### What changes were proposed in this pull request?
This PR aims to improve `SparkKubernetesDiagnosticsSetter` to use the `patch` API instead of the `edit` API.
### Why are the changes needed?
**Network Efficiency**
- `edit` requires fetching the entire resource and sending the full updated
resource back.
- `patch` only transmits the specific changes, making it much more
network-efficient, as the sketch below illustrates.
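To make the round trip concrete, here is a minimal sketch of the `edit`-based pattern this PR replaces, mirroring the removed code in the diff below. The method name `annotateViaEdit`, its parameters, and the annotation key are illustrative, not part of the actual change:

```scala
import io.fabric8.kubernetes.api.model.{Pod, PodBuilder}
import io.fabric8.kubernetes.client.KubernetesClient

// Illustrative sketch of the replaced pattern: as described above, `edit`
// fetches the entire Pod, applies the lambda locally, and sends the full
// updated resource back to the API server.
def annotateViaEdit(
    client: KubernetesClient, namespace: String, podName: String, value: String): Unit = {
  client.pods()
    .inNamespace(namespace)
    .withName(podName)
    .edit((p: Pod) => new PodBuilder(p) // builder starts from the fetched Pod
      .editOrNewMetadata()
      .addToAnnotations("spark.example/diagnostics", value) // illustrative key
      .endMetadata()
      .build())
}
```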
**Concurrency & Conflict Resolution**
- `edit` typically follows a Get -> Modify -> Update (PUT) pattern. Using
this pattern creates a race condition where, if another client modifies the
resource in between, a 409 Conflict error occurs due to a mismatched
resourceVersion.
- `patch` sends only the changes (delta) to the server, where the merge is
handled server-side. This significantly reduces the risk of conflicts,
especially for simple operations like adding an annotation, as the sketch
below shows.
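For contrast, a minimal sketch of the `patch`-based replacement, mirroring the added code in the diff below; `annotateViaPatch`, its parameters, and the annotation key are again illustrative:

```scala
import io.fabric8.kubernetes.api.model.PodBuilder
import io.fabric8.kubernetes.client.KubernetesClient
import io.fabric8.kubernetes.client.dsl.base.{PatchContext, PatchType}

// Illustrative sketch of the new pattern: a strategic merge patch transmits
// only the delta built below (roughly {"metadata":{"annotations":{...}}} on
// the wire). The API server performs the merge, and since no resourceVersion
// is sent, the optimistic-locking conflict described above is avoided.
def annotateViaPatch(
    client: KubernetesClient, namespace: String, podName: String, value: String): Unit = {
  client.pods()
    .inNamespace(namespace)
    .withName(podName)
    .patch(PatchContext.of(PatchType.STRATEGIC_MERGE), new PodBuilder() // fresh builder: delta only
      .withNewMetadata()
      .addToAnnotations("spark.example/diagnostics", value) // illustrative key
      .endMetadata()
      .build())
}
```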
### Does this PR introduce _any_ user-facing change?
This will reduce the overhead on the K8s control plane and the chance of 409
errors.
### How was this patch tested?
Pass the CIs with the newly updated test case.
### Was this patch authored or co-authored using generative AI tooling?
Generated-by: `Opus 4.5` on `Claude Code`
Closes #54194 from dongjoon-hyun/SPARK-55410.
Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../deploy/k8s/SparkKubernetesDiagnosticsSetter.scala | 8 +++++---
.../deploy/k8s/SparkKubernetesDiagnosticsSetterSuite.scala | 14 ++++----------
2 files changed, 9 insertions(+), 13 deletions(-)
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesDiagnosticsSetter.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesDiagnosticsSetter.scala
index 24f42919ce05..824e949e2887 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesDiagnosticsSetter.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesDiagnosticsSetter.scala
@@ -16,8 +16,9 @@
*/
package org.apache.spark.deploy.k8s
-import io.fabric8.kubernetes.api.model.{Pod, PodBuilder}
+import io.fabric8.kubernetes.api.model.PodBuilder
import io.fabric8.kubernetes.client.KubernetesClient
+import io.fabric8.kubernetes.client.dsl.base.{PatchContext, PatchType}
import org.apache.hadoop.util.StringUtils
import org.apache.spark.{SparkConf, SparkMasterRegex}
@@ -52,6 +53,7 @@ private[spark] class SparkKubernetesDiagnosticsSetter(clientProvider: Kubernetes
extends SparkDiagnosticsSetter with Logging {
private val KUBERNETES_EXIT_EXCEPTION_MESSAGE_LIMIT_BYTES = 64 * 1024 // 64 KiB
+ private val PATCH_CONTEXT = PatchContext.of(PatchType.STRATEGIC_MERGE)
def this() = {
this(new DefaultKubernetesClientProvider)
@@ -65,8 +67,8 @@ private[spark] class SparkKubernetesDiagnosticsSetter(clientProvider: Kubernetes
client.pods()
.inNamespace(conf.get(KUBERNETES_NAMESPACE))
.withName(podName)
- .edit((p: Pod) => new PodBuilder(p)
- .editOrNewMetadata()
+ .patch(PATCH_CONTEXT, new PodBuilder()
+ .withNewMetadata()
.addToAnnotations(EXIT_EXCEPTION_ANNOTATION, diagnostics)
.endMetadata()
.build());
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/SparkKubernetesDiagnosticsSetterSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/SparkKubernetesDiagnosticsSetterSuite.scala
index 65c150b2a035..aa44eb4d9ea8 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/SparkKubernetesDiagnosticsSetterSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/SparkKubernetesDiagnosticsSetterSuite.scala
@@ -16,11 +16,10 @@
*/
package org.apache.spark.deploy.k8s
-import java.util.function.UnaryOperator
-
import io.fabric8.kubernetes.api.model.Pod
import io.fabric8.kubernetes.client.KubernetesClient
import io.fabric8.kubernetes.client.dsl.PodResource
+import io.fabric8.kubernetes.client.dsl.base.PatchContext
import org.apache.hadoop.util.StringUtils
import org.mockito.{ArgumentCaptor, Mock, MockitoAnnotations}
import org.mockito.ArgumentMatchers.any
@@ -75,15 +74,10 @@ class SparkKubernetesDiagnosticsSetterSuite extends SparkFunSuite
setter.setDiagnostics(diagnostics, conf)
- val captor: ArgumentCaptor[UnaryOperator[Pod]] =
- ArgumentCaptor.forClass(classOf[UnaryOperator[Pod]])
- verify(driverPodOperations).edit(captor.capture())
-
- val fn = captor.getValue
- val initialPod = SparkPod.initialPod().pod
- val editedPod = fn.apply(initialPod)
+ val podCaptor: ArgumentCaptor[Pod] = ArgumentCaptor.forClass(classOf[Pod])
+ verify(driverPodOperations).patch(any(classOf[PatchContext]), podCaptor.capture())
- assert(editedPod.getMetadata.getAnnotations.get(EXIT_EXCEPTION_ANNOTATION)
+ assert(podCaptor.getValue.getMetadata.getAnnotations.get(EXIT_EXCEPTION_ANNOTATION)
== StringUtils.stringifyException(diagnostics))
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]