Repository: spark
Updated Branches:
  refs/heads/master 2cbc41282 -> e6ceac49a
[SPARK-13096][TEST] Fix flaky verifyPeakExecutionMemorySet

Previously we would assert things before all events are guaranteed to have been processed. To fix this, just block until all events are actually processed, i.e. until the listener queue is empty.

https://amplab.cs.berkeley.edu/jenkins/job/spark-master-test-sbt-hadoop-2.7/79/testReport/junit/org.apache.spark.util.collection/ExternalAppendOnlyMapSuite/spilling/

Author: Andrew Or <and...@databricks.com>

Closes #10990 from andrewor14/accum-suite-less-flaky.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e6ceac49
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e6ceac49
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e6ceac49

Branch: refs/heads/master
Commit: e6ceac49a311faf3413acda57a6612fe806adf90
Parents: 2cbc412
Author: Andrew Or <and...@databricks.com>
Authored: Fri Jan 29 17:59:41 2016 -0800
Committer: Andrew Or <and...@databricks.com>
Committed: Fri Jan 29 17:59:41 2016 -0800

----------------------------------------------------------------------
 core/src/test/scala/org/apache/spark/AccumulatorSuite.scala | 2 ++
 1 file changed, 2 insertions(+)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/e6ceac49/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala b/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
index 11c97d7..b8f2b96 100644
--- a/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
@@ -307,6 +307,8 @@ private[spark] object AccumulatorSuite {
     val listener = new SaveInfoListener
     sc.addSparkListener(listener)
     testBody
+    // wait until all events have been processed before proceeding to assert things
+    sc.listenerBus.waitUntilEmpty(10 * 1000)
     val accums = listener.getCompletedStageInfos.flatMap(_.accumulables.values)
     val isSet = accums.exists { a =>
       a.name == Some(PEAK_EXECUTION_MEMORY) && a.value.exists(_.asInstanceOf[Long] > 0L)

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org