Repository: spark
Updated Branches:
  refs/heads/master 1aa191e58 -> 282158914


[SPARK-16136][CORE] Fix flaky TaskSetManagerSuite

## What changes were proposed in this pull request?

The TaskSetManagerSuite test "Kill other task attempts when one attempt 
belonging to the same task succeeds" was flaky.  A task is only considered 
speculatable once at least one millisecond has passed since it was submitted, 
so the speculation check could fail when the test ran within the same 
millisecond.  Use a manual clock to avoid the problem.

I noticed these tests were leaving lots of threads lying around as well (which 
prevented me from running the test repeatedly), so I fixed that too.

## How was this patch tested?

Ran the test 1k times on my laptop, passed every time (it failed about 20% of 
the time before this).

Author: Imran Rashid <[email protected]>

Closes #13848 from squito/fix_flaky_taskmanagersuite.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/28215891
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/28215891
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/28215891

Branch: refs/heads/master
Commit: 282158914d89b35a3f85388cb20bd62215f4f589
Parents: 1aa191e
Author: Imran Rashid <[email protected]>
Authored: Mon Jun 27 16:28:59 2016 -0500
Committer: Imran Rashid <[email protected]>
Committed: Mon Jun 27 16:28:59 2016 -0500

----------------------------------------------------------------------
 .../spark/scheduler/TaskSetManagerSuite.scala   | 68 +++++++++++++-------
 1 file changed, 43 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/28215891/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala 
b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
index 1d7c8f4..cfbabd8 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -157,14 +157,27 @@ class TaskSetManagerSuite extends SparkFunSuite with 
LocalSparkContext with Logg
   val LOCALITY_WAIT_MS = conf.getTimeAsMs("spark.locality.wait", "3s")
   val MAX_TASK_FAILURES = 4
 
