Updated Branches: refs/heads/branch-0.8 d0b9fce12 -> 9c9e71ea9
Fix race condition in JobLoggerSuite

Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/295734f6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/295734f6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/295734f6

Branch: refs/heads/branch-0.8
Commit: 295734f6412be3d861fe0cc10b319063cc8909ab
Parents: 30bcd84
Author: Patrick Wendell <[email protected]>
Authored: Sat Dec 7 12:40:18 2013 -0800
Committer: Patrick Wendell <[email protected]>
Committed: Sat Dec 7 12:40:18 2013 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/scheduler/JobLoggerSuite.scala | 8 ++++++--
 1 file changed, 6 insertions(+), 2 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/295734f6/core/src/test/scala/org/apache/spark/scheduler/JobLoggerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/JobLoggerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/JobLoggerSuite.scala
index 9848818..22babe8 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/JobLoggerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/JobLoggerSuite.scala
@@ -31,6 +31,8 @@ import org.apache.spark.rdd.RDD
 class JobLoggerSuite extends FunSuite with LocalSparkContext with ShouldMatchers {
+ /** Length of time to wait while draining listener events. */
+ val WAIT_TIMEOUT_MILLIS = 10000
 test("inner method") {
 sc = new SparkContext("local", "joblogger")
@@ -91,6 +93,7 @@ class JobLoggerSuite extends FunSuite with LocalSparkContext with ShouldMatchers
 sc.addSparkListener(joblogger)
 val rdd = sc.parallelize(1 to 1e2.toInt, 4).map{ i => (i % 12, 2 * i) }
 rdd.reduceByKey(_+_).collect()
+ assert(sc.dagScheduler.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
 val user = System.getProperty("user.name", SparkContext.SPARK_UNKNOWN_USER)
@@ -119,8 +122,9 @@ class JobLoggerSuite extends FunSuite with LocalSparkContext with ShouldMatchers
 }
 sc.addSparkListener(joblogger)
 val rdd = sc.parallelize(1 to 1e2.toInt, 4).map{ i => (i % 12, 2 * i) }
- rdd.reduceByKey(_+_).collect()
-
+ rdd.reduceByKey(_+_).collect()
+ assert(sc.dagScheduler.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
+
 joblogger.onJobStartCount should be (1)
 joblogger.onJobEndCount should be (1)
 joblogger.onTaskEndCount should be (8)
