This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new b065c945fe2 [SPARK-39006][K8S] Show a directional error message for
executor PVC dynamic allocation failure
b065c945fe2 is described below
commit b065c945fe27dd5869b39bfeaad8e2b23a8835b5
Author: Qian.Sun <[email protected]>
AuthorDate: Sat May 7 17:58:20 2022 -0700
[SPARK-39006][K8S] Show a directional error message for executor PVC
dynamic allocation failure
### What changes were proposed in this pull request?
This PR aims to show a directional error message for executor PVC dynamic
allocation failure.
### Why are the changes needed?
#29846 supports dynamic PVC creation/deletion for K8s executors.
#29557 support execId placeholder in executor PVC conf.
If not set
`spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.options.claimName`
with `onDemand` or `SPARK_EXECUTOR_ID`, spark will continue to try to create
the executor pod.
After this PR, spark can show a directional error message for this
situation.
```plain
ERROR ExecutorPodsSnapshotsStoreImpl: Going to stop due to
IllegalArgumentException
java.lang.IllegalArgumentException: PVC ClaimName should contain OnDemand
or SPARK_EXECUTOR_ID when multiple executors are required
```
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Add unit test.
Closes #36374 from dcoliversun/SPARK-39006.
Authored-by: Qian.Sun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../k8s/features/MountVolumesFeatureStep.scala | 16 +++++++++
.../features/MountVolumesFeatureStepSuite.scala | 39 +++++++++++++++++++++-
2 files changed, 54 insertions(+), 1 deletion(-)
diff --git
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStep.scala
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStep.scala
index 78dd6ec21ed..d47024ca9fe 100644
---
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStep.scala
+++
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStep.scala
@@ -23,6 +23,7 @@ import io.fabric8.kubernetes.api.model._
import org.apache.spark.deploy.k8s._
import org.apache.spark.deploy.k8s.Constants.{ENV_EXECUTOR_ID,
SPARK_APP_ID_LABEL}
+import org.apache.spark.internal.config.EXECUTOR_INSTANCES
private[spark] class MountVolumesFeatureStep(conf: KubernetesConf)
extends KubernetesFeatureConfigStep {
@@ -71,6 +72,7 @@ private[spark] class MountVolumesFeatureStep(conf:
KubernetesConf)
case KubernetesPVCVolumeConf(claimNameTemplate, storageClass, size) =>
val claimName = conf match {
case c: KubernetesExecutorConf =>
+ checkPVCClaimName(claimNameTemplate)
claimNameTemplate
.replaceAll(PVC_ON_DEMAND,
s"${conf.resourceNamePrefix}-exec-${c.executorId}$PVC_POSTFIX-$i")
@@ -120,6 +122,20 @@ private[spark] class MountVolumesFeatureStep(conf:
KubernetesConf)
override def getAdditionalKubernetesResources(): Seq[HasMetadata] = {
additionalResources.toSeq
}
+
+ private def checkPVCClaimName(claimName: String): Unit = {
+ val executorInstances = conf.get(EXECUTOR_INSTANCES)
+ if (executorInstances.isDefined && executorInstances.get > 1) {
+ // PVC ClaimName should contain OnDemand or SPARK_EXECUTOR_ID
+ // when requiring multiple executors.
+ // Else, spark continues to try to create the executor pod.
+ if (!claimName.contains(PVC_ON_DEMAND) &&
!claimName.contains(ENV_EXECUTOR_ID)) {
+ throw new IllegalArgumentException(s"PVC ClaimName: $claimName " +
+ s"should contain $PVC_ON_DEMAND or $ENV_EXECUTOR_ID " +
+ "when requiring multiple executors")
+ }
+ }
+ }
}
private[spark] object MountVolumesFeatureStep {
diff --git
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStepSuite.scala
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStepSuite.scala
index 468d1dde9fb..e428e54d661 100644
---
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStepSuite.scala
+++
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStepSuite.scala
@@ -16,10 +16,13 @@
*/
package org.apache.spark.deploy.k8s.features
+import java.util.UUID
+
import scala.collection.JavaConverters._
-import org.apache.spark.SparkFunSuite
+import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.deploy.k8s._
+import org.apache.spark.internal.config.EXECUTOR_INSTANCES
class MountVolumesFeatureStepSuite extends SparkFunSuite {
test("Mounts hostPath volumes") {
@@ -148,6 +151,40 @@ class MountVolumesFeatureStepSuite extends SparkFunSuite {
assert(executorPVC.getClaimName.endsWith("-exec-1-pvc-0"))
}
+ test("SPARK-39006: Check PVC ClaimName") {
+ val claimName = s"pvc-${UUID.randomUUID().toString}"
+ val volumeConf = KubernetesVolumeSpec(
+ "testVolume",
+ "/tmp",
+ "",
+ mountReadOnly = true,
+ KubernetesPVCVolumeConf(claimName)
+ )
+ // Create pvc without specified claimName unsuccessfully when requiring
multiple executors
+ val conf = new SparkConf().set(EXECUTOR_INSTANCES, 2)
+ var executorConf =
+ KubernetesTestConf.createExecutorConf(sparkConf = conf, volumes =
Seq(volumeConf))
+ var executorStep = new MountVolumesFeatureStep(executorConf)
+ assertThrows[IllegalArgumentException] {
+ executorStep.configurePod(SparkPod.initialPod())
+ }
+ assert(intercept[IllegalArgumentException] {
+ executorStep.configurePod(SparkPod.initialPod())
+ }.getMessage.equals(s"PVC ClaimName: $claimName " +
+ "should contain OnDemand or SPARK_EXECUTOR_ID when requiring multiple
executors"))
+
+ // Create and mount pvc with any claimName successfully when requiring one
executor
+ conf.set(EXECUTOR_INSTANCES, 1)
+ executorConf =
+ KubernetesTestConf.createExecutorConf(sparkConf = conf, volumes =
Seq(volumeConf))
+ executorStep = new MountVolumesFeatureStep(executorConf)
+ val executorPod = executorStep.configurePod(SparkPod.initialPod())
+
+ assert(executorPod.pod.getSpec.getVolumes.size() === 1)
+ val executorPVC =
executorPod.pod.getSpec.getVolumes.get(0).getPersistentVolumeClaim
+ assert(executorPVC.getClaimName.equals(claimName))
+ }
+
test("Mounts emptyDir") {
val volumeConf = KubernetesVolumeSpec(
"testVolume",
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]