This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 53221cd  [SPARK-31244][K8S][TEST] Use Minio instead of Ceph in K8S DepsTestsSuite
53221cd is described below

commit 53221cda408e9be5d0d2ff5946c200cb43647dd9
Author: Dongjoon Hyun <dongj...@apache.org>
AuthorDate: Wed Mar 25 12:38:15 2020 -0700

    [SPARK-31244][K8S][TEST] Use Minio instead of Ceph in K8S DepsTestsSuite
    
    ### What changes were proposed in this pull request?
    
    This PR (SPARK-31244) replaces `Ceph` with `Minio` in K8S `DepsTestsSuite`.
    
    ### Why are the changes needed?
    
    Currently, `DepsTestsSuite` uses `ceph` for S3 storage. However, the version in use and all newer releases are broken on recent `minikube` releases. We should use a smaller and more robust alternative.
    
    ```
    $ minikube version
    minikube version: v1.8.2
    
    $ minikube -p minikube docker-env | source
    
    $ docker run -it --rm -e NETWORK_AUTO_DETECT=4 -e RGW_FRONTEND_PORT=8000 -e SREE_PORT=5001 -e CEPH_DEMO_UID=nano -e CEPH_DAEMON=demo ceph/daemon:v4.0.3-stable-4.0-nautilus-centos-7-x86_64 /bin/sh
    2020-03-25 04:26:21  /opt/ceph-container/bin/entrypoint.sh: ERROR- it looks like we have not been able to discover the network settings
    
    $ docker run -it --rm -e NETWORK_AUTO_DETECT=4 -e RGW_FRONTEND_PORT=8000 -e SREE_PORT=5001 -e CEPH_DEMO_UID=nano -e CEPH_DAEMON=demo ceph/daemon:v4.0.11-stable-4.0-nautilus-centos-7 /bin/sh
    2020-03-25 04:20:30  /opt/ceph-container/bin/entrypoint.sh: ERROR- it looks like we have not been able to discover the network settings
    ```
    
    Also, the `ceph` image is unnecessarily large (almost `1GB`) and still growing, while the `minio` image is `55.8MB` and provides the same features.
    ```
    $ docker images | grep ceph
    ceph/daemon v4.0.3-stable-4.0-nautilus-centos-7-x86_64 a6a05ccdf924 6 months ago 852MB
    ceph/daemon v4.0.11-stable-4.0-nautilus-centos-7       87f695550d8e 12 hours ago 901MB
    
    $ docker images | grep minio
    minio/minio latest                                     95c226551ea6 5 days ago   55.8MB
    ```
    
    ### Does this PR introduce any user-facing change?
    
    No. (This is a test case change)
    
    ### How was this patch tested?
    
    Pass the existing Jenkins K8s integration test job and test with the latest minikube.
    ```
    $ minikube version
    minikube version: v1.8.2
    
    $ kubectl version --short
    Client Version: v1.17.4
    Server Version: v1.17.4
    
    $ NO_MANUAL=1 ./dev/make-distribution.sh --r --pip --tgz -Pkubernetes
    $ resource-managers/kubernetes/integration-tests/dev/dev-run-integration-tests.sh --spark-tgz $PWD/spark-*.tgz
    ...
    KubernetesSuite:
    - Run SparkPi with no resources
    - Run SparkPi with a very long application name.
    - Use SparkLauncher.NO_RESOURCE
    - Run SparkPi with a master URL without a scheme.
    - Run SparkPi with an argument.
    - Run SparkPi with custom labels, annotations, and environment variables.
    - All pods have the same service account by default
    - Run extraJVMOptions check on driver
    - Run SparkRemoteFileTest using a remote data file
    - Run SparkPi with env and mount secrets.
    - Run PySpark on simple pi.py example
    - Run PySpark with Python2 to test a pyfiles example
    - Run PySpark with Python3 to test a pyfiles example
    - Run PySpark with memory customization
    - Run in client mode.
    - Start pod creation from template
    - PVs with local storage *** FAILED *** // This is irrelevant to this PR.
    - Launcher client dependencies          // This is the test case fixed by this PR.
    - Test basic decommissioning
    - Run SparkR on simple dataframe.R example
    Run completed in 12 minutes, 4 seconds.
    ...
    ```
    
    The following is a working snapshot of the `DepsTestsSuite` test.
    ```
    $ kubectl get all -ncf9438dd8a65436686b1196a6b73000f
    NAME                                                  READY   STATUS    RESTARTS   AGE
    pod/minio-0                                           1/1     Running   0          70s
    pod/spark-test-app-8494bddca3754390b9e59a2ef47584eb   1/1     Running   0          55s
    
    NAME                                                 TYPE        CLUSTER-IP      EXTERNAL-IP   PORT(S)                      AGE
    service/minio-s3                                     NodePort    10.109.54.180   <none>        9000:30678/TCP               70s
    service/spark-test-app-fd916b711061c7b8-driver-svc   ClusterIP   None            <none>        7078/TCP,7079/TCP,4040/TCP   55s
    
    NAME                     READY   AGE
    statefulset.apps/minio   1/1     70s
    ```
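    
    As a rough illustration (not part of this patch), the S3A settings that the test applies can be derived from the `minio-s3` service URL roughly as sketched below. The object name `MinioS3AConf`, the helper `s3aConfFor`, and the sample node IP are hypothetical; the configuration keys, credentials, bucket name, and node port are the ones that appear in this change.
    
    ```scala
    import java.net.URL

    object MinioS3AConf {
      // Hypothetical helper: builds the Spark/Hadoop S3A settings for a Minio endpoint.
      // Note that URL.getPort returns -1 when the URL carries no explicit port.
      def s3aConfFor(
          serviceUrl: String,
          accessKey: String,
          secretKey: String,
          bucket: String): Map[String, String] = {
        val url = new URL(serviceUrl)
        Map(
          "spark.hadoop.fs.s3a.access.key" -> accessKey,
          "spark.hadoop.fs.s3a.secret.key" -> secretKey,
          "spark.hadoop.fs.s3a.connection.ssl.enabled" -> "false",
          "spark.hadoop.fs.s3a.endpoint" -> s"${url.getHost}:${url.getPort}",
          "spark.hadoop.fs.s3a.impl" -> "org.apache.hadoop.fs.s3a.S3AFileSystem",
          "spark.kubernetes.file.upload.path" -> s"s3a://$bucket"
        )
      }

      def main(args: Array[String]): Unit = {
        // The node IP is an assumption; 30678 is the NodePort from the snapshot above.
        val conf = s3aConfFor("http://192.168.99.100:30678", "minio", "miniostorage", "spark")
        conf.toSeq.sortBy(_._1).foreach { case (k, v) => println(s"$k=$v") }
      }
    }
    ```
    
    Because Minio accepts the access and secret keys through environment variables, the test can use fixed credentials instead of reading them back from the running pod, as was needed with `ceph-nano`.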
    
    Closes #28015 from dongjoon-hyun/SPARK-31244.
    
    Authored-by: Dongjoon Hyun <dongj...@apache.org>
    Signed-off-by: Dongjoon Hyun <dongj...@apache.org>
    (cherry picked from commit f206bbde3a8f64650236013d61680faba492d7a4)
    Signed-off-by: Dongjoon Hyun <dongj...@apache.org>
---
 .../k8s/integrationtest/DepsTestsSuite.scala       | 90 +++++++++-------------
 1 file changed, 36 insertions(+), 54 deletions(-)

diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DepsTestsSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DepsTestsSuite.scala
index 4141268..c35aa5c 100644
--- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DepsTestsSuite.scala
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DepsTestsSuite.scala
@@ -35,17 +35,16 @@ import org.apache.spark.deploy.k8s.integrationtest.backend.minikube.Minikube
 private[spark] trait DepsTestsSuite { k8sSuite: KubernetesSuite =>
   import KubernetesSuite.k8sTestTag
 
-  val cName = "ceph-nano"
+  val cName = "minio"
   val svcName = s"$cName-s3"
-  val bucket = "spark"
-
-  private def getCephContainer(): Container = {
-    val envVars = Map ( "NETWORK_AUTO_DETECT" -> "4",
-      "RGW_FRONTEND_PORT" -> "8000",
-      "SREE_PORT" -> "5001",
-      "CEPH_DEMO_UID" -> "nano",
-      "CEPH_DAEMON" -> "demo",
-      "DEBUG" -> "verbose"
+  val BUCKET = "spark"
+  val ACCESS_KEY = "minio"
+  val SECRET_KEY = "miniostorage"
+
+  private def getMinioContainer(): Container = {
+    val envVars = Map (
+      "MINIO_ACCESS_KEY" -> ACCESS_KEY,
+      "MINIO_SECRET_KEY" -> SECRET_KEY
     ).map( envV =>
       new EnvVarBuilder()
         .withName(envV._1)
@@ -63,13 +62,14 @@ private[spark] trait DepsTestsSuite { k8sSuite: KubernetesSuite =>
     ).asJava
 
     new ContainerBuilder()
-      .withImage("ceph/daemon:v4.0.3-stable-4.0-nautilus-centos-7-x86_64")
+      .withImage("minio/minio:latest")
       .withImagePullPolicy("Always")
       .withName(cName)
+      .withArgs("server", "/data")
       .withPorts(new ContainerPortBuilder()
           .withName(svcName)
           .withProtocol("TCP")
-          .withContainerPort(8000)
+          .withContainerPort(9000)
         .build()
       )
       .withResources(new ResourceRequirementsBuilder()
@@ -81,10 +81,9 @@ private[spark] trait DepsTestsSuite { k8sSuite: KubernetesSuite =>
       .build()
   }
 
-  // Based on https://github.com/ceph/cn
-  private def setupCephStorage(): Unit = {
-    val labels = Map("app" -> "ceph", "daemon" -> "nano").asJava
-    val cephService = new ServiceBuilder()
+  private def setupMinioStorage(): Unit = {
+    val labels = Map("app" -> "minio").asJava
+    val minioService = new ServiceBuilder()
       .withNewMetadata()
         .withName(svcName)
       .withLabels(labels)
@@ -92,9 +91,9 @@ private[spark] trait DepsTestsSuite { k8sSuite: KubernetesSuite =>
       .withNewSpec()
         .withPorts(new ServicePortBuilder()
           .withName("https")
-          .withPort(8000)
+          .withPort(9000)
           .withProtocol("TCP")
-          .withTargetPort(new IntOrString(8000))
+          .withTargetPort(new IntOrString(9000))
           .build()
         )
         .withType("NodePort")
@@ -102,7 +101,7 @@ private[spark] trait DepsTestsSuite { k8sSuite: KubernetesSuite =>
       .endSpec()
       .build()
 
-    val cephStatefulSet = new StatefulSetBuilder()
+    val minioStatefulSet = new StatefulSetBuilder()
       .withNewMetadata()
         .withName(cName)
         .withLabels(labels)
@@ -110,7 +109,7 @@ private[spark] trait DepsTestsSuite { k8sSuite: KubernetesSuite =>
       .withNewSpec()
         .withReplicas(1)
         .withNewSelector()
-          .withMatchLabels(Map("app" -> "ceph").asJava)
+          .withMatchLabels(Map("app" -> "minio").asJava)
         .endSelector()
         .withServiceName(cName)
         .withNewTemplate()
@@ -119,7 +118,7 @@ private[spark] trait DepsTestsSuite { k8sSuite: KubernetesSuite =>
             .withLabels(labels)
            .endMetadata()
           .withNewSpec()
-            .withContainers(getCephContainer())
+            .withContainers(getMinioContainer())
           .endSpec()
         .endTemplate()
       .endSpec()
@@ -128,16 +127,16 @@ private[spark] trait DepsTestsSuite { k8sSuite: KubernetesSuite =>
     kubernetesTestComponents
       .kubernetesClient
       .services()
-      .create(cephService)
+      .create(minioService)
 
     kubernetesTestComponents
       .kubernetesClient
       .apps()
       .statefulSets()
-      .create(cephStatefulSet)
+      .create(minioStatefulSet)
   }
 
- private def deleteCephStorage(): Unit = {
+ private def deleteMinioStorage(): Unit = {
     kubernetesTestComponents
       .kubernetesClient
       .apps()
@@ -155,47 +154,30 @@ private[spark] trait DepsTestsSuite { k8sSuite: KubernetesSuite =>
   test("Launcher client dependencies", k8sTestTag, MinikubeTag) {
     val fileName = Utils.createTempFile(FILE_CONTENTS, HOST_PATH)
     try {
-      setupCephStorage()
-      val cephUrlStr = getServiceUrl(svcName)
-      val cephUrl = new URL(cephUrlStr)
-      val cephHost = cephUrl.getHost
-      val cephPort = cephUrl.getPort
+      setupMinioStorage()
+      val minioUrlStr = getServiceUrl(svcName)
+      val minioUrl = new URL(minioUrlStr)
+      val minioHost = minioUrl.getHost
+      val minioPort = minioUrl.getPort
       val examplesJar = Utils.getExamplesJarAbsolutePath(sparkHomeDir)
-      val (accessKey, secretKey) = getCephCredentials()
       sparkAppConf
-        .set("spark.hadoop.fs.s3a.access.key", accessKey)
-        .set("spark.hadoop.fs.s3a.secret.key", secretKey)
+        .set("spark.hadoop.fs.s3a.access.key", ACCESS_KEY)
+        .set("spark.hadoop.fs.s3a.secret.key", SECRET_KEY)
         .set("spark.hadoop.fs.s3a.connection.ssl.enabled", "false")
-        .set("spark.hadoop.fs.s3a.endpoint", s"$cephHost:$cephPort")
-        .set("spark.kubernetes.file.upload.path", s"s3a://$bucket")
+        .set("spark.hadoop.fs.s3a.endpoint", s"$minioHost:$minioPort")
+        .set("spark.kubernetes.file.upload.path", s"s3a://$BUCKET")
         .set("spark.files", s"$HOST_PATH/$fileName")
         .set("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
         .set("spark.jars.packages", "com.amazonaws:aws-java-sdk:" +
           "1.7.4,org.apache.hadoop:hadoop-aws:2.7.6")
         .set("spark.driver.extraJavaOptions", "-Divy.cache.dir=/tmp -Divy.home=/tmp")
-      createS3Bucket(accessKey, secretKey, cephUrlStr)
+      createS3Bucket(ACCESS_KEY, SECRET_KEY, minioUrlStr)
       runSparkRemoteCheckAndVerifyCompletion(appResource = examplesJar,
         appArgs = Array(fileName),
         timeout = Option(DEPS_TIMEOUT))
     } finally {
       // make sure this always runs
-      deleteCephStorage()
-    }
-  }
-
-  // There isn't a cleaner way to get the credentials
-  // when ceph-nano runs on k8s
-  private def getCephCredentials(): (String, String) = {
-    Eventually.eventually(TIMEOUT, INTERVAL) {
-      val cephPod = kubernetesTestComponents
-        .kubernetesClient
-        .pods()
-        .withName(s"$cName-0")
-        .get()
-      implicit val podName: String = cephPod.getMetadata.getName
-      implicit val components: KubernetesTestComponents = kubernetesTestComponents
-      val contents = Utils.executeCommand("cat", "/nano_user_details")
-    (extractS3Key(contents, "access_key"), extractS3Key(contents, "secret_key"))
+      deleteMinioStorage()
     }
   }
 
@@ -215,10 +197,10 @@ private[spark] trait DepsTestsSuite { k8sSuite: KubernetesSuite =>
         val credentials = new BasicAWSCredentials(accessKey, secretKey)
         val s3client = new AmazonS3Client(credentials)
         s3client.setEndpoint(endPoint)
-        s3client.createBucket(bucket)
+        s3client.createBucket(BUCKET)
       } catch {
         case e: Exception =>
-          throw new SparkException(s"Failed to create bucket $bucket.", e)
+          throw new SparkException(s"Failed to create bucket $BUCKET.", e)
       }
     }
   }

