[ https://issues.apache.org/jira/browse/SPARK-35416?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18058540#comment-18058540 ]

Romain Manni-Bucau edited comment on SPARK-35416 at 2/13/26 10:18 PM:
----------------------------------------------------------------------

[~dongjoon] digging a bit, I think there is an issue with the way the store is synchronized in Spark. Look at:
{code:java}
test("PersistentVolumeClaim Reuse should only reuse not assigned PVC") {
  val prefix = "spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1"
  val confWithPVC = conf.clone
    .set(KUBERNETES_DRIVER_POD_NAME.key, "driver")
    .set(KUBERNETES_DRIVER_OWN_PVC.key, "true")
    .set(KUBERNETES_DRIVER_REUSE_PVC.key, "true")
    .set(KUBERNETES_DRIVER_WAIT_TO_REUSE_PVC.key, "true")
    .set(EXECUTOR_CONTAINER_IMAGE.key, "spark-test:notexisting")
    .set(s"$prefix.mount.path", "/spark-local-dir")
    .set(s"$prefix.mount.readOnly", "false")
    .set(s"$prefix.options.claimName", "OnDemand")
    .set(s"$prefix.options.sizeLimit", "200Gi")
    .set(s"$prefix.options.storageClass", "gp2")

  val pvc = persistentVolumeClaim("pvc-0", "gp2", "200Gi")
  pvc.getMetadata
    .setCreationTimestamp(Instant.now().minus(podCreationTimeout + 1, MILLIS).toString)
  when(persistentVolumeClaimList.getItems).thenReturn(Seq(pvc).asJava)

  val executors = mutable.Buffer[Pod]()
  podsAllocatorUnderTest = new ExecutorPodsAllocator(
    confWithPVC, secMgr, new KubernetesExecutorBuilder,
    kubernetesClient, snapshotsStore, waitForExecutorPodsClock) {
    {
      super.start(TEST_SPARK_APP_ID, schedulerBackend)
      super.setTotalExpectedExecutors(Map(defaultProfile -> 2))
    }

    override protected def replacePVCsIfNeeded(
        pod: Pod, resources: Seq[HasMetadata],
        reusablePVCs: mutable.Buffer[PersistentVolumeClaim]): Seq[HasMetadata] = {
      val res = super.replacePVCsIfNeeded(pod, resources, reusablePVCs)
      executors += pod
      res
    }

    override protected def getReusablePVCs(appId: String, used: Seq[String]):
        mutable.Buffer[PersistentVolumeClaim] =
      mutable.Buffer(pvc)
  }

  val groupedByPod = executors.groupBy(_.getMetadata.getName)
  // we created the 2 requested executors
  assert(groupedByPod.size == 2)
  // we have a single pod creation (per name)
  groupedByPod.values.foreach(it => assert(it.size == 1))
  // every pod has 2 volumes (spark conf + local one)
  groupedByPod.values.foreach(it => assert(it.head.getSpec.getVolumes.size == 2))
  // every pod has its own local volume - first one is the spark conf so we skip it
  assert(groupedByPod.values.flatten.map(_.getSpec.getVolumes.get(1).getName).toSet.size == 2)
}
{code}
It will obviously fail because updatePod() is never called on the store, so when the second executor is requested it does not see that the first one already took a PVC, and it reuses the same one.

Now the synchronization (the call to updatePod()) is only done in ExecutorPodsWatcher, so it depends on how laggy the watcher is.

Note: the same can happen with ExecutorPodsPollingSnapshotSource.

Indeed, the event loop still runs every second, so if the watcher is a bit behind you are doomed.
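
To make the race explicit, here is a tiny self-contained sketch (purely hypothetical types, not Spark's classes) of what goes wrong: the reusable PVC set is recomputed from a store the allocator itself never updates, so two allocation rounds both hand out pvc-0.
{code:java}
import scala.collection.mutable

// Hypothetical stand-ins, only to illustrate the race described above.
object StaleStoreRace extends App {
  case class Pvc(name: String)
  case class PodSpec(podName: String, claim: Option[String])

  val allPVCs = Seq(Pvc("pvc-0"))
  // Stands in for the snapshot store: only the watcher updates it, the allocator does not.
  val store = mutable.Buffer[PodSpec]()

  def reusablePVCs(): Seq[Pvc] = {
    val used = store.flatMap(_.claim).toSet
    allPVCs.filterNot(p => used.contains(p.name))
  }

  def requestExecutor(podName: String): PodSpec = {
    // Nothing equivalent to snapshotsStore.updatePod(pod) happens here,
    // so the next round still believes pvc-0 is free.
    PodSpec(podName, reusablePVCs().headOption.map(_.name))
  }

  println(requestExecutor("exec-1")) // PodSpec(exec-1,Some(pvc-0))
  println(requestExecutor("exec-2")) // PodSpec(exec-2,Some(pvc-0)) -> same PVC handed out twice
}
{code}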

So overall I think the current operator code is not robust enough, and the common operator pattern of not relying on the watcher but re-fetching fresh data every time it is needed should apply in org.apache.spark.scheduler.cluster.k8s.ExecutorPodsAllocator#onNewSnapshots instead of using snapshot.ExecutorPods.
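
As an illustration of that pattern, a rough sketch (not a patch: the freshReusablePVCs name is mine and I assume the usual spark-app-selector label) of re-fetching from the API server with the fabric8 client on every allocation round:
{code:java}
import io.fabric8.kubernetes.api.model.PersistentVolumeClaim
import io.fabric8.kubernetes.client.KubernetesClient
import scala.jdk.CollectionConverters._

def freshReusablePVCs(client: KubernetesClient, appId: String): Seq[PersistentVolumeClaim] = {
  // All PVCs belonging to this application, straight from the API server.
  val pvcs = client.persistentVolumeClaims()
    .withLabel("spark-app-selector", appId)
    .list().getItems.asScala.toSeq

  // PVC names already referenced by a pod spec of this application.
  val claimed = client.pods()
    .withLabel("spark-app-selector", appId)
    .list().getItems.asScala
    .flatMap(_.getSpec.getVolumes.asScala)
    .flatMap(v => Option(v.getPersistentVolumeClaim).map(_.getClaimName))
    .toSet

  pvcs.filterNot(pvc => claimed.contains(pvc.getMetadata.getName))
}
{code}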

It should also take the driver PVC into account (today it is completely ignored) in case the driver and executors have the same configuration (storage class/size).

Finally, it should ignore any PVC whose status is "Bound".
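
Something along these lines for the last two points (again only a sketch: the accessors are fabric8's, the matching rule is just illustrative):
{code:java}
import io.fabric8.kubernetes.api.model.PersistentVolumeClaim

// Skip PVCs the control plane already reports as "Bound".
def notBound(pvc: PersistentVolumeClaim): Boolean =
  Option(pvc.getStatus).flatMap(s => Option(s.getPhase)).forall(_ != "Bound")

// Only consider PVCs whose storage class matches the executor configuration,
// so a driver PVC with the same settings is handled consistently.
def matchesExecutorConf(pvc: PersistentVolumeClaim, storageClass: String): Boolean =
  Option(pvc.getSpec.getStorageClassName).contains(storageClass)

// e.g. freshReusablePVCs(client, appId).filter(notBound).filter(matchesExecutorConf(_, "gp2"))
{code}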

Increasing the scheduler timeout doesn't help much because:
 * it obviously makes allocation more laggy, which is not desired
 * it doesn't solve the exact same case when the loop is immediately retriggered

Wdyt?



> Support PersistentVolumeClaim Reuse
> -----------------------------------
>
>                 Key: SPARK-35416
>                 URL: https://issues.apache.org/jira/browse/SPARK-35416
>             Project: Spark
>          Issue Type: Improvement
>          Components: Kubernetes
>    Affects Versions: 3.2.0
>            Reporter: Dongjoon Hyun
>            Assignee: Dongjoon Hyun
>            Priority: Major
>             Fix For: 3.2.0
>
>



