This is an automated email from the ASF dual-hosted git repository. shaneknapp pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 39577a2  [SPARK-24902][K8S] Add PV integration tests
39577a2 is described below

commit 39577a27a0b58fd75b41d24b10012447748b7ee9
Author: Stavros Kontopoulos <stavros.kontopou...@lightbend.com>
AuthorDate: Wed Mar 27 13:00:56 2019 -0700

    [SPARK-24902][K8S] Add PV integration tests

    ## What changes were proposed in this pull request?

    - Adds persistent volume integration tests
    - Adds a custom tag to the test to exclude it if it is run against a cloud backend.
    - Assumes default fs type for the host, AFAIK that is ext4.

    ## How was this patch tested?

    Manually run the tests against minikube as usual:
    ```
    [INFO] --- scalatest-maven-plugin:1.0:test (integration-test) spark-kubernetes-integration-tests_2.12 ---
    Discovery starting.
    Discovery completed in 192 milliseconds.
    Run starting. Expected test count is: 16
    KubernetesSuite:
    - Run SparkPi with no resources
    - Run SparkPi with a very long application name.
    - Use SparkLauncher.NO_RESOURCE
    - Run SparkPi with a master URL without a scheme.
    - Run SparkPi with an argument.
    - Run SparkPi with custom labels, annotations, and environment variables.
    - Run extraJVMOptions check on driver
    - Run SparkRemoteFileTest using a remote data file
    - Run SparkPi with env and mount secrets.
    - Run PySpark on simple pi.py example
    - Run PySpark with Python2 to test a pyfiles example
    - Run PySpark with Python3 to test a pyfiles example
    - Run PySpark with memory customization
    - Run in client mode.
    - Start pod creation from template
    - Test PVs with local storage
    ```

    Closes #23514 from skonto/pvctests.

