Repository: spark
Updated Branches:
  refs/heads/master 7b6d36bc9 -> 20ce1a8f8


[SPARK-24551][K8S] Add integration tests for secrets

## What changes were proposed in this pull request?

- Adds integration tests for env and mount secrets.

## How was this patch tested?

Manually by checking that secrets were added to the containers and by tuning 
the tests.

![image](https://user-images.githubusercontent.com/7945591/42968472-fee3740a-8bab-11e8-9eac-573f67d861fc.png)

Author: Stavros Kontopoulos <[email protected]>

Closes #21652 from skonto/add-secret-its.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/20ce1a8f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/20ce1a8f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/20ce1a8f

Branch: refs/heads/master
Commit: 20ce1a8f8b2d8d8b41afdd4a8b498b6502aa5e24
Parents: 7b6d36b
Author: Stavros Kontopoulos <[email protected]>
Authored: Fri Jul 20 07:55:58 2018 -0500
Committer: Sean Owen <[email protected]>
Committed: Fri Jul 20 07:55:58 2018 -0500

----------------------------------------------------------------------
 bin/docker-image-tool.sh                        |   2 +-
 .../k8s/integrationtest/BasicTestsSuite.scala   | 106 +++++++++++
 .../k8s/integrationtest/KubernetesSuite.scala   | 177 +++----------------
 .../k8s/integrationtest/PythonTestsSuite.scala  |  83 +++++++++
 .../k8s/integrationtest/SecretsTestsSuite.scala | 122 +++++++++++++
 .../deploy/k8s/integrationtest/TestConfig.scala |  38 ++++
 .../k8s/integrationtest/TestConstants.scala     |  22 +++
 .../deploy/k8s/integrationtest/config.scala     |  38 ----
 .../deploy/k8s/integrationtest/constants.scala  |  22 ---
 9 files changed, 393 insertions(+), 217 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/20ce1a8f/bin/docker-image-tool.sh
----------------------------------------------------------------------
diff --git a/bin/docker-image-tool.sh b/bin/docker-image-tool.sh
index f36fb43..cd22e75 100755
--- a/bin/docker-image-tool.sh
+++ b/bin/docker-image-tool.sh
@@ -135,7 +135,7 @@ BASEDOCKERFILE=
 PYDOCKERFILE=
 NOCACHEARG=
 BUILD_PARAMS=
-while getopts f:mr:t:n:b: option
+while getopts f:p:mr:t:n:b: option
 do
  case "${option}"
  in

http://git-wip-us.apache.org/repos/asf/spark/blob/20ce1a8f/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/BasicTestsSuite.scala
----------------------------------------------------------------------
diff --git 
a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/BasicTestsSuite.scala
 
b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/BasicTestsSuite.scala
new file mode 100644
index 0000000..4e749c4
--- /dev/null
+++ 
b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/BasicTestsSuite.scala
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.integrationtest
+
+import io.fabric8.kubernetes.api.model.Pod
+
+import org.apache.spark.launcher.SparkLauncher
+
+private[spark] trait BasicTestsSuite { k8sSuite: KubernetesSuite =>
+
+  import BasicTestsSuite._
+  import KubernetesSuite.k8sTestTag
+
+  test("Run SparkPi with no resources", k8sTestTag) {
+    runSparkPiAndVerifyCompletion()
+  }
+
+  test("Run SparkPi with a very long application name.", k8sTestTag) {
+    sparkAppConf.set("spark.app.name", "long" * 40)
+    runSparkPiAndVerifyCompletion()
+  }
+
+  test("Use SparkLauncher.NO_RESOURCE", k8sTestTag) {
+    sparkAppConf.setJars(Seq(containerLocalSparkDistroExamplesJar))
+    runSparkPiAndVerifyCompletion(
+      appResource = SparkLauncher.NO_RESOURCE)
+  }
+
+  test("Run SparkPi with a master URL without a scheme.", k8sTestTag) {
+    val url = kubernetesTestComponents.kubernetesClient.getMasterUrl
+    val k8sMasterUrl = if (url.getPort < 0) {
+      s"k8s://${url.getHost}"
+    } else {
+      s"k8s://${url.getHost}:${url.getPort}"
+    }
+    sparkAppConf.set("spark.master", k8sMasterUrl)
+    runSparkPiAndVerifyCompletion()
+  }
+
+  test("Run SparkPi with an argument.", k8sTestTag) {
+    runSparkPiAndVerifyCompletion(appArgs = Array("5"))
+  }
+
+  test("Run SparkPi with custom labels, annotations, and environment 
variables.", k8sTestTag) {
+    sparkAppConf
+      .set("spark.kubernetes.driver.label.label1", "label1-value")
+      .set("spark.kubernetes.driver.label.label2", "label2-value")
+      .set("spark.kubernetes.driver.annotation.annotation1", 
"annotation1-value")
+      .set("spark.kubernetes.driver.annotation.annotation2", 
"annotation2-value")
+      .set("spark.kubernetes.driverEnv.ENV1", "VALUE1")
+      .set("spark.kubernetes.driverEnv.ENV2", "VALUE2")
+      .set("spark.kubernetes.executor.label.label1", "label1-value")
+      .set("spark.kubernetes.executor.label.label2", "label2-value")
+      .set("spark.kubernetes.executor.annotation.annotation1", 
"annotation1-value")
+      .set("spark.kubernetes.executor.annotation.annotation2", 
"annotation2-value")
+      .set("spark.executorEnv.ENV1", "VALUE1")
+      .set("spark.executorEnv.ENV2", "VALUE2")
+
+    runSparkPiAndVerifyCompletion(
+      driverPodChecker = (driverPod: Pod) => {
+        doBasicDriverPodCheck(driverPod)
+        checkCustomSettings(driverPod)
+      },
+      executorPodChecker = (executorPod: Pod) => {
+        doBasicExecutorPodCheck(executorPod)
+        checkCustomSettings(executorPod)
+      })
+  }
+
+  test("Run extraJVMOptions check on driver", k8sTestTag) {
+    sparkAppConf
+      .set("spark.driver.extraJavaOptions", "-Dspark.test.foo=spark.test.bar")
+    runSparkJVMCheckAndVerifyCompletion(
+      expectedJVMValue = Seq("(spark.test.foo,spark.test.bar)"))
+  }
+
+  test("Run SparkRemoteFileTest using a remote data file", k8sTestTag) {
+    sparkAppConf
+      .set("spark.files", REMOTE_PAGE_RANK_DATA_FILE)
+    runSparkRemoteCheckAndVerifyCompletion(appArgs = 
Array(REMOTE_PAGE_RANK_FILE_NAME))
+  }
+}
+
+private[spark] object BasicTestsSuite {
+  val SPARK_PAGE_RANK_MAIN_CLASS: String = 
"org.apache.spark.examples.SparkPageRank"
+  val CONTAINER_LOCAL_FILE_DOWNLOAD_PATH = "/var/spark-data/spark-files"
+  val CONTAINER_LOCAL_DOWNLOADED_PAGE_RANK_DATA_FILE =
+     s"$CONTAINER_LOCAL_FILE_DOWNLOAD_PATH/pagerank_data.txt"
+  val REMOTE_PAGE_RANK_DATA_FILE =
+    
"https://storage.googleapis.com/spark-k8s-integration-tests/files/pagerank_data.txt";
+  val REMOTE_PAGE_RANK_FILE_NAME = "pagerank_data.txt"
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/20ce1a8f/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
----------------------------------------------------------------------
diff --git 
a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
 
b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
index daabfaa..95694aa 100644
--- 
a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
+++ 
b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
@@ -29,25 +29,25 @@ import org.scalatest.time.{Minutes, Seconds, Span}
 import scala.collection.JavaConverters._
 
 import org.apache.spark.SparkFunSuite
+import org.apache.spark.deploy.k8s.integrationtest.TestConfig._
 import 
org.apache.spark.deploy.k8s.integrationtest.backend.{IntegrationTestBackend, 
IntegrationTestBackendFactory}
-import org.apache.spark.deploy.k8s.integrationtest.config._
-import org.apache.spark.launcher.SparkLauncher
 
 private[spark] class KubernetesSuite extends SparkFunSuite
-  with BeforeAndAfterAll with BeforeAndAfter {
+  with BeforeAndAfterAll with BeforeAndAfter with BasicTestsSuite with 
SecretsTestsSuite
+  with PythonTestsSuite {
 
   import KubernetesSuite._
 
   private var testBackend: IntegrationTestBackend = _
   private var sparkHomeDir: Path = _
-  private var kubernetesTestComponents: KubernetesTestComponents = _
-  private var sparkAppConf: SparkAppConf = _
   private var image: String = _
   private var pyImage: String = _
-  private var containerLocalSparkDistroExamplesJar: String = _
-  private var appLocator: String = _
   private var driverPodName: String = _
-  private val k8sTestTag = Tag("k8s")
+
+  protected var kubernetesTestComponents: KubernetesTestComponents = _
+  protected var sparkAppConf: SparkAppConf = _
+  protected var containerLocalSparkDistroExamplesJar: String = _
+  protected var appLocator: String = _
 
   override def beforeAll(): Unit = {
     // The scalatest-maven-plugin gives system properties that are referenced 
but not set null
@@ -103,127 +103,7 @@ private[spark] class KubernetesSuite extends SparkFunSuite
     deleteDriverPod()
   }
 
-  test("Run SparkPi with no resources", k8sTestTag) {
-    runSparkPiAndVerifyCompletion()
-  }
-
-  test("Run SparkPi with a very long application name.", k8sTestTag) {
-    sparkAppConf.set("spark.app.name", "long" * 40)
-    runSparkPiAndVerifyCompletion()
-  }
-
-  test("Use SparkLauncher.NO_RESOURCE", k8sTestTag) {
-    sparkAppConf.setJars(Seq(containerLocalSparkDistroExamplesJar))
-    runSparkPiAndVerifyCompletion(
-      appResource = SparkLauncher.NO_RESOURCE)
-  }
-
-  test("Run SparkPi with a master URL without a scheme.", k8sTestTag) {
-    val url = kubernetesTestComponents.kubernetesClient.getMasterUrl
-    val k8sMasterUrl = if (url.getPort < 0) {
-      s"k8s://${url.getHost}"
-    } else {
-      s"k8s://${url.getHost}:${url.getPort}"
-    }
-    sparkAppConf.set("spark.master", k8sMasterUrl)
-    runSparkPiAndVerifyCompletion()
-  }
-
-  test("Run SparkPi with an argument.", k8sTestTag) {
-    runSparkPiAndVerifyCompletion(appArgs = Array("5"))
-  }
-
-  test("Run SparkPi with custom labels, annotations, and environment 
variables.", k8sTestTag) {
-    sparkAppConf
-      .set("spark.kubernetes.driver.label.label1", "label1-value")
-      .set("spark.kubernetes.driver.label.label2", "label2-value")
-      .set("spark.kubernetes.driver.annotation.annotation1", 
"annotation1-value")
-      .set("spark.kubernetes.driver.annotation.annotation2", 
"annotation2-value")
-      .set("spark.kubernetes.driverEnv.ENV1", "VALUE1")
-      .set("spark.kubernetes.driverEnv.ENV2", "VALUE2")
-      .set("spark.kubernetes.executor.label.label1", "label1-value")
-      .set("spark.kubernetes.executor.label.label2", "label2-value")
-      .set("spark.kubernetes.executor.annotation.annotation1", 
"annotation1-value")
-      .set("spark.kubernetes.executor.annotation.annotation2", 
"annotation2-value")
-      .set("spark.executorEnv.ENV1", "VALUE1")
-      .set("spark.executorEnv.ENV2", "VALUE2")
-
-    runSparkPiAndVerifyCompletion(
-      driverPodChecker = (driverPod: Pod) => {
-        doBasicDriverPodCheck(driverPod)
-        checkCustomSettings(driverPod)
-      },
-      executorPodChecker = (executorPod: Pod) => {
-        doBasicExecutorPodCheck(executorPod)
-        checkCustomSettings(executorPod)
-      })
-  }
-
-  test("Run extraJVMOptions check on driver", k8sTestTag) {
-    sparkAppConf
-      .set("spark.driver.extraJavaOptions", "-Dspark.test.foo=spark.test.bar")
-    runSparkJVMCheckAndVerifyCompletion(
-      expectedJVMValue = Seq("(spark.test.foo,spark.test.bar)"))
-  }
-
-  test("Run SparkRemoteFileTest using a remote data file", k8sTestTag) {
-    sparkAppConf
-      .set("spark.files", REMOTE_PAGE_RANK_DATA_FILE)
-    runSparkRemoteCheckAndVerifyCompletion(
-      appArgs = Array(REMOTE_PAGE_RANK_FILE_NAME))
-  }
-
-  test("Run PySpark on simple pi.py example", k8sTestTag) {
-    sparkAppConf
-      .set("spark.kubernetes.container.image", 
s"${getTestImageRepo}/spark-py:${getTestImageTag}")
-    runSparkApplicationAndVerifyCompletion(
-      appResource = PYSPARK_PI,
-      mainClass = "",
-      expectedLogOnCompletion = Seq("Pi is roughly 3"),
-      appArgs = Array("5"),
-      driverPodChecker = doBasicDriverPyPodCheck,
-      executorPodChecker = doBasicExecutorPyPodCheck,
-      appLocator = appLocator,
-      isJVM = false)
-  }
-
-  test("Run PySpark with Python2 to test a pyfiles example", k8sTestTag) {
-    sparkAppConf
-      .set("spark.kubernetes.container.image", 
s"${getTestImageRepo}/spark-py:${getTestImageTag}")
-      .set("spark.kubernetes.pyspark.pythonversion", "2")
-    runSparkApplicationAndVerifyCompletion(
-      appResource = PYSPARK_FILES,
-      mainClass = "",
-      expectedLogOnCompletion = Seq(
-        "Python runtime version check is: True",
-        "Python environment version check is: True"),
-      appArgs = Array("python"),
-      driverPodChecker = doBasicDriverPyPodCheck,
-      executorPodChecker = doBasicExecutorPyPodCheck,
-      appLocator = appLocator,
-      isJVM = false,
-      pyFiles = Some(PYSPARK_CONTAINER_TESTS))
-  }
-
-  test("Run PySpark with Python3 to test a pyfiles example", k8sTestTag) {
-    sparkAppConf
-      .set("spark.kubernetes.container.image", 
s"${getTestImageRepo}/spark-py:${getTestImageTag}")
-      .set("spark.kubernetes.pyspark.pythonversion", "3")
-    runSparkApplicationAndVerifyCompletion(
-      appResource = PYSPARK_FILES,
-      mainClass = "",
-      expectedLogOnCompletion = Seq(
-        "Python runtime version check is: True",
-        "Python environment version check is: True"),
-      appArgs = Array("python3"),
-      driverPodChecker = doBasicDriverPyPodCheck,
-      executorPodChecker = doBasicExecutorPyPodCheck,
-      appLocator = appLocator,
-      isJVM = false,
-      pyFiles = Some(PYSPARK_CONTAINER_TESTS))
-  }
-
-  private def runSparkPiAndVerifyCompletion(
+  protected def runSparkPiAndVerifyCompletion(
       appResource: String = containerLocalSparkDistroExamplesJar,
       driverPodChecker: Pod => Unit = doBasicDriverPodCheck,
       executorPodChecker: Pod => Unit = doBasicExecutorPodCheck,
@@ -241,7 +121,7 @@ private[spark] class KubernetesSuite extends SparkFunSuite
       isJVM)
   }
 
-  private def runSparkRemoteCheckAndVerifyCompletion(
+  protected def runSparkRemoteCheckAndVerifyCompletion(
       appResource: String = containerLocalSparkDistroExamplesJar,
       driverPodChecker: Pod => Unit = doBasicDriverPodCheck,
       executorPodChecker: Pod => Unit = doBasicExecutorPodCheck,
@@ -258,7 +138,7 @@ private[spark] class KubernetesSuite extends SparkFunSuite
       true)
   }
 
-  private def runSparkJVMCheckAndVerifyCompletion(
+  protected def runSparkJVMCheckAndVerifyCompletion(
       appResource: String = containerLocalSparkDistroExamplesJar,
       mainClass: String = SPARK_DRIVER_MAIN_CLASS,
       driverPodChecker: Pod => Unit = doBasicDriverPodCheck,
@@ -295,7 +175,7 @@ private[spark] class KubernetesSuite extends SparkFunSuite
     }
   }
 
-  private def runSparkApplicationAndVerifyCompletion(
+  protected def runSparkApplicationAndVerifyCompletion(
       appResource: String,
       mainClass: String,
       expectedLogOnCompletion: Seq[String],
@@ -347,29 +227,30 @@ private[spark] class KubernetesSuite extends SparkFunSuite
     }
   }
 
-  private def doBasicDriverPodCheck(driverPod: Pod): Unit = {
+  protected def doBasicDriverPodCheck(driverPod: Pod): Unit = {
     assert(driverPod.getMetadata.getName === driverPodName)
     assert(driverPod.getSpec.getContainers.get(0).getImage === image)
     assert(driverPod.getSpec.getContainers.get(0).getName === 
"spark-kubernetes-driver")
   }
 
-  private def doBasicDriverPyPodCheck(driverPod: Pod): Unit = {
+
+  protected def doBasicDriverPyPodCheck(driverPod: Pod): Unit = {
     assert(driverPod.getMetadata.getName === driverPodName)
     assert(driverPod.getSpec.getContainers.get(0).getImage === pyImage)
     assert(driverPod.getSpec.getContainers.get(0).getName === 
"spark-kubernetes-driver")
   }
 
-  private def doBasicExecutorPodCheck(executorPod: Pod): Unit = {
+  protected def doBasicExecutorPodCheck(executorPod: Pod): Unit = {
     assert(executorPod.getSpec.getContainers.get(0).getImage === image)
     assert(executorPod.getSpec.getContainers.get(0).getName === "executor")
   }
 
-  private def doBasicExecutorPyPodCheck(executorPod: Pod): Unit = {
+  protected def doBasicExecutorPyPodCheck(executorPod: Pod): Unit = {
     assert(executorPod.getSpec.getContainers.get(0).getImage === pyImage)
     assert(executorPod.getSpec.getContainers.get(0).getName === "executor")
   }
 
-  private def checkCustomSettings(pod: Pod): Unit = {
+  protected def checkCustomSettings(pod: Pod): Unit = {
     assert(pod.getMetadata.getLabels.get("label1") === "label1-value")
     assert(pod.getMetadata.getLabels.get("label2") === "label2-value")
     assert(pod.getMetadata.getAnnotations.get("annotation1") === 
"annotation1-value")
@@ -399,26 +280,10 @@ private[spark] class KubernetesSuite extends SparkFunSuite
 }
 
 private[spark] object KubernetesSuite {
-
-  val TIMEOUT = PatienceConfiguration.Timeout(Span(2, Minutes))
-  val INTERVAL = PatienceConfiguration.Interval(Span(2, Seconds))
+  val k8sTestTag = Tag("k8s")
   val SPARK_PI_MAIN_CLASS: String = "org.apache.spark.examples.SparkPi"
   val SPARK_REMOTE_MAIN_CLASS: String = 
"org.apache.spark.examples.SparkRemoteFileTest"
   val SPARK_DRIVER_MAIN_CLASS: String = 
"org.apache.spark.examples.DriverSubmissionTest"
-  val SPARK_PAGE_RANK_MAIN_CLASS: String = 
"org.apache.spark.examples.SparkPageRank"
-  val CONTAINER_LOCAL_PYSPARK: String = 
"local:///opt/spark/examples/src/main/python/"
-  val PYSPARK_PI: String = CONTAINER_LOCAL_PYSPARK + "pi.py"
-  val PYSPARK_FILES: String = CONTAINER_LOCAL_PYSPARK + "pyfiles.py"
-  val PYSPARK_CONTAINER_TESTS: String = CONTAINER_LOCAL_PYSPARK + 
"py_container_checks.py"
-
-  val TEST_SECRET_NAME_PREFIX = "test-secret-"
-  val TEST_SECRET_KEY = "test-key"
-  val TEST_SECRET_VALUE = "test-data"
-  val TEST_SECRET_MOUNT_PATH = "/etc/secrets"
-
-  val REMOTE_PAGE_RANK_DATA_FILE =
-    
"https://storage.googleapis.com/spark-k8s-integration-tests/files/pagerank_data.txt";
-  val REMOTE_PAGE_RANK_FILE_NAME = "pagerank_data.txt"
-
-  case object ShuffleNotReadyException extends Exception
+  val TIMEOUT = PatienceConfiguration.Timeout(Span(2, Minutes))
+  val INTERVAL = PatienceConfiguration.Interval(Span(2, Seconds))
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/20ce1a8f/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/PythonTestsSuite.scala
----------------------------------------------------------------------
diff --git 
a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/PythonTestsSuite.scala
 
b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/PythonTestsSuite.scala
new file mode 100644
index 0000000..0254cc9
--- /dev/null
+++ 
b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/PythonTestsSuite.scala
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.integrationtest
+
+import 
org.apache.spark.deploy.k8s.integrationtest.TestConfig.{getTestImageRepo, 
getTestImageTag}
+
+private[spark] trait PythonTestsSuite { k8sSuite: KubernetesSuite =>
+
+  import PythonTestsSuite._
+  import KubernetesSuite.k8sTestTag
+
+  test("Run PySpark on simple pi.py example", k8sTestTag) {
+    sparkAppConf
+      .set("spark.kubernetes.container.image", 
s"${getTestImageRepo}/spark-py:${getTestImageTag}")
+    runSparkApplicationAndVerifyCompletion(
+      appResource = PYSPARK_PI,
+      mainClass = "",
+      expectedLogOnCompletion = Seq("Pi is roughly 3"),
+      appArgs = Array("5"),
+      driverPodChecker = doBasicDriverPyPodCheck,
+      executorPodChecker = doBasicExecutorPyPodCheck,
+      appLocator = appLocator,
+      isJVM = false)
+  }
+
+  test("Run PySpark with Python2 to test a pyfiles example", k8sTestTag) {
+    sparkAppConf
+      .set("spark.kubernetes.container.image", 
s"${getTestImageRepo}/spark-py:${getTestImageTag}")
+      .set("spark.kubernetes.pyspark.pythonversion", "2")
+    runSparkApplicationAndVerifyCompletion(
+      appResource = PYSPARK_FILES,
+      mainClass = "",
+      expectedLogOnCompletion = Seq(
+        "Python runtime version check is: True",
+        "Python environment version check is: True"),
+      appArgs = Array("python"),
+      driverPodChecker = doBasicDriverPyPodCheck,
+      executorPodChecker = doBasicExecutorPyPodCheck,
+      appLocator = appLocator,
+      isJVM = false,
+      pyFiles = Some(PYSPARK_CONTAINER_TESTS))
+  }
+
+  test("Run PySpark with Python3 to test a pyfiles example", k8sTestTag) {
+    sparkAppConf
+      .set("spark.kubernetes.container.image", 
s"${getTestImageRepo}/spark-py:${getTestImageTag}")
+      .set("spark.kubernetes.pyspark.pythonversion", "3")
+    runSparkApplicationAndVerifyCompletion(
+      appResource = PYSPARK_FILES,
+      mainClass = "",
+      expectedLogOnCompletion = Seq(
+        "Python runtime version check is: True",
+        "Python environment version check is: True"),
+      appArgs = Array("python3"),
+      driverPodChecker = doBasicDriverPyPodCheck,
+      executorPodChecker = doBasicExecutorPyPodCheck,
+      appLocator = appLocator,
+      isJVM = false,
+      pyFiles = Some(PYSPARK_CONTAINER_TESTS))
+  }
+}
+
+private[spark] object PythonTestsSuite {
+  val CONTAINER_LOCAL_PYSPARK: String = 
"local:///opt/spark/examples/src/main/python/"
+  val PYSPARK_PI: String = CONTAINER_LOCAL_PYSPARK + "pi.py"
+  val PYSPARK_FILES: String = CONTAINER_LOCAL_PYSPARK + "pyfiles.py"
+  val PYSPARK_CONTAINER_TESTS: String = CONTAINER_LOCAL_PYSPARK + 
"py_container_checks.py"
+}
+

http://git-wip-us.apache.org/repos/asf/spark/blob/20ce1a8f/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/SecretsTestsSuite.scala
----------------------------------------------------------------------
diff --git 
a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/SecretsTestsSuite.scala
 
b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/SecretsTestsSuite.scala
new file mode 100644
index 0000000..7b05c13
--- /dev/null
+++ 
b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/SecretsTestsSuite.scala
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.integrationtest
+
+import scala.collection.JavaConverters._
+
+import io.fabric8.kubernetes.api.model.{Pod, Secret, SecretBuilder}
+import org.apache.commons.codec.binary.Base64
+import org.apache.commons.io.output.ByteArrayOutputStream
+import org.scalatest.concurrent.Eventually
+
+import org.apache.spark.deploy.k8s.integrationtest.KubernetesSuite._
+
+private[spark] trait SecretsTestsSuite { k8sSuite: KubernetesSuite =>
+
+  import SecretsTestsSuite._
+
+  private def createTestSecret(): Unit = {
+    val sb = new SecretBuilder()
+    sb.withNewMetadata()
+      .withName(ENV_SECRET_NAME)
+      .endMetadata()
+    val secUsername = Base64.encodeBase64String(ENV_SECRET_VALUE_1.getBytes())
+    val secPassword = Base64.encodeBase64String(ENV_SECRET_VALUE_2.getBytes())
+    val envSecretData = Map(ENV_SECRET_KEY_1 -> secUsername, ENV_SECRET_KEY_2 
-> secPassword)
+    sb.addToData(envSecretData.asJava)
+    val envSecret = sb.build()
+    val sec = kubernetesTestComponents
+      .kubernetesClient
+      .secrets()
+      .createOrReplace(envSecret)
+  }
+
+  private def deleteTestSecret(): Unit = {
+    kubernetesTestComponents
+      .kubernetesClient
+      .secrets()
+      .withName(ENV_SECRET_NAME)
+      .delete()
+  }
+
+  test("Run SparkPi with env and mount secrets.", k8sTestTag) {
+    createTestSecret()
+    sparkAppConf
+      .set(s"spark.kubernetes.driver.secrets.$ENV_SECRET_NAME", 
SECRET_MOUNT_PATH)
+      .set(s"spark.kubernetes.driver.secretKeyRef.USERNAME", 
s"$ENV_SECRET_NAME:username")
+      .set(s"spark.kubernetes.driver.secretKeyRef.PASSWORD", 
s"$ENV_SECRET_NAME:password")
+      .set(s"spark.kubernetes.executor.secrets.$ENV_SECRET_NAME", 
SECRET_MOUNT_PATH)
+      .set(s"spark.kubernetes.executor.secretKeyRef.USERNAME", 
s"$ENV_SECRET_NAME:username")
+      .set(s"spark.kubernetes.executor.secretKeyRef.PASSWORD", 
s"$ENV_SECRET_NAME:password")
+    try {
+      runSparkPiAndVerifyCompletion(
+        driverPodChecker = (driverPod: Pod) => {
+          doBasicDriverPodCheck(driverPod)
+          checkSecrets(driverPod)
+        },
+        executorPodChecker = (executorPod: Pod) => {
+          doBasicExecutorPodCheck(executorPod)
+          checkSecrets(executorPod)
+        },
+        appArgs = Array("1000") // give it enough time for all execs to be 
visible
+      )
+    } finally {
+      // make sure this always run
+      deleteTestSecret()
+    }
+  }
+
+  private def checkSecrets(pod: Pod): Unit = {
+    Eventually.eventually(TIMEOUT, INTERVAL) {
+      implicit val podName: String = pod.getMetadata.getName
+      val env = executeCommand("env")
+      assert(env.toString.contains(ENV_SECRET_VALUE_1))
+      assert(env.toString.contains(ENV_SECRET_VALUE_2))
+      val fileUsernameContents = executeCommand("cat", 
s"$SECRET_MOUNT_PATH/$ENV_SECRET_KEY_1")
+      val filePasswordContents = executeCommand("cat", 
s"$SECRET_MOUNT_PATH/$ENV_SECRET_KEY_2")
+      assert(fileUsernameContents.toString.trim.equals(ENV_SECRET_VALUE_1))
+      assert(filePasswordContents.toString.trim.equals(ENV_SECRET_VALUE_2))
+    }
+  }
+
+  private def executeCommand(cmd: String*)(implicit podName: String): String = 
{
+    val out = new ByteArrayOutputStream()
+    val watch = kubernetesTestComponents
+      .kubernetesClient
+      .pods()
+      .withName(podName)
+      .readingInput(System.in)
+      .writingOutput(out)
+      .writingError(System.err)
+      .withTTY()
+      .exec(cmd.toArray: _*)
+    // wait to get some result back
+    Thread.sleep(1000)
+    watch.close()
+    out.flush()
+    out.toString()
+  }
+}
+
+private[spark] object SecretsTestsSuite {
+  val ENV_SECRET_NAME = "mysecret"
+  val SECRET_MOUNT_PATH = "/etc/secret"
+  val ENV_SECRET_KEY_1 = "username"
+  val ENV_SECRET_KEY_2 = "password"
+  val ENV_SECRET_VALUE_1 = "secretusername"
+  val ENV_SECRET_VALUE_2 = "secretpassword"
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/20ce1a8f/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/TestConfig.scala
----------------------------------------------------------------------
diff --git 
a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/TestConfig.scala
 
b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/TestConfig.scala
new file mode 100644
index 0000000..5a49e07
--- /dev/null
+++ 
b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/TestConfig.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.integrationtest
+
+import java.io.File
+
+import com.google.common.base.Charsets
+import com.google.common.io.Files
+
+object TestConfig {
+  def getTestImageTag: String = {
+    val imageTagFileProp = 
System.getProperty("spark.kubernetes.test.imageTagFile")
+    require(imageTagFileProp != null, "Image tag file must be provided in 
system properties.")
+    val imageTagFile = new File(imageTagFileProp)
+    require(imageTagFile.isFile, s"No file found for image tag at 
${imageTagFile.getAbsolutePath}.")
+    Files.toString(imageTagFile, Charsets.UTF_8).trim
+  }
+
+  def getTestImageRepo: String = {
+    val imageRepo = System.getProperty("spark.kubernetes.test.imageRepo")
+    require(imageRepo != null, "Image repo must be provided in system 
properties.")
+    imageRepo
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/20ce1a8f/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/TestConstants.scala
----------------------------------------------------------------------
diff --git 
a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/TestConstants.scala
 
b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/TestConstants.scala
new file mode 100644
index 0000000..8595d0e
--- /dev/null
+++ 
b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/TestConstants.scala
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.integrationtest
+
+object TestConstants {
+  val MINIKUBE_TEST_BACKEND = "minikube"
+  val GCE_TEST_BACKEND = "gce"
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/20ce1a8f/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/config.scala
----------------------------------------------------------------------
diff --git 
a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/config.scala
 
b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/config.scala
deleted file mode 100644
index a81ef45..0000000
--- 
a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/config.scala
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.deploy.k8s.integrationtest
-
-import java.io.File
-
-import com.google.common.base.Charsets
-import com.google.common.io.Files
-
-package object config {
-  def getTestImageTag: String = {
-    val imageTagFileProp = 
System.getProperty("spark.kubernetes.test.imageTagFile")
-    require(imageTagFileProp != null, "Image tag file must be provided in 
system properties.")
-    val imageTagFile = new File(imageTagFileProp)
-    require(imageTagFile.isFile, s"No file found for image tag at 
${imageTagFile.getAbsolutePath}.")
-    Files.toString(imageTagFile, Charsets.UTF_8).trim
-  }
-
-  def getTestImageRepo: String = {
-    val imageRepo = System.getProperty("spark.kubernetes.test.imageRepo")
-    require(imageRepo != null, "Image repo must be provided in system 
properties.")
-    imageRepo
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/20ce1a8f/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/constants.scala
----------------------------------------------------------------------
diff --git 
a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/constants.scala
 
b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/constants.scala
deleted file mode 100644
index 0807a68..0000000
--- 
a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/constants.scala
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.deploy.k8s.integrationtest
-
-package object constants {
-  val MINIKUBE_TEST_BACKEND = "minikube"
-  val GCE_TEST_BACKEND = "gce"
-}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to