This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new cdd41a9f2c4f [SPARK-47003][K8S] Detect and fail on invalid volume sizes (< 1KiB) in K8s cdd41a9f2c4f is described below commit cdd41a9f2c4f278c5da7e1826c5e0ca0db7ec548 Author: Dongjoon Hyun <dh...@apple.com> AuthorDate: Wed Feb 7 14:58:20 2024 -0800 [SPARK-47003][K8S] Detect and fail on invalid volume sizes (< 1KiB) in K8s ### What changes were proposed in this pull request? This PR aims to detect and fail on invalid volume sizes. ### Why are the changes needed? This happens when the user forgets the unit of volume size. For example, `100` instead of `100Gi`. ### Does this PR introduce _any_ user-facing change? For K8s volumes, the system is trying to use the system default minimum volume size. However, it totally depends on the underlying system. And, this misconfiguration misleads the users in many cases because the job is started and running in unhealthy status. - First, the executor pods will be killed by the K8s control plane due to the out of disk situation. - Second, Spark is trying to create new executors (still with small disks) and to retry multiple times. We had better detect the missed-unit situation and make those jobs fail as early as possible. ### How was this patch tested? Pass the CIs with newly added test cases. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #45061 from dongjoon-hyun/SPARK-47003. 
Authored-by: Dongjoon Hyun <dh...@apple.com> Signed-off-by: Dongjoon Hyun <dh...@apple.com> --- .../spark/deploy/k8s/KubernetesVolumeUtils.scala | 13 +++++++++++ .../deploy/k8s/KubernetesVolumeUtilsSuite.scala | 26 ++++++++++++++++++++++ 2 files changed, 39 insertions(+) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtils.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtils.scala index 18fda708d9bb..baa519658c2e 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtils.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtils.scala @@ -16,6 +16,8 @@ */ package org.apache.spark.deploy.k8s +import java.lang.Long.parseLong + import org.apache.spark.SparkConf import org.apache.spark.deploy.k8s.Config._ @@ -76,6 +78,7 @@ private[spark] object KubernetesVolumeUtils { s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_OPTIONS_CLAIM_STORAGE_CLASS_KEY" val sizeLimitKey = s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_OPTIONS_SIZE_LIMIT_KEY" verifyOptionKey(options, claimNameKey, KUBERNETES_VOLUMES_PVC_TYPE) + verifySize(options.get(sizeLimitKey)) KubernetesPVCVolumeConf( options(claimNameKey), options.get(storageClassKey), @@ -84,6 +87,7 @@ private[spark] object KubernetesVolumeUtils { case KUBERNETES_VOLUMES_EMPTYDIR_TYPE => val mediumKey = s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_OPTIONS_MEDIUM_KEY" val sizeLimitKey = s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_OPTIONS_SIZE_LIMIT_KEY" + verifySize(options.get(sizeLimitKey)) KubernetesEmptyDirVolumeConf(options.get(mediumKey), options.get(sizeLimitKey)) case KUBERNETES_VOLUMES_NFS_TYPE => @@ -105,4 +109,13 @@ private[spark] object KubernetesVolumeUtils { throw new NoSuchElementException(key + s" is required for $msg") } } + + private def verifySize(size: Option[String]): Unit = { + size.foreach { v => + 
if (v.forall(_.isDigit) && parseLong(v) < 1024) { + throw new IllegalArgumentException( + s"Volume size `$v` is smaller than 1KiB. Missing units?") + } + } + } } diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtilsSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtilsSuite.scala index 156740d7c8ae..fdc1aae0d410 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtilsSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtilsSuite.scala @@ -182,4 +182,30 @@ class KubernetesVolumeUtilsSuite extends SparkFunSuite { } assert(e.getMessage.contains("nfs.volumeName.options.server")) } + + test("SPARK-47003: Check emptyDir volume size") { + val sparkConf = new SparkConf(false) + sparkConf.set("test.emptyDir.volumeName.mount.path", "/path") + sparkConf.set("test.emptyDir.volumeName.mount.readOnly", "true") + sparkConf.set("test.emptyDir.volumeName.options.medium", "medium") + sparkConf.set("test.emptyDir.volumeName.options.sizeLimit", "5") + + val m = intercept[IllegalArgumentException] { + KubernetesVolumeUtils.parseVolumesWithPrefix(sparkConf, "test.") + }.getMessage + assert(m.contains("smaller than 1KiB. Missing units?")) + } + + test("SPARK-47003: Check persistentVolumeClaim volume size") { + val sparkConf = new SparkConf(false) + sparkConf.set("test.persistentVolumeClaim.volumeName.mount.path", "/path") + sparkConf.set("test.persistentVolumeClaim.volumeName.mount.readOnly", "false") + sparkConf.set("test.persistentVolumeClaim.volumeName.options.claimName", "claimName") + sparkConf.set("test.persistentVolumeClaim.volumeName.options.sizeLimit", "1000") + + val m = intercept[IllegalArgumentException] { + KubernetesVolumeUtils.parseVolumesWithPrefix(sparkConf, "test.") + }.getMessage + assert(m.contains("smaller than 1KiB. 
Missing units?")) + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org