-  override def beforeEach() {
+  var sched: FakeTaskScheduler = null
+
+  override def beforeEach(): Unit = {
     super.beforeEach()
     FakeRackUtil.cleanUp()
+    sched = null
+  }
+
+  override def afterEach(): Unit = {
+    super.afterEach()
+    if (sched != null) {
+      sched.dagScheduler.stop()
+      sched.stop()
+      sched = null
+    }
   }
 
+
   test("TaskSet with no preferences") {
     sc = new SparkContext("local", "test")
-    val sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
+    sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
     val taskSet = FakeTask.createTaskSet(1)
     val clock = new ManualClock
     val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
@@ -183,7 +196,7 @@ class TaskSetManagerSuite extends SparkFunSuite with 
LocalSparkContext with Logg
 
   test("multiple offers with no preferences") {
     sc = new SparkContext("local", "test")
-    val sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
+    sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
     val taskSet = FakeTask.createTaskSet(3)
     val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES)
     val accumUpdatesByTask: Array[Seq[AccumulatorV2[_, _]]] = 
taskSet.tasks.map { task =>
@@ -217,7 +230,7 @@ class TaskSetManagerSuite extends SparkFunSuite with 
LocalSparkContext with Logg
 
   test("skip unsatisfiable locality levels") {
     sc = new SparkContext("local", "test")
-    val sched = new FakeTaskScheduler(sc, ("execA", "host1"), ("execC", 
"host2"))
+    sched = new FakeTaskScheduler(sc, ("execA", "host1"), ("execC", "host2"))
     val taskSet = FakeTask.createTaskSet(1, Seq(TaskLocation("host1", 
"execB")))
     val clock = new ManualClock
     val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
@@ -233,7 +246,7 @@ class TaskSetManagerSuite extends SparkFunSuite with 
LocalSparkContext with Logg
 
   test("basic delay scheduling") {
     sc = new SparkContext("local", "test")
-    val sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", 
"host2"))
+    sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"))
     val taskSet = FakeTask.createTaskSet(4,
       Seq(TaskLocation("host1", "exec1")),
       Seq(TaskLocation("host2", "exec2")),
@@ -263,7 +276,7 @@ class TaskSetManagerSuite extends SparkFunSuite with 
LocalSparkContext with Logg
 
   test("we do not need to delay scheduling when we only have noPref tasks in 
the queue") {
     sc = new SparkContext("local", "test")
-    val sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec3", 
"host2"))
+    sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec3", "host2"))
     val taskSet = FakeTask.createTaskSet(3,
       Seq(TaskLocation("host1", "exec1")),
       Seq(TaskLocation("host2", "exec3")),
@@ -280,7 +293,7 @@ class TaskSetManagerSuite extends SparkFunSuite with 
LocalSparkContext with Logg
 
   test("delay scheduling with fallback") {
     sc = new SparkContext("local", "test")
-    val sched = new FakeTaskScheduler(sc,
+    sched = new FakeTaskScheduler(sc,
       ("exec1", "host1"), ("exec2", "host2"), ("exec3", "host3"))
     val taskSet = FakeTask.createTaskSet(5,
       Seq(TaskLocation("host1")),
@@ -320,7 +333,7 @@ class TaskSetManagerSuite extends SparkFunSuite with 
LocalSparkContext with Logg
 
   test("delay scheduling with failed hosts") {
     sc = new SparkContext("local", "test")
-    val sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", 
"host2"),
+    sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"),
       ("exec3", "host3"))
     val taskSet = FakeTask.createTaskSet(3,
       Seq(TaskLocation("host1")),
@@ -357,7 +370,7 @@ class TaskSetManagerSuite extends SparkFunSuite with 
LocalSparkContext with Logg
 
   test("task result lost") {
     sc = new SparkContext("local", "test")
-    val sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
+    sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
     val taskSet = FakeTask.createTaskSet(1)
     val clock = new ManualClock
     val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
@@ -374,7 +387,7 @@ class TaskSetManagerSuite extends SparkFunSuite with 
LocalSparkContext with Logg
 
   test("repeated failures lead to task set abortion") {
     sc = new SparkContext("local", "test")
-    val sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
+    sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
     val taskSet = FakeTask.createTaskSet(1)
     val clock = new ManualClock
     val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
@@ -404,7 +417,7 @@ class TaskSetManagerSuite extends SparkFunSuite with 
LocalSparkContext with Logg
 
     sc = new SparkContext("local", "test", conf)
     // two executors on same host, one on different.
-    val sched = new FakeTaskScheduler(sc, ("exec1", "host1"),
+    sched = new FakeTaskScheduler(sc, ("exec1", "host1"),
       ("exec1.1", "host1"), ("exec2", "host2"))
     // affinity to exec1 on host1 - which we will fail.
     val taskSet = FakeTask.createTaskSet(1, Seq(TaskLocation("host1", 
"exec1")))
@@ -486,7 +499,7 @@ class TaskSetManagerSuite extends SparkFunSuite with 
LocalSparkContext with Logg
     // Assign host2 to rack2
     FakeRackUtil.assignHostToRack("host2", "rack2")
     sc = new SparkContext("local", "test")
-    val sched = new FakeTaskScheduler(sc)
+    sched = new FakeTaskScheduler(sc)
     val taskSet = FakeTask.createTaskSet(4,
       Seq(TaskLocation("host1", "execA")),
       Seq(TaskLocation("host1", "execB")),
@@ -518,7 +531,7 @@ class TaskSetManagerSuite extends SparkFunSuite with 
LocalSparkContext with Logg
 
   test("Executors exit for reason unrelated to currently running tasks") {
     sc = new SparkContext("local", "test")
-    val sched = new FakeTaskScheduler(sc)
+    sched = new FakeTaskScheduler(sc)
     val taskSet = FakeTask.createTaskSet(4,
       Seq(TaskLocation("host1", "execA")),
       Seq(TaskLocation("host1", "execB")),
@@ -551,7 +564,7 @@ class TaskSetManagerSuite extends SparkFunSuite with 
LocalSparkContext with Logg
     // Assign host3 to rack2
     FakeRackUtil.assignHostToRack("host3", "rack2")
     sc = new SparkContext("local", "test")
-    val sched = new FakeTaskScheduler(sc,
+    sched = new FakeTaskScheduler(sc,
       ("execA", "host1"), ("execB", "host2"), ("execC", "host3"))
     val taskSet = FakeTask.createTaskSet(2,
       Seq(TaskLocation("host1", "execA")),
@@ -574,7 +587,7 @@ class TaskSetManagerSuite extends SparkFunSuite with 
LocalSparkContext with Logg
 
   test("do not emit warning when serialized task is small") {
     sc = new SparkContext("local", "test")
-    val sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
+    sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
     val taskSet = FakeTask.createTaskSet(1)
     val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES)
 
@@ -587,7 +600,7 @@ class TaskSetManagerSuite extends SparkFunSuite with 
LocalSparkContext with Logg
 
   test("emit warning when serialized task is large") {
     sc = new SparkContext("local", "test")
-    val sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
+    sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
 
     val taskSet = new TaskSet(Array(new LargeTask(0)), 0, 0, 0, null)
     val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES)
@@ -601,7 +614,7 @@ class TaskSetManagerSuite extends SparkFunSuite with 
LocalSparkContext with Logg
 
   test("Not serializable exception thrown if the task cannot be serialized") {
     sc = new SparkContext("local", "test")
-    val sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
+    sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
 
     val taskSet = new TaskSet(
       Array(new NotSerializableFakeTask(1, 0), new NotSerializableFakeTask(0, 
1)), 0, 0, 0, null)
@@ -640,7 +653,7 @@ class TaskSetManagerSuite extends SparkFunSuite with 
LocalSparkContext with Logg
 
   test("speculative and noPref task should be scheduled after node-local") {
     sc = new SparkContext("local", "test")
-    val sched = new FakeTaskScheduler(
+    sched = new FakeTaskScheduler(
       sc, ("execA", "host1"), ("execB", "host2"), ("execC", "host3"))
     val taskSet = FakeTask.createTaskSet(4,
       Seq(TaskLocation("host1", "execA")),
@@ -668,7 +681,7 @@ class TaskSetManagerSuite extends SparkFunSuite with 
LocalSparkContext with Logg
   test("node-local tasks should be scheduled right away " +
     "when there are only node-local and no-preference tasks") {
     sc = new SparkContext("local", "test")
-    val sched = new FakeTaskScheduler(
+    sched = new FakeTaskScheduler(
       sc, ("execA", "host1"), ("execB", "host2"), ("execC", "host3"))
     val taskSet = FakeTask.createTaskSet(4,
       Seq(TaskLocation("host1")),
@@ -691,7 +704,7 @@ class TaskSetManagerSuite extends SparkFunSuite with 
LocalSparkContext with Logg
   test("SPARK-4939: node-local tasks should be scheduled right after 
process-local tasks finished")
   {
     sc = new SparkContext("local", "test")
-    val sched = new FakeTaskScheduler(sc, ("execA", "host1"), ("execB", 
"host2"))
+    sched = new FakeTaskScheduler(sc, ("execA", "host1"), ("execB", "host2"))
     val taskSet = FakeTask.createTaskSet(4,
       Seq(TaskLocation("host1")),
       Seq(TaskLocation("host2")),
@@ -712,7 +725,7 @@ class TaskSetManagerSuite extends SparkFunSuite with 
LocalSparkContext with Logg
 
   test("SPARK-4939: no-pref tasks should be scheduled after process-local 
tasks finished") {
     sc = new SparkContext("local", "test")
-    val sched = new FakeTaskScheduler(sc, ("execA", "host1"), ("execB", 
"host2"))
+    sched = new FakeTaskScheduler(sc, ("execA", "host1"), ("execB", "host2"))
     val taskSet = FakeTask.createTaskSet(3,
       Seq(),
       Seq(ExecutorCacheTaskLocation("host1", "execA")),
@@ -733,7 +746,7 @@ class TaskSetManagerSuite extends SparkFunSuite with 
LocalSparkContext with Logg
   test("Ensure TaskSetManager is usable after addition of levels") {
     // Regression test for SPARK-2931
     sc = new SparkContext("local", "test")
-    val sched = new FakeTaskScheduler(sc)
+    sched = new FakeTaskScheduler(sc)
     val taskSet = FakeTask.createTaskSet(2,
       Seq(TaskLocation("host1", "execA")),
       Seq(TaskLocation("host2", "execB.1")))
@@ -765,7 +778,7 @@ class TaskSetManagerSuite extends SparkFunSuite with 
LocalSparkContext with Logg
   test("Test that locations with HDFSCacheTaskLocation are treated as 
PROCESS_LOCAL.") {
     // Regression test for SPARK-2931
     sc = new SparkContext("local", "test")
-    val sched = new FakeTaskScheduler(sc,
+    sched = new FakeTaskScheduler(sc,
       ("execA", "host1"), ("execB", "host2"), ("execC", "host3"))
     val taskSet = FakeTask.createTaskSet(3,
       Seq(TaskLocation("host1")),
@@ -793,11 +806,12 @@ class TaskSetManagerSuite extends SparkFunSuite with 
LocalSparkContext with Logg
 
   test("Kill other task attempts when one attempt belonging to the same task 
succeeds") {
     sc = new SparkContext("local", "test")
-    val sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", 
"host2"))
+    sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"))
     val taskSet = FakeTask.createTaskSet(4)
     // Set the speculation multiplier to be 0 so speculative tasks are 
launched immediately
     sc.conf.set("spark.speculation.multiplier", "0.0")
-    val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES)
+    val clock = new ManualClock()
+    val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
     val accumUpdatesByTask: Array[Seq[AccumulatorV2[_, _]]] = 
taskSet.tasks.map { task =>
       task.metrics.internalAccums
     }
@@ -819,6 +833,10 @@ class TaskSetManagerSuite extends SparkFunSuite with 
LocalSparkContext with Logg
       assert(sched.endedTasks(id) === Success)
     }
 
+    // checkSpeculatableTasks checks that the task runtime is greater than the 
threshold for
+    // speculating. Since we use a threshold of 0 for speculation, tasks need 
to be running for
+    // > 0ms, so advance the clock by 1ms here.
+    clock.advance(1)
     assert(manager.checkSpeculatableTasks(0))
     // Offer resource to start the speculative attempt for the running task
     val taskOption5 = manager.resourceOffer("exec1", "host1", NO_PREF)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to