Fix race condition in SparkListenerSuite (fixes SPARK-908).
Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/a34a4e81
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/a34a4e81
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/a34a4e81

Branch: refs/heads/scala-2.10
Commit: a34a4e8174b5f285a327d7ff30ac9f3ff0db7689
Parents: 3218fa7
Author: Kay Ousterhout <kayousterh...@gmail.com>
Authored: Wed Oct 9 15:07:53 2013 -0700
Committer: Kay Ousterhout <kayousterh...@gmail.com>
Committed: Wed Oct 9 15:07:53 2013 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/SparkContext.scala      |  2 +-
 .../org/apache/spark/scheduler/DAGScheduler.scala  |  2 +-
 .../apache/spark/scheduler/SparkListenerBus.scala  | 18 ++++++++++++++++++
 .../spark/scheduler/SparkListenerSuite.scala       | 15 +++++----------
 4 files changed, 25 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a34a4e81/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index febcf9c..ff45e76 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -231,7 +231,7 @@ class SparkContext(
   }
   taskScheduler.start()
 
-  @volatile private var dagScheduler = new DAGScheduler(taskScheduler)
+  @volatile private[spark] var dagScheduler = new DAGScheduler(taskScheduler)
   dagScheduler.start()
 
   ui.start()


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a34a4e81/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 4053b91..5c40f50 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -114,7 +114,7 @@ class DAGScheduler(
 
   private[spark] val stageToInfos = new TimeStampedHashMap[Stage, StageInfo]
 
-  private val listenerBus = new SparkListenerBus()
+  private[spark] val listenerBus = new SparkListenerBus()
 
   // Contains the locations that each RDD's partitions are cached on
   private val cacheLocs = new HashMap[Int, Array[Seq[TaskLocation]]]


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a34a4e81/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
index a65e1ec..8283c4b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
@@ -70,5 +70,23 @@ private[spark] class SparkListenerBus() extends Logging {
       queueFullErrorMessageLogged = true
     }
   }
+
+  /** Waits until there are no more events in the queue, or until the specified time has elapsed.
+   *
+   * Used for testing only. Returns true if the queue has emptied and false if the specified time
+   * elapsed before the queue emptied.
+   */
+  def waitUntilEmpty(timeoutMillis: Int): Boolean = {
+    val finishTime = System.currentTimeMillis + timeoutMillis
+    while (!eventQueue.isEmpty()) {
+      if (System.currentTimeMillis > finishTime) {
+        return false
+      }
+      /* Sleep rather than using wait/notify, because this is used only for testing and wait/notify
+       * add overhead in the general case. */
+      Thread.sleep(10)
+    }
+    return true
+  }
 }


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a34a4e81/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
index 41a161e..6e80262 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
@@ -23,15 +23,9 @@ import scala.collection.mutable
 import org.scalatest.matchers.ShouldMatchers
 import org.apache.spark.SparkContext._
 
-/**
- *
- */
-
 class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatchers {
 
-  // TODO: This test has a race condition since the DAGScheduler now reports results
-  // asynchronously. It needs to be updated for that patch.
-  ignore("local metrics") {
+  test("local metrics") {
     sc = new SparkContext("local[4]", "test")
     val listener = new SaveStageInfo
     sc.addSparkListener(listener)
@@ -45,7 +39,8 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc
 
     val d = sc.parallelize(1 to 1e4.toInt, 64).map{i => w(i)}
     d.count
-    Thread.sleep(1000)
+    val WAIT_TIMEOUT_MILLIS = 10000
+    assert(sc.dagScheduler.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
     listener.stageInfos.size should be (1)
 
     val d2 = d.map{i => w(i) -> i * 2}.setName("shuffle input 1")
@@ -57,7 +52,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc
 
     d4.collectAsMap
 
-    Thread.sleep(1000)
+    assert(sc.dagScheduler.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
     listener.stageInfos.size should be (4)
     listener.stageInfos.foreach {stageInfo =>
       //small test, so some tasks might take less than 1 millisecond, but average should be greater than 1 ms
@@ -68,7 +63,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc
         checkNonZeroAvg(stageInfo.taskInfos.map{_._2.shuffleReadMetrics.get.fetchWaitTime},
           stageInfo + " fetchWaitTime")
       }
-      stageInfo.taskInfos.foreach{case (taskInfo, taskMetrics) =>
+      stageInfo.taskInfos.foreach{case (taskInfo, taskMetrics) =>
         taskMetrics.resultSize should be > (0l)
         if (isStage(stageInfo, Set(d2.name, d3.name), Set(d4.name))) {
           taskMetrics.shuffleWriteMetrics should be ('defined)
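
----------------------------------------------------------------------

Editor's note: for readers skimming the diff, below is a minimal, self-contained sketch of the pattern this patch introduces. Instead of sleeping for a fixed second and hoping the asynchronous DAGScheduler has caught up, the test thread polls the listener bus's event queue until it drains or a deadline passes. The ToyListenerBus class and all names in it are illustrative stand-ins, not Spark's actual types.

    import java.util.concurrent.LinkedBlockingQueue

    // Toy model (not Spark code) of the timed polling in waitUntilEmpty:
    // loop until the queue drains, giving up once the deadline passes.
    class ToyListenerBus {
      val eventQueue = new LinkedBlockingQueue[String]()

      def waitUntilEmpty(timeoutMillis: Int): Boolean = {
        val finishTime = System.currentTimeMillis + timeoutMillis
        while (!eventQueue.isEmpty) {
          if (System.currentTimeMillis > finishTime) {
            return false
          }
          Thread.sleep(10) // brief sleep is acceptable for test-only code
        }
        true
      }
    }

    object ToyListenerBusExample {
      def main(args: Array[String]): Unit = {
        val bus = new ToyListenerBus
        bus.eventQueue.put("stageCompleted")

        // Simulate the bus's consumer thread draining the queue asynchronously.
        val consumer = new Thread(new Runnable {
          def run(): Unit = {
            Thread.sleep(50)
            bus.eventQueue.take()
          }
        })
        consumer.start()

        // The test thread blocks until the queue is empty (or times out),
        // so assertions about delivered events are no longer racy.
        assert(bus.waitUntilEmpty(timeoutMillis = 10000))
        println("queue drained; safe to assert on listener state")
      }
    }

The design trade-off, as the commit's own comment notes, is to pay a small polling cost in tests rather than add wait/notify synchronization to the production event-posting path.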