[
https://issues.apache.org/jira/browse/SPARK-35416?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18058540#comment-18058540
]
Romain Manni-Bucau commented on SPARK-35416:
--------------------------------------------
[~dongjoon] Digging a bit, I think there is an issue with the way the store is
synchronized in Spark. Look at:
{code:java}
test("PersistentVolumeClaim Reuse should only reuse not assigned PVC") {
val prefix =
"spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1"
val confWithPVC = conf.clone
.set(KUBERNETES_DRIVER_POD_NAME.key, "driver")
.set(KUBERNETES_DRIVER_OWN_PVC.key, "true")
.set(KUBERNETES_DRIVER_REUSE_PVC.key, "true")
.set(KUBERNETES_DRIVER_WAIT_TO_REUSE_PVC.key, "true")
.set(EXECUTOR_CONTAINER_IMAGE.key, "spark-test:notexisting")
.set(s"$prefix.mount.path", "/spark-local-dir")
.set(s"$prefix.mount.readOnly", "false")
.set(s"$prefix.options.claimName", "OnDemand")
.set(s"$prefix.options.sizeLimit", "200Gi")
.set(s"$prefix.options.storageClass", "gp2")
val pvc = persistentVolumeClaim("pvc-0", "gp2", "200Gi")
pvc.getMetadata
.setCreationTimestamp(Instant.now().minus(podCreationTimeout + 1,
MILLIS).toString)
when(persistentVolumeClaimList.getItems).thenReturn(Seq(pvc).asJava)
val executors = mutable.Buffer[Pod]()
podsAllocatorUnderTest = new ExecutorPodsAllocator(
confWithPVC, secMgr, new KubernetesExecutorBuilder,
kubernetesClient, snapshotsStore, waitForExecutorPodsClock) {
{
super.start(TEST_SPARK_APP_ID, schedulerBackend)
super.setTotalExpectedExecutors(Map(defaultProfile -> 2))
}
override protected def replacePVCsIfNeeded(
pod: Pod, resources: Seq[HasMetadata],
reusablePVCs: mutable.Buffer[PersistentVolumeClaim]):
Seq[HasMetadata] = {
val res = super.replacePVCsIfNeeded(pod, resources, reusablePVCs)
executors += pod
res;
}
override protected def getReusablePVCs(appId: String, used: Seq[String]):
mutable.Buffer[PersistentVolumeClaim] =
mutable.Buffer(pvc)
}
val groupedByPod = executors.groupBy(_.getMetadata.getName)
// we created the 2 requested executors
assert(groupedByPod.size == 2)
// we have a single pod creation (per name)
groupedByPod.values.foreach(it => assert(it.size == 1))
// every pod has 2 volumes (spark conf + local one)
groupedByPod.values.foreach(it => assert(it.head.getSpec.getVolumes.size ==
2))
// every pod has its own local volume - first one is the spark conf so we
skip it
assert(groupedByPod.values.flatten.map(_.getSpec.getVolumes.get(1).getName).toSet.size
== 2)
}
{code}
It will obviously fail because updatePod() is never called on the store, so when
the second executor is requested the allocator does not see that the first one
already took the PVC, and it reuses it again.
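For clarity, the missing piece is the explicit store update that the watcher would eventually perform asynchronously; in the suite it is normally simulated by hand, something like this (a sketch only, assuming the DeterministicExecutorPodsSnapshotsStore used by ExecutorPodsAllocatorSuite and the executors buffer from the test above):
{code:java}
// Sketch only: feed the store with the pods the allocator just created - this is
// what the watcher does asynchronously in production via updatePod() - then trigger
// the subscribers (onNewSnapshots) so the next allocation round sees the fresh state.
executors.foreach(pod => snapshotsStore.updatePod(pod))
snapshotsStore.notifySubscribers()
{code}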
Today that synchronization (the call to updatePod()) is only done in ExecutorPodsWatcher,
so everything depends on how laggy the watcher is.
Note: the same can happen with ExecutorPodsPollingSnapshotSource.
Indeed, the event loop still runs every second, so if the watcher is even a bit
behind you are doomed.
So overall I think the current code is not robust enough, and the common operator
pattern of not relying on the watcher but re-fetching fresh data every time it is
needed should be applied in
org.apache.spark.scheduler.cluster.k8s.ExecutorPodsAllocator#onNewSnapshots,
instead of relying on the snapshot's executorPods.
It should also take the driver PVC into account (today it is totally ignored)
in case the driver and the executors share the same configuration (storage class/size).
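Concretely, the refetch could look roughly like the following. This is only a sketch: it assumes the fabric8 kubernetesClient and applicationId already available in ExecutorPodsAllocator, filters on the spark-app-selector application label, and the variable names are illustrative.
{code:java}
import scala.collection.JavaConverters._

// Sketch of the "always refetch" pattern inside onNewSnapshots(): instead of
// trusting a possibly stale snapshot, ask the API server which PVCs exist for this
// application and which ones are already mounted by live pods, and only offer the
// difference for reuse.
val createdPVCs = kubernetesClient
  .persistentVolumeClaims()
  .withLabel("spark-app-selector", applicationId)
  .list()
  .getItems
  .asScala

val claimsInUse = kubernetesClient
  .pods()
  .withLabel("spark-app-selector", applicationId)
  .list()
  .getItems
  .asScala
  .flatMap(_.getSpec.getVolumes.asScala)
  .flatMap(v => Option(v.getPersistentVolumeClaim).map(_.getClaimName))
  .toSet

// PVCs not referenced by any live pod are the only safe candidates for reuse
val reusablePVCs = createdPVCs.filterNot(p => claimsInUse.contains(p.getMetadata.getName))
{code}
Listing the pods instead of reading the snapshot would also cover the driver PVC naturally, assuming the driver pod carries the same application label.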
Increasing the scheduler timeout doesn't help much because:
* it obviously makes everything more laggy, which is not desired
* it doesn't solve the exact same case when the loop is immediately retriggered
Wdyt?
> Support PersistentVolumeClaim Reuse
> -----------------------------------
>
> Key: SPARK-35416
> URL: https://issues.apache.org/jira/browse/SPARK-35416
> Project: Spark
> Issue Type: Improvement
> Components: Kubernetes
> Affects Versions: 3.2.0
> Reporter: Dongjoon Hyun
> Assignee: Dongjoon Hyun
> Priority: Major
> Fix For: 3.2.0
>
>