This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new cdd41a9f2c4f [SPARK-47003][K8S] Detect and fail on invalid volume
sizes (< 1KiB) in K8s
cdd41a9f2c4f is described below
commit cdd41a9f2c4f278c5da7e1826c5e0ca0db7ec548
Author: Dongjoon Hyun <[email protected]>
AuthorDate: Wed Feb 7 14:58:20 2024 -0800
[SPARK-47003][K8S] Detect and fail on invalid volume sizes (< 1KiB) in K8s
### What changes were proposed in this pull request?
This PR aims to detect and fail on invalid volume sizes.
### Why are the changes needed?
This happens when the user forgets the unit of the volume size. For example,
`100` instead of `100Gi`.
### Does this PR introduce _any_ user-facing change?
For K8s volumes, the system is trying to use the system default minimum
volume size. However it totally depends on the underlying system. And, this
misconfiguration misleads the users in many cases because the job is started
and running in unhealthy status.
- First, the executor pods will be killed by the K8s control plane due to
an out-of-disk situation.
- Second, Spark is trying to create new executors (still with small disks)
and to retry multiple times.
We had better detect the missed-unit situation and make those jobs fail as
early as possible.
### How was this patch tested?
Pass the CIs with newly added test cases.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #45061 from dongjoon-hyun/SPARK-47003.
Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../spark/deploy/k8s/KubernetesVolumeUtils.scala | 13 +++++++++++
.../deploy/k8s/KubernetesVolumeUtilsSuite.scala | 26 ++++++++++++++++++++++
2 files changed, 39 insertions(+)
diff --git
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtils.scala
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtils.scala
index 18fda708d9bb..baa519658c2e 100644
---
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtils.scala
+++
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtils.scala
@@ -16,6 +16,8 @@
*/
package org.apache.spark.deploy.k8s
+import java.lang.Long.parseLong
+
import org.apache.spark.SparkConf
import org.apache.spark.deploy.k8s.Config._
@@ -76,6 +78,7 @@ private[spark] object KubernetesVolumeUtils {
s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_OPTIONS_CLAIM_STORAGE_CLASS_KEY"
val sizeLimitKey =
s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_OPTIONS_SIZE_LIMIT_KEY"
verifyOptionKey(options, claimNameKey, KUBERNETES_VOLUMES_PVC_TYPE)
+ verifySize(options.get(sizeLimitKey))
KubernetesPVCVolumeConf(
options(claimNameKey),
options.get(storageClassKey),
@@ -84,6 +87,7 @@ private[spark] object KubernetesVolumeUtils {
case KUBERNETES_VOLUMES_EMPTYDIR_TYPE =>
val mediumKey =
s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_OPTIONS_MEDIUM_KEY"
val sizeLimitKey =
s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_OPTIONS_SIZE_LIMIT_KEY"
+ verifySize(options.get(sizeLimitKey))
KubernetesEmptyDirVolumeConf(options.get(mediumKey),
options.get(sizeLimitKey))
case KUBERNETES_VOLUMES_NFS_TYPE =>
@@ -105,4 +109,13 @@ private[spark] object KubernetesVolumeUtils {
throw new NoSuchElementException(key + s" is required for $msg")
}
}
+
+ private def verifySize(size: Option[String]): Unit = {
+ size.foreach { v =>
+ if (v.forall(_.isDigit) && parseLong(v) < 1024) {
+ throw new IllegalArgumentException(
+ s"Volume size `$v` is smaller than 1KiB. Missing units?")
+ }
+ }
+ }
}
diff --git
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtilsSuite.scala
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtilsSuite.scala
index 156740d7c8ae..fdc1aae0d410 100644
---
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtilsSuite.scala
+++
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtilsSuite.scala
@@ -182,4 +182,30 @@ class KubernetesVolumeUtilsSuite extends SparkFunSuite {
}
assert(e.getMessage.contains("nfs.volumeName.options.server"))
}
+
+ test("SPARK-47003: Check emptyDir volume size") {
+ val sparkConf = new SparkConf(false)
+ sparkConf.set("test.emptyDir.volumeName.mount.path", "/path")
+ sparkConf.set("test.emptyDir.volumeName.mount.readOnly", "true")
+ sparkConf.set("test.emptyDir.volumeName.options.medium", "medium")
+ sparkConf.set("test.emptyDir.volumeName.options.sizeLimit", "5")
+
+ val m = intercept[IllegalArgumentException] {
+ KubernetesVolumeUtils.parseVolumesWithPrefix(sparkConf, "test.")
+ }.getMessage
+ assert(m.contains("smaller than 1KiB. Missing units?"))
+ }
+
+ test("SPARK-47003: Check persistentVolumeClaim volume size") {
+ val sparkConf = new SparkConf(false)
+ sparkConf.set("test.persistentVolumeClaim.volumeName.mount.path", "/path")
+ sparkConf.set("test.persistentVolumeClaim.volumeName.mount.readOnly",
"false")
+ sparkConf.set("test.persistentVolumeClaim.volumeName.options.claimName",
"claimName")
+ sparkConf.set("test.persistentVolumeClaim.volumeName.options.sizeLimit",
"1000")
+
+ val m = intercept[IllegalArgumentException] {
+ KubernetesVolumeUtils.parseVolumesWithPrefix(sparkConf, "test.")
+ }.getMessage
+ assert(m.contains("smaller than 1KiB. Missing units?"))
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]