Updated Branches:
  refs/heads/branch-0.8 f930dd4bf -> 0b6f047b5
Merge pull request #50 from kayousterhout/SPARK-908

Fix race condition in SparkListenerSuite (fixes SPARK-908).

(cherry picked from commit 215238cb399d46c83fafa64b3c98e0ebec21adb9)
Signed-off-by: Reynold Xin <r...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/0b6f047b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/0b6f047b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/0b6f047b

Branch: refs/heads/branch-0.8
Commit: 0b6f047b5fcc20d9914e18eede329d507639ce0a
Parents: f930dd4
Author: Reynold Xin <r...@apache.org>
Authored: Wed Oct 9 16:49:44 2013 -0700
Committer: Reynold Xin <r...@apache.org>
Committed: Wed Oct 9 16:53:31 2013 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/SparkContext.scala   |  2 +-
 .../apache/spark/scheduler/DAGScheduler.scala   |  2 +-
 .../spark/scheduler/SparkListenerBus.scala      | 18 +++++++++++
 .../spark/scheduler/SparkListenerSuite.scala    | 32 +++++++++++---------
 4 files changed, 37 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0b6f047b/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 912ce75..6ad708f 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -228,7 +228,7 @@ class SparkContext(
   }
   taskScheduler.start()
 
-  @volatile private var dagScheduler = new DAGScheduler(taskScheduler)
+  @volatile private[spark] var dagScheduler = new DAGScheduler(taskScheduler)
   dagScheduler.start()
 
   ui.start()

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0b6f047b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 8a55df4..8a8b32c 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -114,7 +114,7 @@ class DAGScheduler(
 
   private[spark] val stageToInfos = new TimeStampedHashMap[Stage, StageInfo]
 
-  private val listenerBus = new SparkListenerBus()
+  private[spark] val listenerBus = new SparkListenerBus()
 
   // Contains the locations that each RDD's partitions are cached on
   private val cacheLocs = new HashMap[Int, Array[Seq[TaskLocation]]]

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0b6f047b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
index a65e1ec..4d3e4a1 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
@@ -70,5 +70,23 @@ private[spark] class SparkListenerBus() extends Logging {
       queueFullErrorMessageLogged = true
     }
   }
+
+  /**
+   * Waits until there are no more events in the queue, or until the specified time has elapsed.
+   * Used for testing only. Returns true if the queue has emptied and false if the specified time
+   * elapsed before the queue emptied.
+   */
+  def waitUntilEmpty(timeoutMillis: Int): Boolean = {
+    val finishTime = System.currentTimeMillis + timeoutMillis
+    while (!eventQueue.isEmpty()) {
+      if (System.currentTimeMillis > finishTime) {
+        return false
+      }
+      /* Sleep rather than using wait/notify, because this is used only for testing and wait/notify
+       * add overhead in the general case. */
+      Thread.sleep(10)
+    }
+    return true
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0b6f047b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
index 41a161e..0d8742c 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
@@ -23,15 +23,9 @@ import scala.collection.mutable
 import org.scalatest.matchers.ShouldMatchers
 import org.apache.spark.SparkContext._
 
-/**
- *
- */
-
 class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatchers {
 
-  // TODO: This test has a race condition since the DAGScheduler now reports results
-  // asynchronously. It needs to be updated for that patch.
-  ignore("local metrics") {
+  test("local metrics") {
     sc = new SparkContext("local[4]", "test")
     val listener = new SaveStageInfo
     sc.addSparkListener(listener)
@@ -45,7 +39,8 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc
 
     val d = sc.parallelize(1 to 1e4.toInt, 64).map{i => w(i)}
     d.count
-    Thread.sleep(1000)
+    val WAIT_TIMEOUT_MILLIS = 10000
+    assert(sc.dagScheduler.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
     listener.stageInfos.size should be (1)
 
     val d2 = d.map{i => w(i) -> i * 2}.setName("shuffle input 1")
@@ -57,18 +52,25 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc
 
     d4.collectAsMap
 
-    Thread.sleep(1000)
+    assert(sc.dagScheduler.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
     listener.stageInfos.size should be (4)
 
-    listener.stageInfos.foreach {stageInfo =>
-      //small test, so some tasks might take less than 1 millisecond, but average should be greater than 1 ms
+    listener.stageInfos.foreach { stageInfo =>
+      /* small test, so some tasks might take less than 1 millisecond, but average should be greater
+       * than 0 ms. */
       checkNonZeroAvg(stageInfo.taskInfos.map{_._1.duration}, stageInfo + " duration")
-      checkNonZeroAvg(stageInfo.taskInfos.map{_._2.executorRunTime.toLong}, stageInfo + " executorRunTime")
-      checkNonZeroAvg(stageInfo.taskInfos.map{_._2.executorDeserializeTime.toLong}, stageInfo + " executorDeserializeTime")
+      checkNonZeroAvg(
+        stageInfo.taskInfos.map{_._2.executorRunTime.toLong},
+        stageInfo + " executorRunTime")
+      checkNonZeroAvg(
+        stageInfo.taskInfos.map{_._2.executorDeserializeTime.toLong},
+        stageInfo + " executorDeserializeTime")
       if (stageInfo.stage.rdd.name == d4.name) {
-        checkNonZeroAvg(stageInfo.taskInfos.map{_._2.shuffleReadMetrics.get.fetchWaitTime}, stageInfo + " fetchWaitTime")
+        checkNonZeroAvg(
+          stageInfo.taskInfos.map{_._2.shuffleReadMetrics.get.fetchWaitTime},
+          stageInfo + " fetchWaitTime")
       }
 
-      stageInfo.taskInfos.foreach{case (taskInfo, taskMetrics) =>
+      stageInfo.taskInfos.foreach { case (taskInfo, taskMetrics) =>
         taskMetrics.resultSize should be > (0l)
         if (isStage(stageInfo, Set(d2.name, d3.name), Set(d4.name))) {
           taskMetrics.shuffleWriteMetrics should be ('defined)
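
----------------------------------------------------------------------

The race being fixed: the DAGScheduler posts events to the SparkListenerBus
queue and a separate thread drains them to listeners, so the test's fixed
Thread.sleep(1000) could return before the queue had emptied, and the
assertions would then see stale listener state. Below is a minimal,
self-contained sketch of the drain-then-assert pattern the patch adopts;
the object, queue, and thread names here are invented for illustration and
are not Spark APIs.

import java.util.concurrent.LinkedBlockingQueue

object WaitUntilEmptySketch {
  // Stand-in for SparkListenerBus's internal event queue.
  private val eventQueue = new LinkedBlockingQueue[String]()

  // Same contract as the waitUntilEmpty added above: returns true if the
  // queue drained within timeoutMillis, false if the deadline passed first.
  def waitUntilEmpty(timeoutMillis: Int): Boolean = {
    val finishTime = System.currentTimeMillis + timeoutMillis
    while (!eventQueue.isEmpty) {
      if (System.currentTimeMillis > finishTime) {
        return false
      }
      // Cheap 10 ms poll; acceptable because this runs only in tests.
      Thread.sleep(10)
    }
    true
  }

  def main(args: Array[String]): Unit = {
    eventQueue.put("StageCompleted")
    // A consumer thread drains events asynchronously, like the bus's
    // event-processing thread.
    new Thread(new Runnable {
      def run(): Unit = { Thread.sleep(50); eventQueue.take() }
    }).start()
    // Deterministic: assert only once the queue is known to be empty,
    // instead of sleeping a fixed interval and hoping the consumer kept up.
    assert(waitUntilEmpty(timeoutMillis = 10000), "queue did not drain in time")
    println("queue drained; safe to assert on listener state")
  }
}

Compared with the old fixed sleep, the bounded poll waits as long as needed
on slow machines (up to the timeout) and returns immediately once the queue
drains on fast ones, which is why the test can use a generous 10-second
WAIT_TIMEOUT_MILLIS without slowing the suite down.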