Authored-by: Stavros Kontopoulos <stavros.kontopou...@lightbend.com> Signed-off-by: shane knapp <incompl...@gmail.com> --- .../apache/spark/examples/DFSReadWriteTest.scala | 12 +- .../k8s/integrationtest/KubernetesSuite.scala | 35 +++- .../integrationtest/KubernetesTestComponents.scala | 3 +- .../deploy/k8s/integrationtest/PVTestsSuite.scala | 189 +++++++++++++++++++++ .../k8s/integrationtest/SecretsTestsSuite.scala | 27 +-- .../spark/deploy/k8s/integrationtest/Utils.scala | 22 +++ 6 files changed, 260 insertions(+), 28 deletions(-) diff --git a/examples/src/main/scala/org/apache/spark/examples/DFSReadWriteTest.scala b/examples/src/main/scala/org/apache/spark/examples/DFSReadWriteTest.scala index 1a77971..a738598 100644 --- a/examples/src/main/scala/org/apache/spark/examples/DFSReadWriteTest.scala +++ b/examples/src/main/scala/org/apache/spark/examples/DFSReadWriteTest.scala @@ -22,6 +22,9 @@ import java.io.File import scala.io.Source._ +import org.apache.hadoop.fs.FileSystem +import org.apache.hadoop.fs.Path + import org.apache.spark.sql.SparkSession /** @@ -107,6 +110,13 @@ object DFSReadWriteTest { println("Writing local file to DFS") val dfsFilename = s"$dfsDirPath/dfs_read_write_test" + + // delete file if exists + val fs = FileSystem.get(spark.sessionState.newHadoopConf()) + if (fs.exists(new Path(dfsFilename))) { + fs.delete(new Path(dfsFilename), true) + } + val fileRDD = spark.sparkContext.parallelize(fileContents) fileRDD.saveAsTextFile(dfsFilename) @@ -123,7 +133,6 @@ object DFSReadWriteTest { .sum spark.stop() - if (localWordCount == dfsWordCount) { println(s"Success! Local Word Count $localWordCount and " + s"DFS Word Count $dfsWordCount agree.") @@ -131,7 +140,6 @@ object DFSReadWriteTest { println(s"Failure! 
Local Word Count $localWordCount " + s"and DFS Word Count $dfsWordCount disagree.") } - } } // scalastyle:on println diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala index 91419e8..bc0bb20 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala @@ -40,7 +40,7 @@ import org.apache.spark.internal.config._ class KubernetesSuite extends SparkFunSuite with BeforeAndAfterAll with BeforeAndAfter with BasicTestsSuite with SecretsTestsSuite - with PythonTestsSuite with ClientModeTestsSuite with PodTemplateSuite + with PythonTestsSuite with ClientModeTestsSuite with PodTemplateSuite with PVTestsSuite with Logging with Eventually with Matchers { import KubernetesSuite._ @@ -178,6 +178,29 @@ class KubernetesSuite extends SparkFunSuite isJVM) } + protected def runDFSReadWriteAndVerifyCompletion( + wordCount: Int, + appResource: String = containerLocalSparkDistroExamplesJar, + driverPodChecker: Pod => Unit = doBasicDriverPodCheck, + executorPodChecker: Pod => Unit = doBasicExecutorPodCheck, + appArgs: Array[String] = Array.empty[String], + appLocator: String = appLocator, + isJVM: Boolean = true, + interval: Option[PatienceConfiguration.Interval] = None): Unit = { + runSparkApplicationAndVerifyCompletion( + appResource, + SPARK_DFS_READ_WRITE_TEST, + Seq(s"Success! 
Local Word Count $wordCount and " + + s"DFS Word Count $wordCount agree."), + appArgs, + driverPodChecker, + executorPodChecker, + appLocator, + isJVM, + None, + interval) + } + protected def runSparkRemoteCheckAndVerifyCompletion( appResource: String = containerLocalSparkDistroExamplesJar, driverPodChecker: Pod => Unit = doBasicDriverPodCheck, @@ -241,7 +264,8 @@ class KubernetesSuite extends SparkFunSuite executorPodChecker: Pod => Unit, appLocator: String, isJVM: Boolean, - pyFiles: Option[String] = None): Unit = { + pyFiles: Option[String] = None, + interval: Option[PatienceConfiguration.Interval] = None): Unit = { val appArguments = SparkAppArguments( mainAppResource = appResource, mainClass = mainClass, @@ -281,10 +305,12 @@ class KubernetesSuite extends SparkFunSuite } } }) - Eventually.eventually(TIMEOUT, INTERVAL) { execPods.values.nonEmpty should be (true) } + + val patienceInterval = interval.getOrElse(INTERVAL) + Eventually.eventually(TIMEOUT, patienceInterval) { execPods.values.nonEmpty should be (true) } execWatcher.close() execPods.values.foreach(executorPodChecker(_)) - Eventually.eventually(TIMEOUT, INTERVAL) { + Eventually.eventually(TIMEOUT, patienceInterval) { expectedLogOnCompletion.foreach { e => assert(kubernetesTestComponents.kubernetesClient .pods() @@ -383,6 +409,7 @@ class KubernetesSuite extends SparkFunSuite private[spark] object KubernetesSuite { val k8sTestTag = Tag("k8s") val SPARK_PI_MAIN_CLASS: String = "org.apache.spark.examples.SparkPi" + val SPARK_DFS_READ_WRITE_TEST = "org.apache.spark.examples.DFSReadWriteTest" val SPARK_REMOTE_MAIN_CLASS: String = "org.apache.spark.examples.SparkRemoteFileTest" val SPARK_DRIVER_MAIN_CLASS: String = "org.apache.spark.examples.DriverSubmissionTest" val TIMEOUT = PatienceConfiguration.Timeout(Span(2, Minutes)) diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala 
b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala index e539c8e..4005945 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala @@ -21,6 +21,7 @@ import java.util.UUID import scala.collection.JavaConverters._ import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer import io.fabric8.kubernetes.client.DefaultKubernetesClient import org.scalatest.concurrent.Eventually @@ -124,7 +125,7 @@ private[spark] object SparkAppLauncher extends Logging { appConf.toStringArray :+ appArguments.mainAppResource if (appArguments.appArgs.nonEmpty) { - commandLine += appArguments.appArgs.mkString(" ") + commandLine ++= appArguments.appArgs.to[ArrayBuffer] } logInfo(s"Launching a spark app with command line: ${commandLine.mkString(" ")}") ProcessUtils.executeProcess(commandLine.toArray, timeoutSecs) diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/PVTestsSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/PVTestsSuite.scala new file mode 100644 index 0000000..d7a237f --- /dev/null +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/PVTestsSuite.scala @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s.integrationtest + +import java.io.{File, PrintWriter} + +import scala.collection.JavaConverters._ + +import io.fabric8.kubernetes.api.model._ +import io.fabric8.kubernetes.api.model.storage.StorageClassBuilder +import org.scalatest.Tag +import org.scalatest.concurrent.{Eventually, PatienceConfiguration} +import org.scalatest.time.{Milliseconds, Span} + +import org.apache.spark.deploy.k8s.integrationtest.KubernetesSuite._ + +private[spark] trait PVTestsSuite { k8sSuite: KubernetesSuite => + import PVTestsSuite._ + + private def setupLocalStorage(): Unit = { + val scBuilder = new StorageClassBuilder() + .withKind("StorageClass") + .withApiVersion("storage.k8s.io/v1") + .withNewMetadata() + .withName(STORAGE_NAME) + .endMetadata() + .withProvisioner("kubernetes.io/no-provisioner") + .withVolumeBindingMode("WaitForFirstConsumer") + + val pvBuilder = new PersistentVolumeBuilder() + .withKind("PersistentVolume") + .withApiVersion("v1") + .withNewMetadata() + .withName("test-local-pv") + .endMetadata() + .withNewSpec() + .withCapacity(Map("storage" -> new QuantityBuilder().withAmount("1Gi").build()).asJava) + .withAccessModes("ReadWriteOnce") + .withPersistentVolumeReclaimPolicy("Retain") + .withStorageClassName("test-local-storage") + .withLocal(new LocalVolumeSourceBuilder().withPath(VM_PATH).build()) + .withNewNodeAffinity() + .withNewRequired() + .withNodeSelectorTerms(new NodeSelectorTermBuilder() + .withMatchExpressions(new NodeSelectorRequirementBuilder() + .withKey("kubernetes.io/hostname") + 
.withOperator("In") + .withValues("minikube").build()).build()) + .endRequired() + .endNodeAffinity() + .endSpec() + + val pvcBuilder = new PersistentVolumeClaimBuilder() + .withKind("PersistentVolumeClaim") + .withApiVersion("v1") + .withNewMetadata() + .withName(PVC_NAME) + .endMetadata() + .withNewSpec() + .withAccessModes("ReadWriteOnce") + .withStorageClassName("test-local-storage") + .withResources(new ResourceRequirementsBuilder() + .withRequests(Map("storage" -> new QuantityBuilder() + .withAmount("1Gi").build()).asJava).build()) + .endSpec() + + kubernetesTestComponents + .kubernetesClient + .storage() + .storageClasses() + .create(scBuilder.build()) + + kubernetesTestComponents + .kubernetesClient + .persistentVolumes() + .create(pvBuilder.build()) + + kubernetesTestComponents + .kubernetesClient + .persistentVolumeClaims() + .create(pvcBuilder.build()) + } + + private def deleteLocalStorage(): Unit = { + kubernetesTestComponents + .kubernetesClient + .persistentVolumeClaims() + .withName(PVC_NAME) + .delete() + + kubernetesTestComponents + .kubernetesClient + .persistentVolumes() + .withName(PV_NAME) + .delete() + + kubernetesTestComponents + .kubernetesClient + .storage() + .storageClasses() + .withName(STORAGE_NAME) + .delete() + } + + private def checkPVs(pod: Pod, file: String) = { + Eventually.eventually(TIMEOUT, INTERVAL) { + implicit val podName: String = pod.getMetadata.getName + implicit val components: KubernetesTestComponents = kubernetesTestComponents + val contents = Utils.executeCommand("cat", s"$CONTAINER_MOUNT_PATH/$file") + assert(contents.toString.trim.equals(FILE_CONTENTS)) + } + } + + private def createTempFile(): String = { + val filename = try { + val f = File.createTempFile("tmp", ".txt", new File(HOST_PATH)) + f.deleteOnExit() + new PrintWriter(f) { + try { + write(FILE_CONTENTS) + } finally { + close() + } + } + f.getName + } catch { + case e: Exception => e.printStackTrace(); throw e; + } + filename + } + + test("Test PVs with 
local storage", k8sTestTag, MinikubeTag) { + sparkAppConf + .set(s"spark.kubernetes.driver.volumes.persistentVolumeClaim.data.mount.path", + CONTAINER_MOUNT_PATH) + .set(s"spark.kubernetes.driver.volumes.persistentVolumeClaim.data.options.claimName", + PVC_NAME) + .set(s"spark.kubernetes.executor.volumes.persistentVolumeClaim.data.mount.path", + CONTAINER_MOUNT_PATH) + .set(s"spark.kubernetes.executor.volumes.persistentVolumeClaim.data.options.claimName", + PVC_NAME) + val file = createTempFile() + try { + setupLocalStorage() + runDFSReadWriteAndVerifyCompletion( + FILE_CONTENTS.split(" ").length, + driverPodChecker = (driverPod: Pod) => { + doBasicDriverPodCheck(driverPod) + checkPVs(driverPod, file) + }, + executorPodChecker = (executorPod: Pod) => { + doBasicExecutorPodCheck(executorPod) + checkPVs(executorPod, file) + }, + appArgs = Array(s"$CONTAINER_MOUNT_PATH/$file", s"$CONTAINER_MOUNT_PATH"), + interval = Some(PV_TESTS_INTERVAL) + ) + } finally { + // make sure this always run + deleteLocalStorage() + } + } +} + +private[spark] object PVTestsSuite { + val MinikubeTag = Tag("minikube") + val STORAGE_NAME = "test-local-storage" + val PV_NAME = "test-local-pv" + val PVC_NAME = "test-local-pvc" + val CONTAINER_MOUNT_PATH = "/opt/spark/pv-tests" + val HOST_PATH = sys.env.getOrElse("PVC_TESTS_HOST_PATH", "/tmp") + val VM_PATH = sys.env.getOrElse("PVC_TESTS_VM_PATH", "/tmp") + val FILE_CONTENTS = "test PVs" + val PV_TESTS_INTERVAL = PatienceConfiguration.Interval(Span(10, Milliseconds)) +} diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/SecretsTestsSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/SecretsTestsSuite.scala index b18a6ae..cd61ea1 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/SecretsTestsSuite.scala +++ 
b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/SecretsTestsSuite.scala @@ -83,33 +83,18 @@ private[spark] trait SecretsTestsSuite { k8sSuite: KubernetesSuite => private def checkSecrets(pod: Pod): Unit = { Eventually.eventually(TIMEOUT, INTERVAL) { implicit val podName: String = pod.getMetadata.getName - val env = executeCommand("env") + implicit val components: KubernetesTestComponents = kubernetesTestComponents + val env = Utils.executeCommand("env") assert(env.toString.contains(ENV_SECRET_VALUE_1)) assert(env.toString.contains(ENV_SECRET_VALUE_2)) - val fileUsernameContents = executeCommand("cat", s"$SECRET_MOUNT_PATH/$ENV_SECRET_KEY_1") - val filePasswordContents = executeCommand("cat", s"$SECRET_MOUNT_PATH/$ENV_SECRET_KEY_2") + val fileUsernameContents = Utils + .executeCommand("cat", s"$SECRET_MOUNT_PATH/$ENV_SECRET_KEY_1") + val filePasswordContents = Utils + .executeCommand("cat", s"$SECRET_MOUNT_PATH/$ENV_SECRET_KEY_2") assert(fileUsernameContents.toString.trim.equals(ENV_SECRET_VALUE_1)) assert(filePasswordContents.toString.trim.equals(ENV_SECRET_VALUE_2)) } } - - private def executeCommand(cmd: String*)(implicit podName: String): String = { - val out = new ByteArrayOutputStream() - val watch = kubernetesTestComponents - .kubernetesClient - .pods() - .withName(podName) - .readingInput(System.in) - .writingOutput(out) - .writingError(System.err) - .withTTY() - .exec(cmd.toArray: _*) - // wait to get some result back - Thread.sleep(1000) - watch.close() - out.flush() - out.toString() - } } private[spark] object SecretsTestsSuite { diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/Utils.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/Utils.scala index 663f8b6..d425f70 100644 --- 
a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/Utils.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/Utils.scala @@ -19,6 +19,8 @@ package org.apache.spark.deploy.k8s.integrationtest import java.io.Closeable import java.net.URI +import org.apache.commons.io.output.ByteArrayOutputStream + import org.apache.spark.internal.Logging object Utils extends Logging { @@ -27,4 +29,24 @@ object Utils extends Logging { val resource = createResource try f.apply(resource) finally resource.close() } + + def executeCommand(cmd: String*)( + implicit podName: String, + kubernetesTestComponents: KubernetesTestComponents): String = { + val out = new ByteArrayOutputStream() + val watch = kubernetesTestComponents + .kubernetesClient + .pods() + .withName(podName) + .readingInput(System.in) + .writingOutput(out) + .writingError(System.err) + .withTTY() + .exec(cmd.toArray: _*) + // wait to get some result back + Thread.sleep(1000) + watch.close() + out.flush() + out.toString() + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org