This is an automated email from the ASF dual-hosted git repository.

holden pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 2825237  [SPARK-31062][K8S][TESTS] Improve spark decommissioning k8s test reliability
2825237 is described below

commit 28252374482cd212bc9f4c15b9ee4b875d0e7caf
Author: Holden Karau <[email protected]>
AuthorDate: Wed Mar 11 14:42:31 2020 -0700

    [SPARK-31062][K8S][TESTS] Improve spark decommissioning k8s test reliability
    
    ### What changes were proposed in this pull request?
    
    Replace a sleep with waiting for the first collect to happen to try and make the K8s test code more reliable.
    
    ### Why are the changes needed?
    
    Currently the Decommissioning test appears to be flaky in Jenkins.
    
    ### Does this PR introduce any user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Ran K8s test suite in a loop on minikube on my desktop for 10 iterations without this test failing on any of the runs.
    
    Closes #27858 from holdenk/SPARK-31062-Improve-Spark-Decommissioning-K8s-test-teliability.
    
    Authored-by: Holden Karau <[email protected]>
    Signed-off-by: Holden Karau <[email protected]>
---
 .../k8s/integrationtest/DecommissionSuite.scala    |  5 +--
 .../k8s/integrationtest/KubernetesSuite.scala      | 40 +++++++++-------------
 2 files changed, 20 insertions(+), 25 deletions(-)

diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DecommissionSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DecommissionSuite.scala
index f5eab6e..becf941 100644
--- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DecommissionSuite.scala
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DecommissionSuite.scala
@@ -32,8 +32,9 @@ private[spark] trait DecommissionSuite { k8sSuite: KubernetesSuite =>
     runSparkApplicationAndVerifyCompletion(
       appResource = PYSPARK_DECOMISSIONING,
       mainClass = "",
-      expectedLogOnCompletion = Seq("decommissioning executor",
-        "Finished waiting, stopping Spark"),
+      expectedLogOnCompletion = Seq(
+        "Finished waiting, stopping Spark",
+        "decommissioning executor"),
       appArgs = Array.empty[String],
       driverPodChecker = doBasicDriverPyPodCheck,
       executorPodChecker = doBasicExecutorPyPodCheck,
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
index 61e1f27..eaaf67d 100644
--- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
@@ -298,6 +298,7 @@ class KubernetesSuite extends SparkFunSuite
         .headOption.getOrElse(false)
       result
     }
+
     val execWatcher = kubernetesTestComponents.kubernetesClient
       .pods()
       .withLabel("spark-app-locator", appLocator)
@@ -316,17 +317,25 @@ class KubernetesSuite extends SparkFunSuite
               logDebug(s"Add event received for $name.")
               execPods(name) = resource
               // If testing decommissioning start a thread to simulate
-              // decommissioning.
+              // decommissioning on the first exec pod.
               if (decommissioningTest && execPods.size == 1) {
                 // Wait for all the containers in the pod to be running
-                logDebug("Waiting for first pod to become OK prior to 
deletion")
+                logDebug("Waiting for pod to become OK prior to deletion")
                 Eventually.eventually(patienceTimeout, patienceInterval) {
                   val result = checkPodReady(namespace, name)
                   result shouldBe (true)
                 }
-                // Sleep a small interval to allow execution of job
-                logDebug("Sleeping before killing pod.")
-                Thread.sleep(2000)
+                // Look for the string that indicates we're good to clean up
+                // on the driver
+                logDebug("Waiting for first collect...")
+                Eventually.eventually(TIMEOUT, INTERVAL) {
+                  assert(kubernetesTestComponents.kubernetesClient
+                    .pods()
+                    .withName(driverPodName)
+                    .getLog
+                    .contains("Waiting to give nodes time to finish."),
+                    "Decommission test did not complete first collect.")
+                }
                 // Delete the pod to simulate cluster scale down/migration.
                 val pod = 
kubernetesTestComponents.kubernetesClient.pods().withName(name)
                 pod.delete()
@@ -361,23 +370,6 @@ class KubernetesSuite extends SparkFunSuite
     Eventually.eventually(patienceTimeout, patienceInterval) {
       execPods.values.nonEmpty should be (true)
     }
-    // If decommissioning we need to wait and check the executors were removed
-    if (decommissioningTest) {
-      // Sleep a small interval to ensure everything is registered.
-      Thread.sleep(100)
-      // Wait for the executors to become ready
-      Eventually.eventually(patienceTimeout, patienceInterval) {
-        val anyReadyPods = ! execPods.map{
-          case (name, resource) =>
-            (name, resource.getMetadata().getNamespace())
-        }.filter{
-          case (name, namespace) => checkPodReady(namespace, name)
-        }.isEmpty
-        val podsEmpty = execPods.values.isEmpty
-        val podsReadyOrDead = anyReadyPods || podsEmpty
-        podsReadyOrDead shouldBe (true)
-      }
-    }
     execWatcher.close()
     execPods.values.foreach(executorPodChecker(_))
     Eventually.eventually(patienceTimeout, patienceInterval) {
@@ -386,10 +378,12 @@ class KubernetesSuite extends SparkFunSuite
           .pods()
           .withName(driverPod.getMetadata.getName)
           .getLog
-          .contains(e), "The application did not complete.")
+          .contains(e),
+          s"The application did not complete, did not find str ${e}")
       }
     }
   }
+
   protected def doBasicDriverPodCheck(driverPod: Pod): Unit = {
     assert(driverPod.getMetadata.getName === driverPodName)
     assert(driverPod.getSpec.getContainers.get(0).getImage === image)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to