This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 9ae77ee8539d [SPARK-55496][K8S] Support re-use of scaled PVCs
9ae77ee8539d is described below

commit 9ae77ee8539dc83eeb97b58cb3745c6932aa53ef
Author: Filip Darmanovic <[email protected]>
AuthorDate: Mon Feb 16 10:07:01 2026 -0800

    [SPARK-55496][K8S] Support re-use of scaled PVCs
    
    ### What changes were proposed in this pull request?
    
    When attempting to re-use existing persistent volume claims, the method
    `replacePVCsIfNeeded` in `ExecutorPodsAllocator` now considers PVCs with a
    higher capacity than what was originally required.
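
    In essence, the matching predicate is relaxed from an exact size match to a
    same-or-greater comparison, as the diff below shows. A minimal sketch of the
    new rule in isolation (the helper name `isReusable` is ours, not Spark's):

    ```scala
    import io.fabric8.kubernetes.api.model.PersistentVolumeClaim

    // A candidate PVC is reusable when its storage class matches and its
    // requested storage is the same or greater than what the spec asks for.
    def isReusable(candidate: PersistentVolumeClaim, wanted: PersistentVolumeClaim): Boolean =
      candidate.getSpec.getStorageClassName == wanted.getSpec.getStorageClassName &&
        candidate.getSpec.getResources.getRequests.get("storage")
          .compareTo(wanted.getSpec.getResources.getRequests.get("storage")) >= 0
    ```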
    
    ### Why are the changes needed?
    
    Most cloud providers today offer Kubernetes StorageClasses with
    `allowVolumeExpansion` set to `true`. This, paired with a PVC monitoring and
    scaling service such as the [Kubernetes Volume
    Autoscaler](https://github.com/DevOps-Nirvana/Kubernetes-Volume-Autoscaler),
    allows the user to provision PVCs that grow with rising storage
    requirements, saving costs by not over-provisioning. The expansion is done
    simply by increasing the storage request in the PVC's definition.
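
    For illustration, this is roughly what such an expansion looks like when
    driven through the fabric8 client (a hedged sketch: the namespace and claim
    name are made up, and a real autoscaler patches the PVC through the
    Kubernetes API on its own schedule):

    ```scala
    import java.util.function.UnaryOperator

    import io.fabric8.kubernetes.api.model.{PersistentVolumeClaim, PersistentVolumeClaimBuilder, Quantity}
    import io.fabric8.kubernetes.client.KubernetesClientBuilder

    // Bump the storage request to 400Gi; the StorageClass must have
    // allowVolumeExpansion: true for the resize to take effect.
    val grow: UnaryOperator[PersistentVolumeClaim] = pvc =>
      new PersistentVolumeClaimBuilder(pvc)
        .editSpec()
          .editResources()
            .addToRequests("storage", new Quantity("400Gi"))
          .endResources()
        .endSpec()
        .build()

    val client = new KubernetesClientBuilder().build()
    client.persistentVolumeClaims()
      .inNamespace("spark") // hypothetical namespace
      .withName("pvc-0")    // hypothetical claim name
      .edit(grow)
    ```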
    
    This approach, however, presents a problem in the current code base: the
    check in the `replacePVCsIfNeeded` function requires an exact match between
    the desired size and an existing PVC's size, so a PVC suddenly becomes
    ineligible for re-use after it has been expanded. This PR makes a simple
    change that allows disks with the same or greater size to be re-used.
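
    The new check relies on the ordering of fabric8 `Quantity` values (the same
    `compareTo` the diff uses), so an expanded claim now qualifies while an
    exactly-sized one still does. A small sketch with made-up sizes:

    ```scala
    import io.fabric8.kubernetes.api.model.Quantity

    val requested = new Quantity("200Gi") // size in the executor pod spec
    val expanded = new Quantity("400Gi")  // size after an autoscaler grew the PVC

    assert(expanded.compareTo(requested) >= 0)              // now eligible for re-use
    assert(new Quantity("200Gi").compareTo(requested) >= 0) // equal size still matches
    ```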
    
    This PR covers the use case where a volume autoscaler expands the volumes
    used by Spark. Without this change, those volumes are ignored even though
    they very likely contain valuable shuffle data; in fact, the previous
    worker may well have crashed precisely because the disk filled up before it
    could be expanded.
    
    ### Does this PR introduce _any_ user-facing change?
    
    The default behavior in all versions of Spark so far has been to ignore
    larger disks even when they are owned by the Spark driver. I do not believe
    this change can break anything, as I cannot think of a scenario where
    somebody relies on that behavior.
    
    ### How was this patch tested?
    
    A new unit test was added.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #54336 from Dzeri96/SPARK-55496-support-reuse-of-scaled-PVCs.
    
    Authored-by: Filip Darmanovic <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 .../cluster/k8s/ExecutorPodsAllocator.scala        |  7 +++---
 .../cluster/k8s/ExecutorPodsAllocatorSuite.scala   | 25 ++++++++++++++++++++++
 2 files changed, 29 insertions(+), 3 deletions(-)

diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
index bd1a65fa0255..126d07c0926f 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
@@ -496,11 +496,12 @@ class ExecutorPodsAllocator(
     val replacedResources = mutable.Set[HasMetadata]()
     resources.foreach {
       case pvc: PersistentVolumeClaim =>
-        // Find one with the same storage class and size.
+        // Find one with the same storage class and same or greater size.
+        // Larger disks will be encountered when they have been expanded by an external actor.
         val index = reusablePVCs.indexWhere { p =>
           p.getSpec.getStorageClassName == pvc.getSpec.getStorageClassName &&
-            p.getSpec.getResources.getRequests.get("storage") ==
-              pvc.getSpec.getResources.getRequests.get("storage")
+          p.getSpec.getResources.getRequests.get("storage")
+            .compareTo(pvc.getSpec.getResources.getRequests.get("storage")) >= 0
         }
         if (index >= 0) {
           val volume = pod.getSpec.getVolumes.asScala.find { v =>
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala
index 0f4db9b8e036..e994ccbed9a6 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala
@@ -1021,6 +1021,31 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
     assert(podsAllocatorUnderTest.invokePrivate(numOutstandingPods).get() == 0)
   }
 
+  test("SPARK-55496: replacePVCsIfNeeded should re-use disks with larger 
storage") {
+    val podToModify = podWithAttachedContainerForIdAndVolume(1)
+    val resourcesFromSpec: Seq[HasMetadata] = Seq(persistentVolumeClaim("pvc-0", "gp3", "200Gi"))
+    val existingPVCName = "pvc-existing"
+    val existingPVCs = mutable
+      .Buffer[PersistentVolumeClaim](persistentVolumeClaim(existingPVCName, "gp3", "400Gi"))
+
+    val replacePVCsIfNeeded =
+      PrivateMethod[Seq[HasMetadata]](Symbol("replacePVCsIfNeeded"))
+    val newResources = podsAllocatorUnderTest invokePrivate replacePVCsIfNeeded(
+      podToModify,
+      resourcesFromSpec,
+      existingPVCs
+    )
+
+    val podVolumes = podToModify.getSpec.getVolumes
+    assert(existingPVCs.isEmpty)
+    assert(newResources.isEmpty)
+    assert(podVolumes.size() == 1)
+
+    val modifiedVolume = podVolumes.asScala
+      .find(v => v.getPersistentVolumeClaim.getClaimName.equals(existingPVCName))
+    assert(modifiedVolume.nonEmpty)
+  }
+
   private def executorPodAnswer(): Answer[KubernetesExecutorSpec] =
     (invocation: InvocationOnMock) => {
       val k8sConf: KubernetesExecutorConf = invocation.getArgument(0)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
