Repository: spark
Updated Branches:
  refs/heads/master 5519760e0 -> 342079dc4


Revert "[SPARK-2208] Fix for local metrics tests can fail on fast machines". 
The test still appears to be flaky after this change, if not more so.

This reverts commit 5519760e0fe7d52170b38a52ce3d670d158e2aba.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/342079dc
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/342079dc
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/342079dc

Branch: refs/heads/master
Commit: 342079dc45425309798d6082cccef86858f08a77
Parents: 5519760
Author: Sean Owen <so...@cloudera.com>
Authored: Thu Mar 24 17:27:20 2016 +0000
Committer: Sean Owen <so...@cloudera.com>
Committed: Thu Mar 24 17:27:20 2016 +0000

----------------------------------------------------------------------
 .../spark/scheduler/SparkListenerSuite.scala    | 40 ++++++++------------
 1 file changed, 15 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/342079dc/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
index f1f9b69..58d217f 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
@@ -26,10 +26,6 @@ import org.scalatest.Matchers
 
 import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkException, SparkFunSuite}
 import org.apache.spark.executor.TaskMetrics
-import org.apache.spark.network.buffer.ManagedBuffer
-import org.apache.spark.shuffle.IndexShuffleBlockResolver
-import org.apache.spark.shuffle.sort.SortShuffleManager
-import org.apache.spark.storage.ShuffleBlockId
 import org.apache.spark.util.{ResetSystemProperties, RpcUtils}
 
 class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Matchers
@@ -219,24 +215,28 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
   }
 
   test("local metrics") {
-    val conf = new SparkConf()
-      .setMaster("local").setAppName("SparkListenerSuite")
-      .set("spark.shuffle.manager", classOf[SlowShuffleManager].getName)
-    sc = new SparkContext(conf)
+    sc = new SparkContext("local", "SparkListenerSuite")
     val listener = new SaveStageAndTaskInfo
     sc.addSparkListener(listener)
     sc.addSparkListener(new StatsReportListener)
+    // just to make sure some of the tasks take a noticeable amount of time
+    val w = { i: Int =>
+      if (i == 0) {
+        Thread.sleep(100)
+      }
+      i
+    }
 
     val numSlices = 16
-    val d = sc.parallelize(0 to 1e3.toInt, numSlices)
+    val d = sc.parallelize(0 to 1e3.toInt, numSlices).map(w)
     d.count()
     sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
     listener.stageInfos.size should be (1)
 
-    val d2 = d.map { i => i -> i * 2 }.setName("shuffle input 1")
-    val d3 = d.map { i => i -> (0 to (i % 5)) }.setName("shuffle input 2")
+    val d2 = d.map { i => w(i) -> i * 2 }.setName("shuffle input 1")
+    val d3 = d.map { i => w(i) -> (0 to (i % 5)) }.setName("shuffle input 2")
     val d4 = d2.cogroup(d3, numSlices).map { case (k, (v1, v2)) =>
-      k -> (v1.size, v2.size)
+      w(k) -> (v1.size, v2.size)
     }
     d4.setName("A Cogroup")
     d4.collectAsMap()
@@ -255,11 +255,13 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
         taskInfoMetrics.map(_._2.executorDeserializeTime),
         stageInfo + " executorDeserializeTime")
 
+      /* Test is disabled (SEE SPARK-2208)
       if (stageInfo.rddInfos.exists(_.name == d4.name)) {
         checkNonZeroAvg(
           taskInfoMetrics.map(_._2.shuffleReadMetrics.get.fetchWaitTime),
           stageInfo + " fetchWaitTime")
       }
+      */
 
       taskInfoMetrics.foreach { case (taskInfo, taskMetrics) =>
         taskMetrics.resultSize should be > (0L)
@@ -335,7 +337,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
         listener.wait(remainingWait)
         remainingWait = finishTime - System.currentTimeMillis
       }
-      assert(listener.startedTasks.nonEmpty)
+      assert(!listener.startedTasks.isEmpty)
     }
 
     f.cancel()
@@ -474,15 +476,3 @@ private class ListenerThatAcceptsSparkConf(conf: SparkConf) extends SparkListene
   var count = 0
   override def onJobEnd(job: SparkListenerJobEnd): Unit = count += 1
 }
-
-/** Slow ShuffleManager to simulate tasks that takes a noticeable amount of time */
-private class SlowShuffleManager(conf: SparkConf) extends SortShuffleManager(conf) {
-
-  override val shuffleBlockResolver = new IndexShuffleBlockResolver(conf) {
-
-    override def getBlockData(blockId: ShuffleBlockId): ManagedBuffer = {
-      Thread.sleep(10)
-      super.getBlockData(blockId)
-    }
-  }
-}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to