This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.5 by this push:
new 04d32493fde7 [SPARK-46789][K8S][TESTS] Add `VolumeSuite` to K8s IT
04d32493fde7 is described below
commit 04d32493fde779021871c88709dbbae32f18e512
Author: Dongjoon Hyun <[email protected]>
AuthorDate: Mon Jan 22 16:19:39 2024 +0800
[SPARK-46789][K8S][TESTS] Add `VolumeSuite` to K8s IT
### What changes were proposed in this pull request?
This PR aims to add `VolumeSuite` to K8s IT.
### Why are the changes needed?
To improve the test coverage on various K8s volume use cases.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Pass the CIs.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #44827 from dongjoon-hyun/SPARK-46789.
Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Kent Yao <[email protected]>
---
.../k8s/integrationtest/KubernetesSuite.scala | 4 +-
.../deploy/k8s/integrationtest/VolumeSuite.scala | 173 +++++++++++++++++++++
2 files changed, 175 insertions(+), 2 deletions(-)
diff --git
a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
index f52af87a745c..54ef1f6cee30 100644
---
a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
+++
b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
@@ -45,8 +45,8 @@ import org.apache.spark.internal.config._
class KubernetesSuite extends SparkFunSuite
with BeforeAndAfterAll with BeforeAndAfter with BasicTestsSuite with
SparkConfPropagateSuite
with SecretsTestsSuite with PythonTestsSuite with ClientModeTestsSuite with
PodTemplateSuite
- with PVTestsSuite with DepsTestsSuite with DecommissionSuite with
RTestsSuite with Logging
- with Eventually with Matchers {
+ with VolumeSuite with PVTestsSuite with DepsTestsSuite with
DecommissionSuite with RTestsSuite
+ with Logging with Eventually with Matchers {
import KubernetesSuite._
diff --git
a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/VolumeSuite.scala
b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/VolumeSuite.scala
new file mode 100644
index 000000000000..c57e4b4578d6
--- /dev/null
+++
b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/VolumeSuite.scala
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.integrationtest
+
+import scala.jdk.CollectionConverters._
+
+import io.fabric8.kubernetes.api.model._
+import org.scalatest.concurrent.PatienceConfiguration
+import org.scalatest.time.{Seconds, Span}
+
+import org.apache.spark.deploy.k8s.integrationtest.KubernetesSuite._
+import
org.apache.spark.deploy.k8s.integrationtest.backend.minikube.MinikubeTestBackend
+
+private[spark] trait VolumeSuite { k8sSuite: KubernetesSuite =>
+ val IGNORE = Some((Some(PatienceConfiguration.Interval(Span(0, Seconds))),
None))
+
+ private def checkDisk(pod: Pod, path: String, expected: String) = {
+ eventually(PatienceConfiguration.Timeout(Span(10, Seconds)), INTERVAL) {
+ implicit val podName: String = pod.getMetadata.getName
+ implicit val components: KubernetesTestComponents =
kubernetesTestComponents
+ assert(Utils.executeCommand("df", path).contains(expected))
+ }
+ }
+
+ test("A driver-only Spark job with a tmpfs-backed localDir volume",
k8sTestTag) {
+ sparkAppConf
+ .set("spark.kubernetes.driver.master", "local[10]")
+ .set("spark.kubernetes.local.dirs.tmpfs", "true")
+ runSparkApplicationAndVerifyCompletion(
+ containerLocalSparkDistroExamplesJar,
+ SPARK_PI_MAIN_CLASS,
+ Seq("local[10]", "Pi is roughly 3"),
+ Seq(),
+ Array.empty[String],
+ driverPodChecker = (driverPod: Pod) => {
+ doBasicDriverPodCheck(driverPod)
+ val path = driverPod.getSpec.getContainers.get(0).getEnv.asScala
+ .filter(_.getName == "SPARK_LOCAL_DIRS").map(_.getValue).head
+ checkDisk(driverPod, path, "tmpfs")
+ },
+ _ => (),
+ isJVM = true,
+ executorPatience = IGNORE)
+ }
+
+ test("A driver-only Spark job with a tmpfs-backed emptyDir data volume",
k8sTestTag) {
+ sparkAppConf
+ .set("spark.kubernetes.driver.master", "local[10]")
+ .set("spark.kubernetes.driver.volumes.emptyDir.data.mount.path", "/data")
+ .set("spark.kubernetes.driver.volumes.emptyDir.data.options.medium",
"Memory")
+ .set("spark.kubernetes.driver.volumes.emptyDir.data.options.sizeLimit",
"1G")
+ runSparkApplicationAndVerifyCompletion(
+ containerLocalSparkDistroExamplesJar,
+ SPARK_PI_MAIN_CLASS,
+ Seq("local[10]", "Pi is roughly 3"),
+ Seq(),
+ Array.empty[String],
+ driverPodChecker = (driverPod: Pod) => {
+ doBasicDriverPodCheck(driverPod)
+ checkDisk(driverPod, "/data", "tmpfs")
+ },
+ _ => (),
+ isJVM = true,
+ executorPatience = IGNORE)
+ }
+
+ test("A driver-only Spark job with a disk-backed emptyDir volume",
k8sTestTag) {
+ sparkAppConf
+ .set("spark.kubernetes.driver.master", "local[10]")
+ .set("spark.kubernetes.driver.volumes.emptyDir.data.mount.path", "/data")
+ .set("spark.kubernetes.driver.volumes.emptyDir.data.mount.sizeLimit",
"1G")
+ runSparkApplicationAndVerifyCompletion(
+ containerLocalSparkDistroExamplesJar,
+ SPARK_PI_MAIN_CLASS,
+ Seq("local[10]", "Pi is roughly 3"),
+ Seq(),
+ Array.empty[String],
+ driverPodChecker = (driverPod: Pod) => {
+ doBasicDriverPodCheck(driverPod)
+ checkDisk(driverPod, "/data", "/dev/")
+ },
+ _ => (),
+ isJVM = true,
+ executorPatience = IGNORE)
+ }
+
+ test("A driver-only Spark job with an OnDemand PVC volume", k8sTestTag) {
+ val storageClassName = if (testBackend == MinikubeTestBackend) "standard"
else "hostpath"
+ val DRIVER_PREFIX = "spark.kubernetes.driver.volumes.persistentVolumeClaim"
+ sparkAppConf
+ .set("spark.kubernetes.driver.master", "local[10]")
+ .set(s"$DRIVER_PREFIX.data.options.claimName", "OnDemand")
+ .set(s"$DRIVER_PREFIX.data.options.storageClass", storageClassName)
+ .set(s"$DRIVER_PREFIX.data.options.sizeLimit", "1Gi")
+ .set(s"$DRIVER_PREFIX.data.mount.path", "/data")
+ .set(s"$DRIVER_PREFIX.data.mount.readOnly", "false")
+ runSparkApplicationAndVerifyCompletion(
+ containerLocalSparkDistroExamplesJar,
+ SPARK_PI_MAIN_CLASS,
+ Seq("local[10]", "Pi is roughly 3"),
+ Seq(),
+ Array.empty[String],
+ driverPodChecker = (driverPod: Pod) => {
+ doBasicDriverPodCheck(driverPod)
+ checkDisk(driverPod, "/data", "/dev/")
+ },
+ _ => (),
+ isJVM = true,
+ executorPatience = IGNORE)
+ }
+
+ test("A Spark job with tmpfs-backed localDir volumes", k8sTestTag) {
+ sparkAppConf
+ .set("spark.kubernetes.local.dirs.tmpfs", "true")
+ runSparkApplicationAndVerifyCompletion(
+ containerLocalSparkDistroExamplesJar,
+ SPARK_PI_MAIN_CLASS,
+ Seq("Pi is roughly 3"),
+ Seq(),
+ Array.empty[String],
+ driverPodChecker = (driverPod: Pod) => {
+ doBasicDriverPodCheck(driverPod)
+ val path = driverPod.getSpec.getContainers.get(0).getEnv.asScala
+ .filter(_.getName == "SPARK_LOCAL_DIRS").map(_.getValue).head
+ checkDisk(driverPod, path, "tmpfs")
+ },
+ executorPodChecker = (executorPod: Pod) => {
+ doBasicExecutorPodCheck(executorPod)
+ val path = executorPod.getSpec.getContainers.get(0).getEnv.asScala
+ .filter(_.getName == "SPARK_LOCAL_DIRS").map(_.getValue).head
+ checkDisk(executorPod, path, "tmpfs")
+ },
+ isJVM = true)
+ }
+
+ test("A Spark job with two executors with OnDemand PVC volumes", k8sTestTag)
{
+ val storageClassName = if (testBackend == MinikubeTestBackend) "standard"
else "hostpath"
+ val EXECUTOR_PREFIX =
"spark.kubernetes.executor.volumes.persistentVolumeClaim"
+ sparkAppConf
+ .set("spark.executor.instances", "2")
+ .set(s"$EXECUTOR_PREFIX.data.options.claimName", "OnDemand")
+ .set(s"$EXECUTOR_PREFIX.data.options.storageClass", storageClassName)
+ .set(s"$EXECUTOR_PREFIX.data.options.sizeLimit", "1Gi")
+ .set(s"$EXECUTOR_PREFIX.data.mount.path", "/data")
+ .set(s"$EXECUTOR_PREFIX.data.mount.readOnly", "false")
+ runSparkApplicationAndVerifyCompletion(
+ containerLocalSparkDistroExamplesJar,
+ SPARK_PI_MAIN_CLASS,
+ Seq("Pi is roughly 3"),
+ Seq(),
+ Array.empty[String],
+ _ => (),
+ executorPodChecker = (executorPod: Pod) => {
+ doBasicExecutorPodCheck(executorPod)
+ checkDisk(executorPod, "/data", "/dev/")
+ },
+ isJVM = true)
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]