Updated Branches:
  refs/heads/branch-0.8 f930dd4bf -> 0b6f047b5

Merge pull request #50 from kayousterhout/SPARK-908

Fix race condition in SparkListenerSuite (fixes SPARK-908).

(cherry picked from commit 215238cb399d46c83fafa64b3c98e0ebec21adb9)
Signed-off-by: Reynold Xin <r...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/0b6f047b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/0b6f047b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/0b6f047b

Branch: refs/heads/branch-0.8
Commit: 0b6f047b5fcc20d9914e18eede329d507639ce0a
Parents: f930dd4
Author: Reynold Xin <r...@apache.org>
Authored: Wed Oct 9 16:49:44 2013 -0700
Committer: Reynold Xin <r...@apache.org>
Committed: Wed Oct 9 16:53:31 2013 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/SparkContext.scala   |  2 +-
 .../apache/spark/scheduler/DAGScheduler.scala   |  2 +-
 .../spark/scheduler/SparkListenerBus.scala      | 18 +++++++++++
 .../spark/scheduler/SparkListenerSuite.scala    | 32 +++++++++++---------
 4 files changed, 37 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0b6f047b/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala 
b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 912ce75..6ad708f 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -228,7 +228,7 @@ class SparkContext(
   }
   taskScheduler.start()
 
-  @volatile private var dagScheduler = new DAGScheduler(taskScheduler)
+  @volatile private[spark] var dagScheduler = new DAGScheduler(taskScheduler)
   dagScheduler.start()
 
   ui.start()

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0b6f047b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 8a55df4..8a8b32c 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -114,7 +114,7 @@ class DAGScheduler(
 
   private[spark] val stageToInfos = new TimeStampedHashMap[Stage, StageInfo]
 
-  private val listenerBus = new SparkListenerBus()
+  private[spark] val listenerBus = new SparkListenerBus()
 
   // Contains the locations that each RDD's partitions are cached on
   private val cacheLocs = new HashMap[Int, Array[Seq[TaskLocation]]]

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0b6f047b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala 
b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
index a65e1ec..4d3e4a1 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
@@ -70,5 +70,23 @@ private[spark] class SparkListenerBus() extends Logging {
       queueFullErrorMessageLogged = true
     }
   }
+
+  /**
+   * Waits until there are no more events in the queue, or until the specified 
time has elapsed.
+   * Used for testing only. Returns true if the queue has emptied and false if 
the specified time
+   * elapsed before the queue emptied.
+   */
+  def waitUntilEmpty(timeoutMillis: Int): Boolean = {
+    val finishTime = System.currentTimeMillis + timeoutMillis
+    while (!eventQueue.isEmpty()) {
+      if (System.currentTimeMillis > finishTime) {
+        return false
+      }
+      /* Sleep rather than using wait/notify, because this is used only for 
testing and wait/notify
+       * add overhead in the general case. */
+      Thread.sleep(10)
+    }
+    return true
+  }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0b6f047b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala 
b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
index 41a161e..0d8742c 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
@@ -23,15 +23,9 @@ import scala.collection.mutable
 import org.scalatest.matchers.ShouldMatchers
 import org.apache.spark.SparkContext._
 
-/**
- *
- */
-
 class SparkListenerSuite extends FunSuite with LocalSparkContext with 
ShouldMatchers {
 
-  // TODO: This test has a race condition since the DAGScheduler now reports 
results
-  //       asynchronously. It needs to be updated for that patch.
-  ignore("local metrics") {
+  test("local metrics") {
     sc = new SparkContext("local[4]", "test")
     val listener = new SaveStageInfo
     sc.addSparkListener(listener)
@@ -45,7 +39,8 @@ class SparkListenerSuite extends FunSuite with 
LocalSparkContext with ShouldMatc
 
     val d = sc.parallelize(1 to 1e4.toInt, 64).map{i => w(i)}
     d.count
-    Thread.sleep(1000)
+    val WAIT_TIMEOUT_MILLIS = 10000
+    assert(sc.dagScheduler.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
     listener.stageInfos.size should be (1)
 
     val d2 = d.map{i => w(i) -> i * 2}.setName("shuffle input 1")
@@ -57,18 +52,25 @@ class SparkListenerSuite extends FunSuite with 
LocalSparkContext with ShouldMatc
 
     d4.collectAsMap
 
-    Thread.sleep(1000)
+    assert(sc.dagScheduler.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
     listener.stageInfos.size should be (4)
-    listener.stageInfos.foreach {stageInfo =>
-      //small test, so some tasks might take less than 1 millisecond, but 
average should be greater than 1 ms
+    listener.stageInfos.foreach { stageInfo =>
+      /* small test, so some tasks might take less than 1 millisecond, but 
average should be greater
+       * than 0 ms. */
       checkNonZeroAvg(stageInfo.taskInfos.map{_._1.duration}, stageInfo + " 
duration")
-      checkNonZeroAvg(stageInfo.taskInfos.map{_._2.executorRunTime.toLong}, 
stageInfo + " executorRunTime")
-      
checkNonZeroAvg(stageInfo.taskInfos.map{_._2.executorDeserializeTime.toLong}, 
stageInfo + " executorDeserializeTime")
+      checkNonZeroAvg(
+        stageInfo.taskInfos.map{_._2.executorRunTime.toLong},
+        stageInfo + " executorRunTime")
+      checkNonZeroAvg(
+        stageInfo.taskInfos.map{_._2.executorDeserializeTime.toLong},
+        stageInfo + " executorDeserializeTime")
       if (stageInfo.stage.rdd.name == d4.name) {
-        
checkNonZeroAvg(stageInfo.taskInfos.map{_._2.shuffleReadMetrics.get.fetchWaitTime},
 stageInfo + " fetchWaitTime")
+        checkNonZeroAvg(
+          stageInfo.taskInfos.map{_._2.shuffleReadMetrics.get.fetchWaitTime},
+          stageInfo + " fetchWaitTime")
       }
 
-        stageInfo.taskInfos.foreach{case (taskInfo, taskMetrics) =>
+      stageInfo.taskInfos.foreach { case (taskInfo, taskMetrics) =>
         taskMetrics.resultSize should be > (0l)
         if (isStage(stageInfo, Set(d2.name, d3.name), Set(d4.name))) {
           taskMetrics.shuffleWriteMetrics should be ('defined)

Reply via email to