This is an automated email from the ASF dual-hosted git repository.

shaneknapp pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 39577a2  [SPARK-24902][K8S] Add PV integration tests
39577a2 is described below

commit 39577a27a0b58fd75b41d24b10012447748b7ee9
Author: Stavros Kontopoulos <stavros.kontopou...@lightbend.com>
AuthorDate: Wed Mar 27 13:00:56 2019 -0700

    [SPARK-24902][K8S] Add PV integration tests
    
    ## What changes were proposed in this pull request?
    
    - Adds persistent volume integration tests
    - Adds a custom tag to the tests so they can be excluded when run against a cloud backend.
    - Assumes the default filesystem type for the host, which, as far as I know, is ext4.
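
    For reference, a minimal sketch of the tagging mechanism (the tag value mirrors
    the `MinikubeTag` added by this patch; the `TestTags` wrapper and the exclusion
    wiring are illustrative assumptions, not part of the change):
    ```scala
    import org.scalatest.Tag

    object TestTags {
      // Tests carrying this tag can be excluded at run time, e.g. via ScalaTest
      // Runner's `-l minikube` flag or, if wired into the build, the
      // scalatest-maven-plugin's `tagsToExclude` setting.
      val MinikubeTag = Tag("minikube")
    }

    // Usage inside a suite (hypothetical):
    //   test("Test PVs with local storage", TestTags.MinikubeTag) { ... }
    ```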
    
    ## How was this patch tested?
    Manually run the tests against minikube as usual:
    ```
    [INFO] --- scalatest-maven-plugin:1.0:test (integration-test) @ spark-kubernetes-integration-tests_2.12 ---
    Discovery starting.
    Discovery completed in 192 milliseconds.
    Run starting. Expected test count is: 16
    KubernetesSuite:
    - Run SparkPi with no resources
    - Run SparkPi with a very long application name.
    - Use SparkLauncher.NO_RESOURCE
    - Run SparkPi with a master URL without a scheme.
    - Run SparkPi with an argument.
    - Run SparkPi with custom labels, annotations, and environment variables.
    - Run extraJVMOptions check on driver
    - Run SparkRemoteFileTest using a remote data file
    - Run SparkPi with env and mount secrets.
    - Run PySpark on simple pi.py example
    - Run PySpark with Python2 to test a pyfiles example
    - Run PySpark with Python3 to test a pyfiles example
    - Run PySpark with memory customization
    - Run in client mode.
    - Start pod creation from template
    - Test PVs with local storage
    ```
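
    For context, the usual entry point for these runs is the dev script shipped
    with the integration tests (the options shown are an assumption based on its
    common usage, not part of this patch):
    ```
    ./resource-managers/kubernetes/integration-tests/dev/dev-run-integration-tests.sh \
      --deploy-mode minikube
    ```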
    
    Closes #23514 from skonto/pvctests.
    
    Authored-by: Stavros Kontopoulos <stavros.kontopou...@lightbend.com>
    Signed-off-by: shane knapp <incompl...@gmail.com>
---
 .../apache/spark/examples/DFSReadWriteTest.scala   |  12 +-
 .../k8s/integrationtest/KubernetesSuite.scala      |  35 +++-
 .../integrationtest/KubernetesTestComponents.scala |   3 +-
 .../deploy/k8s/integrationtest/PVTestsSuite.scala  | 189 +++++++++++++++++++++
 .../k8s/integrationtest/SecretsTestsSuite.scala    |  27 +--
 .../spark/deploy/k8s/integrationtest/Utils.scala   |  22 +++
 6 files changed, 260 insertions(+), 28 deletions(-)

diff --git a/examples/src/main/scala/org/apache/spark/examples/DFSReadWriteTest.scala b/examples/src/main/scala/org/apache/spark/examples/DFSReadWriteTest.scala
index 1a77971..a738598 100644
--- a/examples/src/main/scala/org/apache/spark/examples/DFSReadWriteTest.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/DFSReadWriteTest.scala
@@ -22,6 +22,9 @@ import java.io.File
 
 import scala.io.Source._
 
+import org.apache.hadoop.fs.FileSystem
+import org.apache.hadoop.fs.Path
+
 import org.apache.spark.sql.SparkSession
 
 /**
@@ -107,6 +110,13 @@ object DFSReadWriteTest {
 
     println("Writing local file to DFS")
     val dfsFilename = s"$dfsDirPath/dfs_read_write_test"
+
+    // delete the output file if it already exists from a previous run
+    val fs = FileSystem.get(spark.sessionState.newHadoopConf())
+    if (fs.exists(new Path(dfsFilename))) {
+      fs.delete(new Path(dfsFilename), true)
+    }
+
     val fileRDD = spark.sparkContext.parallelize(fileContents)
     fileRDD.saveAsTextFile(dfsFilename)
 
@@ -123,7 +133,6 @@ object DFSReadWriteTest {
       .sum
 
     spark.stop()
-
     if (localWordCount == dfsWordCount) {
       println(s"Success! Local Word Count $localWordCount and " +
         s"DFS Word Count $dfsWordCount agree.")
@@ -131,7 +140,6 @@ object DFSReadWriteTest {
       println(s"Failure! Local Word Count $localWordCount " +
         s"and DFS Word Count $dfsWordCount disagree.")
     }
-
   }
 }
 // scalastyle:on println
diff --git 
a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
 
b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
index 91419e8..bc0bb20 100644
--- 
a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
+++ 
b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
@@ -40,7 +40,7 @@ import org.apache.spark.internal.config._
 
 class KubernetesSuite extends SparkFunSuite
   with BeforeAndAfterAll with BeforeAndAfter with BasicTestsSuite with SecretsTestsSuite
-  with PythonTestsSuite with ClientModeTestsSuite with PodTemplateSuite
+  with PythonTestsSuite with ClientModeTestsSuite with PodTemplateSuite with PVTestsSuite
   with Logging with Eventually with Matchers {
 
   import KubernetesSuite._
@@ -178,6 +178,29 @@ class KubernetesSuite extends SparkFunSuite
       isJVM)
   }
 
+  protected def runDFSReadWriteAndVerifyCompletion(
+      wordCount: Int,
+      appResource: String = containerLocalSparkDistroExamplesJar,
+      driverPodChecker: Pod => Unit = doBasicDriverPodCheck,
+      executorPodChecker: Pod => Unit = doBasicExecutorPodCheck,
+      appArgs: Array[String] = Array.empty[String],
+      appLocator: String = appLocator,
+      isJVM: Boolean = true,
+      interval: Option[PatienceConfiguration.Interval] = None): Unit = {
+    runSparkApplicationAndVerifyCompletion(
+      appResource,
+      SPARK_DFS_READ_WRITE_TEST,
+      Seq(s"Success! Local Word Count $wordCount and " +
+    s"DFS Word Count $wordCount agree."),
+      appArgs,
+      driverPodChecker,
+      executorPodChecker,
+      appLocator,
+      isJVM,
+      None,
+      interval)
+  }
+
   protected def runSparkRemoteCheckAndVerifyCompletion(
       appResource: String = containerLocalSparkDistroExamplesJar,
       driverPodChecker: Pod => Unit = doBasicDriverPodCheck,
@@ -241,7 +264,8 @@ class KubernetesSuite extends SparkFunSuite
       executorPodChecker: Pod => Unit,
       appLocator: String,
       isJVM: Boolean,
-      pyFiles: Option[String] = None): Unit = {
+      pyFiles: Option[String] = None,
+      interval: Option[PatienceConfiguration.Interval] = None): Unit = {
     val appArguments = SparkAppArguments(
       mainAppResource = appResource,
       mainClass = mainClass,
@@ -281,10 +305,12 @@ class KubernetesSuite extends SparkFunSuite
           }
         }
       })
-    Eventually.eventually(TIMEOUT, INTERVAL) { execPods.values.nonEmpty should be (true) }
+
+    val patienceInterval = interval.getOrElse(INTERVAL)
+    Eventually.eventually(TIMEOUT, patienceInterval) { execPods.values.nonEmpty should be (true) }
     execWatcher.close()
     execPods.values.foreach(executorPodChecker(_))
-    Eventually.eventually(TIMEOUT, INTERVAL) {
+    Eventually.eventually(TIMEOUT, patienceInterval) {
       expectedLogOnCompletion.foreach { e =>
         assert(kubernetesTestComponents.kubernetesClient
           .pods()
@@ -383,6 +409,7 @@ class KubernetesSuite extends SparkFunSuite
 private[spark] object KubernetesSuite {
   val k8sTestTag = Tag("k8s")
   val SPARK_PI_MAIN_CLASS: String = "org.apache.spark.examples.SparkPi"
+  val SPARK_DFS_READ_WRITE_TEST = "org.apache.spark.examples.DFSReadWriteTest"
   val SPARK_REMOTE_MAIN_CLASS: String = "org.apache.spark.examples.SparkRemoteFileTest"
   val SPARK_DRIVER_MAIN_CLASS: String = "org.apache.spark.examples.DriverSubmissionTest"
   val TIMEOUT = PatienceConfiguration.Timeout(Span(2, Minutes))
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala
index e539c8e..4005945 100644
--- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala
@@ -21,6 +21,7 @@ import java.util.UUID
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
 
 import io.fabric8.kubernetes.client.DefaultKubernetesClient
 import org.scalatest.concurrent.Eventually
@@ -124,7 +125,7 @@ private[spark] object SparkAppLauncher extends Logging {
         appConf.toStringArray :+ appArguments.mainAppResource
 
     if (appArguments.appArgs.nonEmpty) {
-      commandLine += appArguments.appArgs.mkString(" ")
+      commandLine ++= appArguments.appArgs.to[ArrayBuffer]
     }
     logInfo(s"Launching a spark app with command line: 
${commandLine.mkString(" ")}")
     ProcessUtils.executeProcess(commandLine.toArray, timeoutSecs)
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/PVTestsSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/PVTestsSuite.scala
new file mode 100644
index 0000000..d7a237f
--- /dev/null
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/PVTestsSuite.scala
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.integrationtest
+
+import java.io.{File, PrintWriter}
+
+import scala.collection.JavaConverters._
+
+import io.fabric8.kubernetes.api.model._
+import io.fabric8.kubernetes.api.model.storage.StorageClassBuilder
+import org.scalatest.Tag
+import org.scalatest.concurrent.{Eventually, PatienceConfiguration}
+import org.scalatest.time.{Milliseconds, Span}
+
+import org.apache.spark.deploy.k8s.integrationtest.KubernetesSuite._
+
+private[spark] trait PVTestsSuite { k8sSuite: KubernetesSuite =>
+  import PVTestsSuite._
+
+  private def setupLocalStorage(): Unit = {
+    val scBuilder = new StorageClassBuilder()
+      .withKind("StorageClass")
+      .withApiVersion("storage.k8s.io/v1")
+      .withNewMetadata()
+        .withName(STORAGE_NAME)
+      .endMetadata()
+      .withProvisioner("kubernetes.io/no-provisioner")
+      .withVolumeBindingMode("WaitForFirstConsumer")
+
+    val pvBuilder = new PersistentVolumeBuilder()
+      .withKind("PersistentVolume")
+      .withApiVersion("v1")
+      .withNewMetadata()
+        .withName("test-local-pv")
+      .endMetadata()
+      .withNewSpec()
+        .withCapacity(Map("storage" -> new QuantityBuilder().withAmount("1Gi").build()).asJava)
+        .withAccessModes("ReadWriteOnce")
+        .withPersistentVolumeReclaimPolicy("Retain")
+        .withStorageClassName("test-local-storage")
+        .withLocal(new LocalVolumeSourceBuilder().withPath(VM_PATH).build())
+          .withNewNodeAffinity()
+            .withNewRequired()
+              .withNodeSelectorTerms(new NodeSelectorTermBuilder()
+                .withMatchExpressions(new NodeSelectorRequirementBuilder()
+                  .withKey("kubernetes.io/hostname")
+                  .withOperator("In")
+                  .withValues("minikube").build()).build())
+            .endRequired()
+          .endNodeAffinity()
+      .endSpec()
+
+    val pvcBuilder = new PersistentVolumeClaimBuilder()
+      .withKind("PersistentVolumeClaim")
+      .withApiVersion("v1")
+      .withNewMetadata()
+        .withName(PVC_NAME)
+      .endMetadata()
+      .withNewSpec()
+        .withAccessModes("ReadWriteOnce")
+        .withStorageClassName("test-local-storage")
+        .withResources(new ResourceRequirementsBuilder()
+        .withRequests(Map("storage" -> new QuantityBuilder()
+          .withAmount("1Gi").build()).asJava).build())
+      .endSpec()
+
+    kubernetesTestComponents
+      .kubernetesClient
+      .storage()
+      .storageClasses()
+      .create(scBuilder.build())
+
+    kubernetesTestComponents
+      .kubernetesClient
+      .persistentVolumes()
+      .create(pvBuilder.build())
+
+    kubernetesTestComponents
+      .kubernetesClient
+      .persistentVolumeClaims()
+      .create(pvcBuilder.build())
+  }
+
+  private def deleteLocalStorage(): Unit = {
+    kubernetesTestComponents
+      .kubernetesClient
+      .persistentVolumeClaims()
+      .withName(PVC_NAME)
+      .delete()
+
+    kubernetesTestComponents
+      .kubernetesClient
+      .persistentVolumes()
+      .withName(PV_NAME)
+      .delete()
+
+    kubernetesTestComponents
+      .kubernetesClient
+      .storage()
+      .storageClasses()
+      .withName(STORAGE_NAME)
+      .delete()
+  }
+
+  private def checkPVs(pod: Pod, file: String) = {
+    Eventually.eventually(TIMEOUT, INTERVAL) {
+      implicit val podName: String = pod.getMetadata.getName
+      implicit val components: KubernetesTestComponents = kubernetesTestComponents
+      val contents = Utils.executeCommand("cat", s"$CONTAINER_MOUNT_PATH/$file")
+      assert(contents.toString.trim.equals(FILE_CONTENTS))
+    }
+  }
+
+  private def createTempFile(): String = {
+    val filename = try {
+      val f = File.createTempFile("tmp", ".txt", new File(HOST_PATH))
+      f.deleteOnExit()
+      new PrintWriter(f) {
+        try {
+          write(FILE_CONTENTS)
+        } finally {
+          close()
+        }
+      }
+      f.getName
+    } catch {
+      case e: Exception => e.printStackTrace(); throw e
+    }
+    filename
+  }
+
+  test("Test PVs with local storage", k8sTestTag, MinikubeTag) {
+    sparkAppConf
+      .set(s"spark.kubernetes.driver.volumes.persistentVolumeClaim.data.mount.path",
+        CONTAINER_MOUNT_PATH)
+      .set(s"spark.kubernetes.driver.volumes.persistentVolumeClaim.data.options.claimName",
+        PVC_NAME)
+      .set(s"spark.kubernetes.executor.volumes.persistentVolumeClaim.data.mount.path",
+        CONTAINER_MOUNT_PATH)
+      .set(s"spark.kubernetes.executor.volumes.persistentVolumeClaim.data.options.claimName",
+        PVC_NAME)
+    val file = createTempFile()
+    try {
+      setupLocalStorage()
+      runDFSReadWriteAndVerifyCompletion(
+        FILE_CONTENTS.split(" ").length,
+        driverPodChecker = (driverPod: Pod) => {
+          doBasicDriverPodCheck(driverPod)
+          checkPVs(driverPod, file)
+        },
+        executorPodChecker = (executorPod: Pod) => {
+          doBasicExecutorPodCheck(executorPod)
+          checkPVs(executorPod, file)
+        },
+        appArgs = Array(s"$CONTAINER_MOUNT_PATH/$file", s"$CONTAINER_MOUNT_PATH"),
+        interval = Some(PV_TESTS_INTERVAL)
+      )
+    } finally {
+      // make sure the storage cleanup always runs
+      deleteLocalStorage()
+    }
+  }
+}
+
+private[spark] object PVTestsSuite {
+  val MinikubeTag = Tag("minikube")
+  val STORAGE_NAME = "test-local-storage"
+  val PV_NAME = "test-local-pv"
+  val PVC_NAME = "test-local-pvc"
+  val CONTAINER_MOUNT_PATH = "/opt/spark/pv-tests"
+  val HOST_PATH = sys.env.getOrElse("PVC_TESTS_HOST_PATH", "/tmp")
+  val VM_PATH = sys.env.getOrElse("PVC_TESTS_VM_PATH", "/tmp")
+  val FILE_CONTENTS = "test PVs"
+  val PV_TESTS_INTERVAL = PatienceConfiguration.Interval(Span(10, Milliseconds))
+}
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/SecretsTestsSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/SecretsTestsSuite.scala
index b18a6ae..cd61ea1 100644
--- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/SecretsTestsSuite.scala
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/SecretsTestsSuite.scala
@@ -83,33 +83,18 @@ private[spark] trait SecretsTestsSuite { k8sSuite: KubernetesSuite =>
   private def checkSecrets(pod: Pod): Unit = {
     Eventually.eventually(TIMEOUT, INTERVAL) {
       implicit val podName: String = pod.getMetadata.getName
-      val env = executeCommand("env")
+      implicit val components: KubernetesTestComponents = kubernetesTestComponents
+      val env = Utils.executeCommand("env")
       assert(env.toString.contains(ENV_SECRET_VALUE_1))
       assert(env.toString.contains(ENV_SECRET_VALUE_2))
-      val fileUsernameContents = executeCommand("cat", s"$SECRET_MOUNT_PATH/$ENV_SECRET_KEY_1")
-      val filePasswordContents = executeCommand("cat", s"$SECRET_MOUNT_PATH/$ENV_SECRET_KEY_2")
+      val fileUsernameContents = Utils
+        .executeCommand("cat", s"$SECRET_MOUNT_PATH/$ENV_SECRET_KEY_1")
+      val filePasswordContents = Utils
+        .executeCommand("cat", s"$SECRET_MOUNT_PATH/$ENV_SECRET_KEY_2")
       assert(fileUsernameContents.toString.trim.equals(ENV_SECRET_VALUE_1))
       assert(filePasswordContents.toString.trim.equals(ENV_SECRET_VALUE_2))
     }
   }
-
-  private def executeCommand(cmd: String*)(implicit podName: String): String = {
-    val out = new ByteArrayOutputStream()
-    val watch = kubernetesTestComponents
-      .kubernetesClient
-      .pods()
-      .withName(podName)
-      .readingInput(System.in)
-      .writingOutput(out)
-      .writingError(System.err)
-      .withTTY()
-      .exec(cmd.toArray: _*)
-    // wait to get some result back
-    Thread.sleep(1000)
-    watch.close()
-    out.flush()
-    out.toString()
-  }
 }
 
 private[spark] object SecretsTestsSuite {
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/Utils.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/Utils.scala
index 663f8b6..d425f70 100644
--- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/Utils.scala
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/Utils.scala
@@ -19,6 +19,8 @@ package org.apache.spark.deploy.k8s.integrationtest
 import java.io.Closeable
 import java.net.URI
 
+import org.apache.commons.io.output.ByteArrayOutputStream
+
 import org.apache.spark.internal.Logging
 
 object Utils extends Logging {
@@ -27,4 +29,24 @@ object Utils extends Logging {
     val resource = createResource
     try f.apply(resource) finally resource.close()
   }
+
+  def executeCommand(cmd: String*)(
+      implicit podName: String,
+      kubernetesTestComponents: KubernetesTestComponents): String = {
+    val out = new ByteArrayOutputStream()
+    val watch = kubernetesTestComponents
+      .kubernetesClient
+      .pods()
+      .withName(podName)
+      .readingInput(System.in)
+      .writingOutput(out)
+      .writingError(System.err)
+      .withTTY()
+      .exec(cmd.toArray: _*)
+    // wait to get some result back
+    Thread.sleep(1000)
+    watch.close()
+    out.flush()
+    out.toString()
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
