Repository: spark Updated Branches: refs/heads/master 1d8669f15 -> f27134782
[SPARK-7989] [CORE] [TESTS] Fix flaky tests in ExternalShuffleServiceSuite and SparkListenerWithClusterSuite The flaky tests in ExternalShuffleServiceSuite and SparkListenerWithClusterSuite will fail if there are not enough executors up before running the jobs. This PR adds `JobProgressListener.waitUntilExecutorsUp`. The tests for the cluster mode can use it to wait until the expected executors are up. Author: zsxwing <[email protected]> Closes #6546 from zsxwing/SPARK-7989 and squashes the following commits: 5560e09 [zsxwing] Fix a typo 3b69840 [zsxwing] Fix flaky tests in ExternalShuffleServiceSuite and SparkListenerWithClusterSuite Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f2713478 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f2713478 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f2713478 Branch: refs/heads/master Commit: f27134782ebb61c360330e2d6d5bb1aa02be3fb6 Parents: 1d8669f Author: zsxwing <[email protected]> Authored: Wed Jun 3 15:04:20 2015 -0700 Committer: Andrew Or <[email protected]> Committed: Wed Jun 3 15:04:20 2015 -0700 ---------------------------------------------------------------------- .../spark/ui/jobs/JobProgressListener.scala | 30 ++++++++++++++++++++ .../spark/ExternalShuffleServiceSuite.scala | 8 ++++++ .../apache/spark/broadcast/BroadcastSuite.scala | 10 +------ .../SparkListenerWithClusterSuite.scala | 10 +++++-- 4 files changed, 46 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/f2713478/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala index f39e961..1d31fce 100644 --- 
a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala @@ -17,8 +17,12 @@ package org.apache.spark.ui.jobs +import java.util.concurrent.TimeoutException + import scala.collection.mutable.{HashMap, HashSet, ListBuffer} +import com.google.common.annotations.VisibleForTesting + import org.apache.spark._ import org.apache.spark.annotation.DeveloperApi import org.apache.spark.executor.TaskMetrics @@ -526,4 +530,30 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging { override def onApplicationStart(appStarted: SparkListenerApplicationStart) { startTime = appStarted.time } + + /** + * For testing only. Wait until at least `numExecutors` executors are up, or throw + * `TimeoutException` if the waiting time elapsed before `numExecutors` executors are up. + * + * @param numExecutors the minimum number of executors to wait for + * @param timeout time to wait in milliseconds + */ + @VisibleForTesting + private[spark] def waitUntilExecutorsUp(numExecutors: Int, timeout: Long): Unit = { + val finishTime = System.currentTimeMillis() + timeout + while (System.currentTimeMillis() < finishTime) { + val numBlockManagers = synchronized { + blockManagerIds.size + } + if (numBlockManagers >= numExecutors + 1) { + // Need to count the block manager in driver + return + } + // Sleep rather than using wait/notify, because this is used only for testing and wait/notify + // add overhead in the general case.
+ Thread.sleep(10) + } + throw new TimeoutException( + s"Can't find $numExecutors executors before $timeout milliseconds elapsed") + } } http://git-wip-us.apache.org/repos/asf/spark/blob/f2713478/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala b/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala index bac6fdb..5b127a0 100644 --- a/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala +++ b/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala @@ -55,6 +55,14 @@ class ExternalShuffleServiceSuite extends ShuffleSuite with BeforeAndAfterAll { sc.env.blockManager.externalShuffleServiceEnabled should equal(true) sc.env.blockManager.shuffleClient.getClass should equal(classOf[ExternalShuffleClient]) + // In a slow machine, one slave may register hundreds of milliseconds ahead of the other one. + // If we don't wait for all slaves, it's possible that only one executor runs all jobs. Then + // all shuffle blocks will be in this executor, ShuffleBlockFetcherIterator will directly fetch + // local blocks from the local BlockManager and won't send requests to ExternalShuffleService. + // In this case, we won't receive FetchFailed. And it will make this test fail.
+ // Therefore, we should wait until all slaves are up + sc.jobProgressListener.waitUntilExecutorsUp(2, 10000) + val rdd = sc.parallelize(0 until 1000, 10).map(i => (i, 1)).reduceByKey(_ + _) rdd.count() http://git-wip-us.apache.org/repos/asf/spark/blob/f2713478/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala b/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala index c05e8bb..c054c71 100644 --- a/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala +++ b/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala @@ -17,11 +17,9 @@ package org.apache.spark.broadcast -import scala.concurrent.duration._ import scala.util.Random import org.scalatest.Assertions -import org.scalatest.concurrent.Eventually._ import org.apache.spark._ import org.apache.spark.io.SnappyCompressionCodec @@ -312,13 +310,7 @@ class BroadcastSuite extends SparkFunSuite with LocalSparkContext { val _sc = new SparkContext("local-cluster[%d, 1, 512]".format(numSlaves), "test", broadcastConf) // Wait until all slaves are up - eventually(timeout(10.seconds), interval(10.milliseconds)) { - _sc.jobProgressListener.synchronized { - val numBlockManagers = _sc.jobProgressListener.blockManagerIds.size - assert(numBlockManagers == numSlaves + 1, - s"Expect ${numSlaves + 1} block managers, but was ${numBlockManagers}") - } - } + _sc.jobProgressListener.waitUntilExecutorsUp(numSlaves, 10000) _sc } else { new SparkContext("local", "test", broadcastConf) http://git-wip-us.apache.org/repos/asf/spark/blob/f2713478/core/src/test/scala/org/apache/spark/scheduler/SparkListenerWithClusterSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerWithClusterSuite.scala
b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerWithClusterSuite.scala index 50273bc..d97fba0 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerWithClusterSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerWithClusterSuite.scala @@ -17,12 +17,12 @@ package org.apache.spark.scheduler -import org.apache.spark.scheduler.cluster.ExecutorInfo -import org.apache.spark.{LocalSparkContext, SparkContext, SparkFunSuite} +import scala.collection.mutable import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll} -import scala.collection.mutable +import org.apache.spark.{LocalSparkContext, SparkContext, SparkFunSuite} +import org.apache.spark.scheduler.cluster.ExecutorInfo /** * Unit tests for SparkListener that require a local cluster. @@ -41,6 +41,10 @@ class SparkListenerWithClusterSuite extends SparkFunSuite with LocalSparkContext val listener = new SaveExecutorInfo sc.addSparkListener(listener) + // This test will check if the number of executors received by "SparkListener" is same as the + // number of all executors, so we need to wait until all executors are up + sc.jobProgressListener.waitUntilExecutorsUp(2, 10000) + val rdd1 = sc.parallelize(1 to 100, 4) val rdd2 = rdd1.map(_.toString) rdd2.setName("Target RDD") --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
