Repository: spark
Updated Branches:
  refs/heads/branch-1.4 306837e4e -> 7e46ea022


[SPARK-7989] [CORE] [TESTS] Fix flaky tests in ExternalShuffleServiceSuite and 
SparkListenerWithClusterSuite

The flaky tests in ExternalShuffleServiceSuite and 
SparkListenerWithClusterSuite will fail if there are not enough executors up 
before running the jobs.

This PR adds `JobProgressListener.waitUntilExecutorsUp`. The tests for the 
cluster mode can use it to wait until the expected executors are up.

Author: zsxwing <[email protected]>

Closes #6546 from zsxwing/SPARK-7989 and squashes the following commits:

5560e09 [zsxwing] Fix a typo
3b69840 [zsxwing] Fix flaky tests in ExternalShuffleServiceSuite and 
SparkListenerWithClusterSuite

(cherry picked from commit f27134782ebb61c360330e2d6d5bb1aa02be3fb6)
Signed-off-by: Andrew Or <[email protected]>

Conflicts:
        core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala
        
core/src/test/scala/org/apache/spark/scheduler/SparkListenerWithClusterSuite.scala


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7e46ea02
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7e46ea02
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7e46ea02

Branch: refs/heads/branch-1.4
Commit: 7e46ea0228f142f6b384331d62cec8f86e61c9d1
Parents: 306837e
Author: zsxwing <[email protected]>
Authored: Wed Jun 3 15:04:20 2015 -0700
Committer: Andrew Or <[email protected]>
Committed: Wed Jun 3 15:05:49 2015 -0700

----------------------------------------------------------------------
 .../spark/ui/jobs/JobProgressListener.scala     | 30 ++++++++++++++++++++
 .../spark/ExternalShuffleServiceSuite.scala     |  8 ++++++
 .../apache/spark/broadcast/BroadcastSuite.scala |  9 +-----
 .../SparkListenerWithClusterSuite.scala         | 10 +++++--
 4 files changed, 46 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/7e46ea02/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala 
b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
index f39e961..1d31fce 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -17,8 +17,12 @@
 
 package org.apache.spark.ui.jobs
 
+import java.util.concurrent.TimeoutException
+
 import scala.collection.mutable.{HashMap, HashSet, ListBuffer}
 
+import com.google.common.annotations.VisibleForTesting
+
 import org.apache.spark._
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.executor.TaskMetrics
@@ -526,4 +530,30 @@ class JobProgressListener(conf: SparkConf) extends 
SparkListener with Logging {
   override def onApplicationStart(appStarted: SparkListenerApplicationStart) {
     startTime = appStarted.time
   }
+
+  /**
+   * For testing only. Wait until at least `numExecutors` executors are up, or 
throw
+   * `TimeoutException` if the waiting time elapsed before `numExecutors` 
executors are up.
+   *
+   * @param numExecutors the minimum number of executors to wait for
+   * @param timeout time to wait in milliseconds
+   */
+  @VisibleForTesting
+  private[spark] def waitUntilExecutorsUp(numExecutors: Int, timeout: Long): 
Unit = {
+    val finishTime = System.currentTimeMillis() + timeout
+    while (System.currentTimeMillis() < finishTime) {
+      val numBlockManagers = synchronized {
+        blockManagerIds.size
+      }
+      if (numBlockManagers >= numExecutors + 1) {
+        // Need to count the block manager in driver
+        return
+      }
+      // Sleep rather than using wait/notify, because this is used only for 
testing and wait/notify
+      // add overhead in the general case.
+      Thread.sleep(10)
+    }
+    throw new TimeoutException(
+      s"Can't find $numExecutors executors before $timeout milliseconds 
elapsed")
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/7e46ea02/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala 
b/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala
index bac6fdb..5b127a0 100644
--- a/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala
@@ -55,6 +55,14 @@ class ExternalShuffleServiceSuite extends ShuffleSuite with 
BeforeAndAfterAll {
     sc.env.blockManager.externalShuffleServiceEnabled should equal(true)
     sc.env.blockManager.shuffleClient.getClass should 
equal(classOf[ExternalShuffleClient])
 
+    // In a slow machine, one slave may register hundreds of milliseconds 
ahead of the other one.
+    // If we don't wait for all slaves, it's possible that only one executor 
runs all jobs. Then
+    // all shuffle blocks will be in this executor, 
ShuffleBlockFetcherIterator will directly fetch
+    // local blocks from the local BlockManager and won't send requests to 
ExternalShuffleService.
+    // In this case, we won't receive FetchFailed. And it will make this test 
fail.
+    // Therefore, we should wait until all slaves are up
+    sc.jobProgressListener.waitUntilExecutorsUp(2, 10000)
+
     val rdd = sc.parallelize(0 until 1000, 10).map(i => (i, 1)).reduceByKey(_ 
+ _)
 
     rdd.count()

http://git-wip-us.apache.org/repos/asf/spark/blob/7e46ea02/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala 
b/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala
index c38e306..4c85857 100644
--- a/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala
+++ b/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala
@@ -17,7 +17,6 @@
 
 package org.apache.spark.broadcast
 
-import scala.concurrent.duration._
 import scala.util.Random
 
 import org.scalatest.{Assertions, FunSuite}
@@ -312,13 +311,7 @@ class BroadcastSuite extends FunSuite with 
LocalSparkContext {
       val _sc =
         new SparkContext("local-cluster[%d, 1, 512]".format(numSlaves), 
"test", broadcastConf)
       // Wait until all slaves are up
-      eventually(timeout(10.seconds), interval(10.milliseconds)) {
-        _sc.jobProgressListener.synchronized {
-          val numBlockManagers = _sc.jobProgressListener.blockManagerIds.size
-          assert(numBlockManagers == numSlaves + 1,
-            s"Expect ${numSlaves + 1} block managers, but was 
${numBlockManagers}")
-        }
-      }
+      _sc.jobProgressListener.waitUntilExecutorsUp(numSlaves, 10000)
       _sc
     } else {
       new SparkContext("local", "test", broadcastConf)

http://git-wip-us.apache.org/repos/asf/spark/blob/7e46ea02/core/src/test/scala/org/apache/spark/scheduler/SparkListenerWithClusterSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerWithClusterSuite.scala
 
b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerWithClusterSuite.scala
index 10c43b8..2ce1b5b 100644
--- 
a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerWithClusterSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerWithClusterSuite.scala
@@ -17,12 +17,12 @@
 
 package org.apache.spark.scheduler
 
-import org.apache.spark.scheduler.cluster.ExecutorInfo
-import org.apache.spark.{SparkContext, LocalSparkContext}
+import scala.collection.mutable
 
 import org.scalatest.{FunSuite, BeforeAndAfter, BeforeAndAfterAll}
 
-import scala.collection.mutable
+import org.apache.spark.{LocalSparkContext, SparkContext, SparkFunSuite}
+import org.apache.spark.scheduler.cluster.ExecutorInfo
 
 /**
  * Unit tests for SparkListener that require a local cluster.
@@ -41,6 +41,10 @@ class SparkListenerWithClusterSuite extends FunSuite with 
LocalSparkContext
     val listener = new SaveExecutorInfo
     sc.addSparkListener(listener)
 
+    // This test will check if the number of executors received by 
"SparkListener" is the same as the
+    // number of all executors, so we need to wait until all executors are up
+    sc.jobProgressListener.waitUntilExecutorsUp(2, 10000)
+
     val rdd1 = sc.parallelize(1 to 100, 4)
     val rdd2 = rdd1.map(_.toString)
     rdd2.setName("Target RDD")


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to