Fix race condition in SparkListenerSuite (fixes SPARK-908).

Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/a34a4e81
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/a34a4e81
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/a34a4e81

Branch: refs/heads/scala-2.10
Commit: a34a4e8174b5f285a327d7ff30ac9f3ff0db7689
Parents: 3218fa7
Author: Kay Ousterhout <kayousterh...@gmail.com>
Authored: Wed Oct 9 15:07:53 2013 -0700
Committer: Kay Ousterhout <kayousterh...@gmail.com>
Committed: Wed Oct 9 15:07:53 2013 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/SparkContext.scala     |  2 +-
 .../org/apache/spark/scheduler/DAGScheduler.scala |  2 +-
 .../apache/spark/scheduler/SparkListenerBus.scala | 18 ++++++++++++++++++
 .../spark/scheduler/SparkListenerSuite.scala      | 15 +++++----------
 4 files changed, 25 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
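
Background on the race: the DAGScheduler publishes SparkListener events through
SparkListenerBus, which drains an event queue on a separate daemon thread, so a
job can return before the listener has seen its events. The old test hid this
with Thread.sleep(1000), which fails whenever delivery takes longer than the
sleep. The sketch below (a hypothetical ToyListenerBus, not Spark's actual
class) shows the asynchronous pattern that makes a fixed sleep unreliable:

    import java.util.concurrent.LinkedBlockingQueue

    class ToyListenerBus {
      private val eventQueue = new LinkedBlockingQueue[String]

      // A daemon thread drains the queue, as SparkListenerBus does: posting
      // an event and a listener observing it happen at different times.
      new Thread("toy-bus") {
        setDaemon(true)
        override def run() {
          while (true) {
            println("delivered: " + eventQueue.take())
          }
        }
      }.start()

      // Returns immediately; delivery happens later on the daemon thread.
      def post(event: String) { eventQueue.put(event) }
    }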


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a34a4e81/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index febcf9c..ff45e76 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -231,7 +231,7 @@ class SparkContext(
   }
   taskScheduler.start()
 
-  @volatile private var dagScheduler = new DAGScheduler(taskScheduler)
+  @volatile private[spark] var dagScheduler = new DAGScheduler(taskScheduler)
   dagScheduler.start()
 
   ui.start()
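
The functional change above is only visibility: private[spark] makes
dagScheduler reachable from other code under the org.apache.spark package
(such as the scheduler test suite below) while keeping it hidden from user
applications. A minimal illustration of Scala's package-qualified access
modifier, with hypothetical names rather than Spark code:

    package org.apache.spark

    class Owner {
      private[spark] val handle = 42  // visible anywhere under org.apache.spark
      private val hidden = 0          // visible only within Owner
    }

    package scheduler {
      object Peek {
        def read(o: Owner): Int = o.handle  // compiles: enclosed by org.apache.spark
        // def bad(o: Owner): Int = o.hidden  // would not compile
      }
    }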

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a34a4e81/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 4053b91..5c40f50 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -114,7 +114,7 @@ class DAGScheduler(
 
   private[spark] val stageToInfos = new TimeStampedHashMap[Stage, StageInfo]
 
-  private val listenerBus = new SparkListenerBus()
+  private[spark] val listenerBus = new SparkListenerBus()
 
   // Contains the locations that each RDD's partitions are cached on
   private val cacheLocs = new HashMap[Int, Array[Seq[TaskLocation]]]

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a34a4e81/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
index a65e1ec..8283c4b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
@@ -70,5 +70,23 @@ private[spark] class SparkListenerBus() extends Logging {
       queueFullErrorMessageLogged = true
     }
   }
+
+  /** Waits until there are no more events in the queue, or until the specified time has elapsed.
+    *
+    * Used for testing only. Returns true if the queue has emptied and false if the specified time
+    * elapsed before the queue emptied.
+    */
+  def waitUntilEmpty(timeoutMillis: Int): Boolean = {
+    val finishTime = System.currentTimeMillis + timeoutMillis
+    while (!eventQueue.isEmpty()) {
+      if (System.currentTimeMillis > finishTime) {
+        return false
+      }
+      /* Sleep rather than using wait/notify, because this is used only for testing and wait/notify
+       * add overhead in the general case. */
+      Thread.sleep(10)
+    }
+    return true
+  }
 }
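
With waitUntilEmpty in place, a test can block until all queued events have
been handled instead of sleeping for a fixed interval; the 10 ms polling loop
keeps the production post() path free of wait/notify signaling, so the cost is
paid only in tests. A usage sketch, assuming a running SparkContext sc and the
accessors made package-visible above:

    // Drain the bus before asserting on listener state (this mirrors the
    // test changes below); fail rather than hang if the queue never empties.
    val WAIT_TIMEOUT_MILLIS = 10000
    sc.parallelize(1 to 100, 4).count()
    assert(sc.dagScheduler.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
    // The event queue is now empty, so assertions on listener state can follow.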
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a34a4e81/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
index 41a161e..6e80262 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
@@ -23,15 +23,9 @@ import scala.collection.mutable
 import org.scalatest.matchers.ShouldMatchers
 import org.apache.spark.SparkContext._
 
-/**
- *
- */
-
 class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatchers {
 
-  // TODO: This test has a race condition since the DAGScheduler now reports results
-  //       asynchronously. It needs to be updated for that patch.
-  ignore("local metrics") {
+  test("local metrics") {
     sc = new SparkContext("local[4]", "test")
     val listener = new SaveStageInfo
     sc.addSparkListener(listener)
@@ -45,7 +39,8 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc
 
     val d = sc.parallelize(1 to 1e4.toInt, 64).map{i => w(i)}
     d.count
-    Thread.sleep(1000)
+    val WAIT_TIMEOUT_MILLIS = 10000
+    assert(sc.dagScheduler.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
     listener.stageInfos.size should be (1)
 
     val d2 = d.map{i => w(i) -> i * 2}.setName("shuffle input 1")
@@ -57,7 +52,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc
 
     d4.collectAsMap
 
-    Thread.sleep(1000)
+    assert(sc.dagScheduler.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
     listener.stageInfos.size should be (4)
     listener.stageInfos.foreach {stageInfo =>
      //small test, so some tasks might take less than 1 millisecond, but average should be greater than 1 ms
@@ -68,7 +63,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc
        checkNonZeroAvg(stageInfo.taskInfos.map{_._2.shuffleReadMetrics.get.fetchWaitTime}, stageInfo + " fetchWaitTime")
       }
 
-        stageInfo.taskInfos.foreach{case (taskInfo, taskMetrics) =>
+      stageInfo.taskInfos.foreach{case (taskInfo, taskMetrics) =>
         taskMetrics.resultSize should be > (0l)
         if (isStage(stageInfo, Set(d2.name, d3.name), Set(d4.name))) {
           taskMetrics.shuffleWriteMetrics should be ('defined)
