Repository: spark Updated Branches: refs/heads/master dd9ca7b96 -> 5519760e0
[SPARK-2208] Fix for local metrics tests can fail on fast machines ## What changes were proposed in this pull request? A fix for local metrics tests that can fail on fast machines. This is probably what is suggested here #3380 by aarondav? ## How was this patch tested? CI Tests Cheers Author: Joan <j...@goyeau.com> Closes #11747 from joan38/SPARK-2208-Local-metrics-tests. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5519760e Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5519760e Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5519760e Branch: refs/heads/master Commit: 5519760e0fe7d52170b38a52ce3d670d158e2aba Parents: dd9ca7b Author: Joan <j...@goyeau.com> Authored: Thu Mar 24 09:47:44 2016 +0000 Committer: Sean Owen <so...@cloudera.com> Committed: Thu Mar 24 09:47:44 2016 +0000 ---------------------------------------------------------------------- .../spark/scheduler/SparkListenerSuite.scala | 40 ++++++++++++-------- 1 file changed, 25 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/5519760e/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala index 58d217f..f1f9b69 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala @@ -26,6 +26,10 @@ import org.scalatest.Matchers import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkException, SparkFunSuite} import org.apache.spark.executor.TaskMetrics +import org.apache.spark.network.buffer.ManagedBuffer +import org.apache.spark.shuffle.IndexShuffleBlockResolver +import org.apache.spark.shuffle.sort.SortShuffleManager +import org.apache.spark.storage.ShuffleBlockId import org.apache.spark.util.{ResetSystemProperties, RpcUtils} class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Matchers @@ -215,28 +219,24 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match } test("local metrics") { - sc = new SparkContext("local", "SparkListenerSuite") + val conf = new SparkConf() + .setMaster("local").setAppName("SparkListenerSuite") + .set("spark.shuffle.manager", classOf[SlowShuffleManager].getName) + sc = new SparkContext(conf) val listener = new SaveStageAndTaskInfo sc.addSparkListener(listener) sc.addSparkListener(new StatsReportListener) - // just to make sure some of the tasks take a noticeable amount of time - val w = { i: Int => - if (i == 0) { - Thread.sleep(100) - } - i - } val numSlices = 16 - val d = sc.parallelize(0 to 1e3.toInt, numSlices).map(w) + val d = sc.parallelize(0 to 1e3.toInt, numSlices) d.count() sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) listener.stageInfos.size should be (1) - val d2 = d.map { i => w(i) -> i * 2 }.setName("shuffle input 1") - val d3 = d.map { i => w(i) -> (0 to (i % 5)) }.setName("shuffle input 2") + val d2 = d.map { i => i -> i * 2 }.setName("shuffle input 1") + val d3 = d.map { i => i -> (0 to (i % 5)) }.setName("shuffle input 2") val d4 = d2.cogroup(d3, numSlices).map { case (k, (v1, v2)) => - w(k) -> (v1.size, v2.size) + k -> (v1.size, v2.size) } d4.setName("A Cogroup") d4.collectAsMap() @@ -255,13 +255,11 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match taskInfoMetrics.map(_._2.executorDeserializeTime), stageInfo + " executorDeserializeTime") - /* Test is disabled (SEE SPARK-2208) if (stageInfo.rddInfos.exists(_.name == d4.name)) { checkNonZeroAvg( taskInfoMetrics.map(_._2.shuffleReadMetrics.get.fetchWaitTime), stageInfo + " fetchWaitTime") } - */ taskInfoMetrics.foreach { case (taskInfo, taskMetrics) => taskMetrics.resultSize should be > (0L) @@ -337,7 +335,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match listener.wait(remainingWait) remainingWait = finishTime - System.currentTimeMillis } - assert(!listener.startedTasks.isEmpty) + assert(listener.startedTasks.nonEmpty) } f.cancel() @@ -476,3 +474,15 @@ private class ListenerThatAcceptsSparkConf(conf: SparkConf) extends SparkListene var count = 0 override def onJobEnd(job: SparkListenerJobEnd): Unit = count += 1 } + +/** Slow ShuffleManager to simulate tasks that takes a noticeable amount of time */ +private class SlowShuffleManager(conf: SparkConf) extends SortShuffleManager(conf) { + + override val shuffleBlockResolver = new IndexShuffleBlockResolver(conf) { + + override def getBlockData(blockId: ShuffleBlockId): ManagedBuffer = { + Thread.sleep(10) + super.getBlockData(blockId) + } + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org