Repository: spark
Updated Branches:
  refs/heads/branch-2.4 8a2992ed4 -> 67f2cb6e0


[SPARK-25291][K8S] Fixing Flakiness of Executor Pod tests

## What changes were proposed in this pull request?

Added a fix for the flakiness present in the PySpark tests with respect to
executors not being tested.

Also includes an important fix to executorConf, which was causing tests to fail
when executors *were* tested.

## How was this patch tested?

Unit and Integration tests

Closes #22415 from ifilonenko/SPARK-25291.

Authored-by: Ilan Filonenko <[email protected]>
Signed-off-by: Yinan Li <[email protected]>
(cherry picked from commit 123f0041d534f28e14343aafb4e5cec19dde14ad)
Signed-off-by: Yinan Li <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/67f2cb6e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/67f2cb6e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/67f2cb6e

Branch: refs/heads/branch-2.4
Commit: 67f2cb6e0e9a0f19776eb6fca068dd4ca8e3fb24
Parents: 8a2992e
Author: Ilan Filonenko <[email protected]>
Authored: Tue Sep 18 11:43:35 2018 -0700
Committer: Yinan Li <[email protected]>
Committed: Tue Sep 18 11:44:06 2018 -0700

----------------------------------------------------------------------
 .../k8s/integrationtest/KubernetesSuite.scala   | 35 ++++++++++++++------
 .../KubernetesTestComponents.scala              |  1 -
 .../k8s/integrationtest/SecretsTestsSuite.scala |  2 +-
 3 files changed, 26 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/67f2cb6e/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
----------------------------------------------------------------------
diff --git 
a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
 
b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
index 896a83a..ba38f14 100644
--- 
a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
+++ 
b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
@@ -23,7 +23,10 @@ import java.util.regex.Pattern
 
 import com.google.common.io.PatternFilenameFilter
 import io.fabric8.kubernetes.api.model.Pod
+import io.fabric8.kubernetes.client.{KubernetesClientException, Watcher}
+import io.fabric8.kubernetes.client.Watcher.Action
 import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll, Tag}
+import org.scalatest.Matchers
 import org.scalatest.concurrent.{Eventually, PatienceConfiguration}
 import org.scalatest.time.{Minutes, Seconds, Span}
 import scala.collection.JavaConverters._
@@ -31,10 +34,12 @@ import scala.collection.JavaConverters._
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.deploy.k8s.integrationtest.TestConfig._
 import 
org.apache.spark.deploy.k8s.integrationtest.backend.{IntegrationTestBackend, 
IntegrationTestBackendFactory}
+import org.apache.spark.internal.Logging
 
 private[spark] class KubernetesSuite extends SparkFunSuite
   with BeforeAndAfterAll with BeforeAndAfter with BasicTestsSuite with 
SecretsTestsSuite
-  with PythonTestsSuite with ClientModeTestsSuite {
+  with PythonTestsSuite with ClientModeTestsSuite
+  with Logging with Eventually with Matchers {
 
   import KubernetesSuite._
 
@@ -207,17 +212,28 @@ private[spark] class KubernetesSuite extends SparkFunSuite
       .getItems
       .get(0)
     driverPodChecker(driverPod)
-
-    val executorPods = kubernetesTestComponents.kubernetesClient
+    val execPods = scala.collection.mutable.Map[String, Pod]()
+    val execWatcher = kubernetesTestComponents.kubernetesClient
       .pods()
       .withLabel("spark-app-locator", appLocator)
       .withLabel("spark-role", "executor")
-      .list()
-      .getItems
-    executorPods.asScala.foreach { pod =>
-      executorPodChecker(pod)
-    }
-
+      .watch(new Watcher[Pod] {
+        logInfo("Beginning watch of executors")
+        override def onClose(cause: KubernetesClientException): Unit =
+          logInfo("Ending watch of executors")
+        override def eventReceived(action: Watcher.Action, resource: Pod): 
Unit = {
+          val name = resource.getMetadata.getName
+          action match {
+            case Action.ADDED | Action.MODIFIED =>
+              execPods(name) = resource
+            case Action.DELETED | Action.ERROR =>
+              execPods.remove(name)
+          }
+        }
+      })
+    Eventually.eventually(TIMEOUT, INTERVAL) { execPods.values.nonEmpty should 
be (true) }
+    execWatcher.close()
+    execPods.values.foreach(executorPodChecker(_))
     Eventually.eventually(TIMEOUT, INTERVAL) {
       expectedLogOnCompletion.foreach { e =>
         assert(kubernetesTestComponents.kubernetesClient
@@ -228,7 +244,6 @@ private[spark] class KubernetesSuite extends SparkFunSuite
       }
     }
   }
-
   protected def doBasicDriverPodCheck(driverPod: Pod): Unit = {
     assert(driverPod.getMetadata.getName === driverPodName)
     assert(driverPod.getSpec.getContainers.get(0).getImage === image)

http://git-wip-us.apache.org/repos/asf/spark/blob/67f2cb6e/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala
----------------------------------------------------------------------
diff --git 
a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala
 
b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala
index b602fdf..5615d61 100644
--- 
a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala
+++ 
b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala
@@ -62,7 +62,6 @@ private[spark] class KubernetesTestComponents(defaultClient: 
DefaultKubernetesCl
     new SparkAppConf()
       .set("spark.master", s"k8s://${kubernetesClient.getMasterUrl}")
       .set("spark.kubernetes.namespace", namespace)
-      .set("spark.executor.memory", "500m")
       .set("spark.executor.cores", "1")
       .set("spark.executors.instances", "1")
       .set("spark.app.name", "spark-test-app")

http://git-wip-us.apache.org/repos/asf/spark/blob/67f2cb6e/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/SecretsTestsSuite.scala
----------------------------------------------------------------------
diff --git 
a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/SecretsTestsSuite.scala
 
b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/SecretsTestsSuite.scala
index 7b05c13..b18a6ae 100644
--- 
a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/SecretsTestsSuite.scala
+++ 
b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/SecretsTestsSuite.scala
@@ -18,7 +18,7 @@ package org.apache.spark.deploy.k8s.integrationtest
 
 import scala.collection.JavaConverters._
 
-import io.fabric8.kubernetes.api.model.{Pod, Secret, SecretBuilder}
+import io.fabric8.kubernetes.api.model.{Pod, SecretBuilder}
 import org.apache.commons.codec.binary.Base64
 import org.apache.commons.io.output.ByteArrayOutputStream
 import org.scalatest.concurrent.Eventually


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to