Updated Branches:
  refs/heads/branch-0.8 d0b9fce12 -> 9c9e71ea9

Fix race condition in JobLoggerSuite
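The race: JobLogger counters are updated by SparkListener callbacks, which
the DAGScheduler delivers asynchronously through its listener bus, so a test
that asserts on those counters immediately after collect() returns can
observe partial counts. The fix drains the bus with
waitUntilEmpty(WAIT_TIMEOUT_MILLIS) before asserting. As a rough
illustration of that drain-then-assert contract, here is a minimal sketch in
Scala; it is not Spark's actual listener bus, and ToyListenerBus with all of
its internals is invented for this example:

import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.atomic.AtomicLong

// Simplified stand-in for the scheduler's listener bus: events are posted
// by one thread and handed to the listener on a background thread, so
// callers race with delivery unless they drain the bus first.
class ToyListenerBus(handler: String => Unit) {
  private val queue = new LinkedBlockingQueue[String]()
  private val posted = new AtomicLong(0)
  private val processed = new AtomicLong(0)

  private val deliveryThread = new Thread {
    override def run(): Unit = while (true) {
      handler(queue.take())
      processed.incrementAndGet()
    }
  }
  deliveryThread.setDaemon(true)
  deliveryThread.start()

  def post(event: String): Unit = {
    posted.incrementAndGet()
    queue.put(event)
  }

  // Poll until every posted event has been handled or the timeout elapses;
  // returns true only if the bus drained in time.
  def waitUntilEmpty(timeoutMillis: Long): Boolean = {
    val deadline = System.currentTimeMillis() + timeoutMillis
    while (processed.get() < posted.get()) {
      if (System.currentTimeMillis() > deadline) return false
      Thread.sleep(10)
    }
    true
  }
}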


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/295734f6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/295734f6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/295734f6

Branch: refs/heads/branch-0.8
Commit: 295734f6412be3d861fe0cc10b319063cc8909ab
Parents: 30bcd84
Author: Patrick Wendell <[email protected]>
Authored: Sat Dec 7 12:40:18 2013 -0800
Committer: Patrick Wendell <[email protected]>
Committed: Sat Dec 7 12:40:18 2013 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/scheduler/JobLoggerSuite.scala    | 8 ++++++--
 1 file changed, 6 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/295734f6/core/src/test/scala/org/apache/spark/scheduler/JobLoggerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/JobLoggerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/JobLoggerSuite.scala
index 9848818..22babe8 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/JobLoggerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/JobLoggerSuite.scala
@@ -31,6 +31,8 @@ import org.apache.spark.rdd.RDD
 
 
 class JobLoggerSuite extends FunSuite with LocalSparkContext with ShouldMatchers {
+  /** Length of time to wait while draining listener events. */
+  val WAIT_TIMEOUT_MILLIS = 10000
 
   test("inner method") {
     sc = new SparkContext("local", "joblogger")
@@ -91,6 +93,7 @@ class JobLoggerSuite extends FunSuite with LocalSparkContext with ShouldMatchers
     sc.addSparkListener(joblogger)
     val rdd = sc.parallelize(1 to 1e2.toInt, 4).map{ i => (i % 12, 2 * i) }
     rdd.reduceByKey(_+_).collect()
+    assert(sc.dagScheduler.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
 
     val user = System.getProperty("user.name", SparkContext.SPARK_UNKNOWN_USER)
     
@@ -119,8 +122,9 @@ class JobLoggerSuite extends FunSuite with LocalSparkContext with ShouldMatchers
     }
     sc.addSparkListener(joblogger)
     val rdd = sc.parallelize(1 to 1e2.toInt, 4).map{ i => (i % 12, 2 * i) }
-    rdd.reduceByKey(_+_).collect()
-    
+    rdd.reduceByKey(_+_).collect() 
+    assert(sc.dagScheduler.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
+
     joblogger.onJobStartCount should be (1)
     joblogger.onJobEndCount should be (1)
     joblogger.onTaskEndCount should be (8)

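For a self-contained demonstration of both the race and the fix, the toy bus
sketched under the commit message can be driven like this (hypothetical
names throughout; an illustration, not JobLoggerSuite itself):

object DrainDemo extends App {
  @volatile var taskEndCount = 0
  val bus = new ToyListenerBus(_ => taskEndCount += 1)

  (1 to 8).foreach(i => bus.post(s"taskEnd-$i"))

  // Without this wait, the assertion below could run before all eight
  // events have been delivered, which is exactly the race being fixed.
  assert(bus.waitUntilEmpty(10000))
  assert(taskEndCount == 8)
}

Draining with a bounded timeout, rather than sleeping for a fixed interval,
keeps the test fast when the bus empties quickly and still fails loudly
(via the returned Boolean) when it does not.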