Repository: spark
Updated Branches:
  refs/heads/master 01277d4b2 -> cf1995a97


[SPARK-15783][CORE] Fix Flakiness in BlacklistIntegrationSuite

## What changes were proposed in this pull request?

Five changes here; the first two were causing failures in
BlacklistIntegrationSuite.

1. The testing framework didn't include the reviveOffers thread, so the test
which involved delay scheduling might never submit offers late enough for
delay scheduling to kick in.  So I added periodic revive offers, just like
the real scheduler.

2. `assertDataStructuresEmpty` would occasionally fail, because it appeared
there was still an active job.  This is because in DAGScheduler, the jobWaiter
was notified of the job completion before the data structures were cleaned up.
Most of the time the test code that is waiting on the jobWaiter won't become
active until after the data structures are cleared, but occasionally the race
goes the other way, and the assertions fail.  DAGScheduler now cleans up the
job's state before notifying the listener.

3. `DAGSchedulerSuite` was not stopping all the inner components it was setting
up (the event loop tester, the mapOutputTracker, and the broadcastManager), so
each test was leaking a number of threads.  Now we stop those components too.

4. It turns out that `assertMapOutputAvailable` is not terribly useful in this
framework; most of the places I was trying to use it suffer from some race.

5. When there is an exception in the backend, the error message is now a little
more helpful.  Before, the exception was printed to the console, but the test
would fail with a timeout, and the logs wouldn't show anything.  Now the backend
exception is recorded and included in the test failure message.
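A minimal sketch of the idea behind item 1, written against plain `java.util.concurrent` rather than Spark's internal `ThreadUtils.newDaemonSingleThreadScheduledExecutor` and `Utils.tryLogNonFatalError`; `PeriodicReviver` is a hypothetical name, not a Spark class. The point is that offers are revived on a timer, independent of task completions, and that `run()` must not let exceptions escape, because a `ScheduledExecutorService` suppresses all further executions of a periodic task once one execution throws.

```scala
import java.util.concurrent.{Executors, ScheduledExecutorService, ThreadFactory, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger
import scala.util.control.NonFatal

// Hypothetical stand-in for the mock backend's revive thread: every
// `intervalMs` milliseconds it calls `reviveOffers`, so tasks waiting on
// delay scheduling get new offers even if no task completes in between.
class PeriodicReviver(intervalMs: Long)(reviveOffers: () => Unit) {

  // Single daemon thread, so a forgotten stop() cannot keep the JVM alive.
  private val executor: ScheduledExecutorService =
    Executors.newSingleThreadScheduledExecutor(new ThreadFactory {
      override def newThread(r: Runnable): Thread = {
        val t = new Thread(r, "revive-thread")
        t.setDaemon(true)
        t
      }
    })

  executor.scheduleAtFixedRate(new Runnable {
    override def run(): Unit =
      // An exception escaping run() would cancel all future executions,
      // so report non-fatal errors instead of rethrowing them.
      try reviveOffers()
      catch { case NonFatal(e) => System.err.println(s"revive failed: $e") }
  }, 0, intervalMs, TimeUnit.MILLISECONDS)

  /** Cancel the periodic task and wait briefly for an in-flight call to finish. */
  def stop(): Unit = {
    executor.shutdown()
    executor.awaitTermination(1, TimeUnit.SECONDS)
  }
}

object PeriodicReviverDemo {
  def main(args: Array[String]): Unit = {
    val calls = new AtomicInteger(0)
    val reviver = new PeriodicReviver(10)(() => calls.incrementAndGet())
    Thread.sleep(200)
    reviver.stop()
    println(s"reviveOffers was called ${calls.get()} times")
  }
}
```

Stopping the executor matters for the same reason as item 3: without a `stop()` that shuts it down, every test would leave one more revive thread behind.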
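For item 2, a hypothetical miniature (not the actual DAGScheduler code) of why the order matters: the test thread wakes up as soon as the waiter is completed, so any cleanup that happens after the notification can race with the test's assertions. `CleanupBeforeNotify`, its `activeJobs` set, and the `Promise` standing in for the JobWaiter are all illustrative assumptions.

```scala
import scala.collection.mutable
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._

// Hypothetical miniature of the ordering fix: `activeJobs` stands in for the
// scheduler's bookkeeping, and the Promise stands in for the JobWaiter that
// the test thread blocks on.
object CleanupBeforeNotify {
  private val activeJobs = mutable.Set[Int]() // always accessed under activeJobs.synchronized

  def isIdle: Boolean = activeJobs.synchronized { activeJobs.isEmpty }

  /** Runs on the "scheduler" thread when a job fails. */
  def failJob(jobId: Int, waiter: Promise[Unit], error: Exception): Unit = {
    // 1. Clean up state first ...
    activeJobs.synchronized { activeJobs -= jobId }
    // 2. ... and only then wake up whoever is waiting on the job. Swapping
    //    these two steps reopens the window in which the waiter can observe
    //    a stale active job.
    waiter.failure(error)
  }

  /** Submits one job, fails it on another thread, and checks state once the waiter returns. */
  def runOnce(jobId: Int): Boolean = {
    activeJobs.synchronized { activeJobs += jobId }
    val waiter = Promise[Unit]()
    val schedulerThread = new Thread(new Runnable {
      override def run(): Unit = failJob(jobId, waiter, new RuntimeException(s"job $jobId failed"))
    })
    schedulerThread.start()
    Await.ready(waiter.future, 5.seconds) // the "test" thread wakes up here
    val idle = isIdle                     // analogous to assertDataStructuresEmpty
    schedulerThread.join()
    idle
  }

  def main(args: Array[String]): Unit = {
    val alwaysIdle = (1 to 1000).forall(runOnce)
    println(s"state was clean every time the waiter returned: $alwaysIdle")
  }
}
```

With cleanup done first, the removal happens-before the promise completion, which happens-before `Await.ready` returns, so `runOnce` should never see a stale job; moving `waiter.failure(error)` above the cleanup would bring back the occasional failure.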
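Item 3 stops each component explicitly in `afterEach()`. One possible refinement (an assumption, not what this patch does) keeps a single failing `stop()` from skipping the rest: attempt every stop and rethrow the first error at the end. `Teardown.stopAll` is hypothetical, and the executors in the demo merely stand in for the event loop and the map output tracker.

```scala
import java.util.concurrent.{ExecutorService, Executors}
import scala.util.control.NonFatal

// Hypothetical teardown helper: stop every component, even if an earlier
// stop() throws, so one failure cannot leave the remaining components'
// threads running into the next test.
object Teardown {
  def stopAll(stops: (() => Unit)*): Unit = {
    var firstError: Option[Throwable] = None
    stops.foreach { stop =>
      try stop()
      catch { case NonFatal(e) => if (firstError.isEmpty) firstError = Some(e) }
    }
    // Surface the first failure only after everything has been stopped.
    firstError.foreach(e => throw e)
  }
}

object TeardownDemo {
  def main(args: Array[String]): Unit = {
    val eventLoop: ExecutorService = Executors.newSingleThreadExecutor()
    val tracker: ExecutorService = Executors.newSingleThreadExecutor()
    try {
      Teardown.stopAll(
        () => eventLoop.shutdown(),
        () => throw new IllegalStateException("component failed to stop"),
        () => tracker.shutdown())
    } catch {
      case e: IllegalStateException => println(s"teardown reported: ${e.getMessage}")
    }
    println(s"eventLoop shut down: ${eventLoop.isShutdown}, tracker shut down: ${tracker.isShutdown}")
  }
}
```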
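A simplified sketch of the error handling in item 5: the backend loop stores the first exception in an `AtomicReference`, and the waiting side turns a timeout into a failure that carries that exception. `BackendHarness` and `awaitJob` are hypothetical names; unlike the patch, this version does not rethrow from the backend thread, and it throws a plain `AssertionError` in place of ScalaTest's `fail`.

```scala
import java.util.concurrent.TimeoutException
import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference}
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._

// Hypothetical harness: the backend loop records the first exception it hits,
// and the waiter reports it instead of a bare timeout.
class BackendHarness(backendFunc: () => Unit) {
  val backendException = new AtomicReference[Exception](null)
  private val backendContinue = new AtomicBoolean(true)

  private val backendThread = new Thread(new Runnable {
    override def run(): Unit = {
      while (backendContinue.get()) {
        try backendFunc()
        catch {
          case ex: Exception =>
            backendException.set(ex)   // remember it for the waiting thread
            backendContinue.set(false) // and stop the loop
        }
        Thread.sleep(10)
      }
    }
  }, "mock-backend")
  backendThread.setDaemon(true)
  backendThread.start()

  def stop(): Unit = {
    backendContinue.set(false)
    backendThread.join()
  }

  /** Waits for the job; on timeout, blames the backend if it recorded an exception. */
  def awaitJob(job: Promise[Unit], timeout: FiniteDuration): Unit = {
    try Await.ready(job.future, timeout)
    catch {
      case _: TimeoutException if backendException.get() != null =>
        val cause = backendException.get()
        throw new AssertionError(
          s"Job did not finish within $timeout, likely because of a failure in the backend: $cause",
          cause)
    }
  }
}

object BackendHarnessDemo {
  def main(args: Array[String]): Unit = {
    val harness = new BackendHarness(() => throw new IllegalStateException("bug in backendFunc"))
    val neverCompleted = Promise[Unit]()
    try harness.awaitJob(neverCompleted, 500.millis)
    catch { case e: AssertionError => println(e.getMessage) }
    finally harness.stop()
  }
}
```

The guard on the `case` keeps genuine timeouts (no backend exception recorded) propagating as ordinary `TimeoutException`s.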

## How was this patch tested?

I ran all the tests in `BlacklistIntegrationSuite` 5k times and everything in
`DAGSchedulerSuite` 1k times on my laptop.  I also ran a full Jenkins build
with `BlacklistIntegrationSuite` 500 times and `DAGSchedulerSuite` 50 times;
see https://github.com/apache/spark/pull/13548.  (I tried more times, but
Jenkins timed out.)

To check for more leaked threads, I added some code to dump the list of all
threads at the end of each test in DAGSchedulerSuite, which is how I discovered
that the mapOutputTracker and eventLoop were leaking threads.  (I removed that
code from the final PR; it was only part of the testing.)
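The thread dump described above was removed from the PR, so the following is only a hypothetical reconstruction of that kind of check: snapshot the live threads (via `Thread.getAllStackTraces`) before and after a test body and report any that were started but are still alive. `ThreadLeakCheck` is an illustrative name, not Spark code.

```scala
import java.util.concurrent.{ExecutorService, Executors, ThreadFactory}

// Hypothetical test utility: list the threads that were started while
// `body` ran and are still alive afterwards, i.e. candidate leaks.
object ThreadLeakCheck {
  /** Snapshot of every live thread the JVM reports. */
  def liveThreads(): Set[Thread] =
    Thread.getAllStackTraces().keySet().toArray(new Array[Thread](0)).filter(_.isAlive()).toSet

  /** Runs `body` and returns the sorted names of threads it left behind. */
  def leakedThreads(body: => Unit): Seq[String] = {
    val before = liveThreads()
    body
    liveThreads().diff(before).toSeq.map(_.getName).sorted
  }
}

object ThreadLeakDemo {
  def main(args: Array[String]): Unit = {
    var pool: ExecutorService = null
    val leaked = ThreadLeakCheck.leakedThreads {
      // Simulates a component that a test starts but never stops.
      pool = Executors.newSingleThreadExecutor(new ThreadFactory {
        override def newThread(r: Runnable): Thread = new Thread(r, "leaky-event-loop")
      })
      // Submitting a task forces the executor to create its worker thread.
      pool.submit(new Runnable { override def run(): Unit = () }).get()
    }
    println(s"threads leaked by the test body: ${leaked.mkString(", ")}")
    pool.shutdown()
  }
}
```

Comparing `Thread` objects rather than names avoids confusing two threads that share a name; threads the JVM starts lazily on its own could still appear, so the output is best read as a list of candidates rather than proof of a leak.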

I'll also run Jenkins on this a couple more times as one more check.

Author: Imran Rashid <[email protected]>

Closes #13565 from squito/blacklist_extra_tests.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cf1995a9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cf1995a9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cf1995a9

Branch: refs/heads/master
Commit: cf1995a97645f0b44c997f4fdbba631fd6b91a16
Parents: 01277d4
Author: Imran Rashid <[email protected]>
Authored: Wed Jun 22 08:35:41 2016 -0500
Committer: Imran Rashid <[email protected]>
Committed: Wed Jun 22 08:35:41 2016 -0500

----------------------------------------------------------------------
 .../apache/spark/scheduler/DAGScheduler.scala   |  4 +-
 .../scheduler/BlacklistIntegrationSuite.scala   | 16 ++---
 .../spark/scheduler/DAGSchedulerSuite.scala     |  9 ++-
 .../scheduler/SchedulerIntegrationSuite.scala   | 73 ++++++++++++++++----
 4 files changed, 76 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/cf1995a9/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index d4e0d6d..4eb7c81 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1465,8 +1465,10 @@ class DAGScheduler(
     }
 
     if (ableToCancelStages) {
-      job.listener.jobFailed(error)
+      // SPARK-15783 important to cleanup state first, just for tests where we have some asserts
+      // against the state.  Otherwise we have a *little* bit of flakiness in the tests.
       cleanupStateForJobAndIndependentStages(job)
+      job.listener.jobFailed(error)
       listenerBus.post(SparkListenerJobEnd(job.jobId, clock.getTimeMillis(), JobFailed(error)))
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/cf1995a9/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala
index d8a4b19..8ba2697 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala
@@ -24,6 +24,7 @@ import org.apache.spark._
 class BlacklistIntegrationSuite extends SchedulerIntegrationSuite[MultiExecutorMockBackend]{
 
   val badHost = "host-0"
+  val duration = Duration(10, SECONDS)
 
   /**
    * This backend just always fails if the task is executed on a bad host, but otherwise succeeds
@@ -41,12 +42,11 @@ class BlacklistIntegrationSuite extends SchedulerIntegrationSuite[MultiExecutorM
 
   // Test demonstrating the issue -- without a config change, the scheduler keeps scheduling
   // according to locality preferences, and so the job fails
-  ignore("If preferred node is bad, without blacklist job will fail") {
+  testScheduler("If preferred node is bad, without blacklist job will fail") {
     val rdd = new MockRDDWithLocalityPrefs(sc, 10, Nil, badHost)
     withBackend(badHostBackend _) {
       val jobFuture = submit(rdd, (0 until 10).toArray)
-      val duration = Duration(1, SECONDS)
-      Await.ready(jobFuture, duration)
+      awaitJobTermination(jobFuture, duration)
     }
     assertDataStructuresEmpty(noFailure = false)
   }
@@ -54,7 +54,7 @@ class BlacklistIntegrationSuite extends SchedulerIntegrationSuite[MultiExecutorM
   // even with the blacklist turned on, if maxTaskFailures is not more than the number
   // of executors on the bad node, then locality preferences will lead to us cycling through
   // the executors on the bad node, and still failing the job
-  ignoreScheduler(
+  testScheduler(
     "With blacklist on, job will still fail if there are too many bad executors on bad host",
     extraConfs = Seq(
       // just set this to something much longer than the test duration
@@ -64,15 +64,14 @@ class BlacklistIntegrationSuite extends SchedulerIntegrationSuite[MultiExecutorM
     val rdd = new MockRDDWithLocalityPrefs(sc, 10, Nil, badHost)
     withBackend(badHostBackend _) {
       val jobFuture = submit(rdd, (0 until 10).toArray)
-      val duration = Duration(3, SECONDS)
-      Await.ready(jobFuture, duration)
+      awaitJobTermination(jobFuture, duration)
     }
     assertDataStructuresEmpty(noFailure = false)
   }
 
   // Here we run with the blacklist on, and maxTaskFailures high enough that we'll eventually
   // schedule on a good node and succeed the job
-  ignoreScheduler(
+  testScheduler(
     "Bad node with multiple executors, job will still succeed with the right confs",
     extraConfs = Seq(
       // just set this to something much longer than the test duration
@@ -86,8 +85,7 @@ class BlacklistIntegrationSuite extends SchedulerIntegrationSuite[MultiExecutorM
     val rdd = new MockRDDWithLocalityPrefs(sc, 10, Nil, badHost)
     withBackend(badHostBackend _) {
       val jobFuture = submit(rdd, (0 until 10).toArray)
-      val duration = Duration(1, SECONDS)
-      Await.ready(jobFuture, duration)
+      awaitJobTermination(jobFuture, duration)
     }
     assert(results === (0 until 10).map { _ -> 42 }.toMap)
     assertDataStructuresEmpty(noFailure = true)

http://git-wip-us.apache.org/repos/asf/spark/blob/cf1995a9/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 63a4940..3382474 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -214,7 +214,11 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
     results.clear()
     securityMgr = new SecurityManager(conf)
     broadcastManager = new BroadcastManager(true, conf, securityMgr)
-    mapOutputTracker = new MapOutputTrackerMaster(conf, broadcastManager, true)
+    mapOutputTracker = new MapOutputTrackerMaster(conf, broadcastManager, true) {
+      override def sendTracker(message: Any): Unit = {
+        // no-op, just so we can stop this to avoid leaking threads
+      }
+    }
     scheduler = new DAGScheduler(
       sc,
       taskScheduler,
@@ -228,6 +232,9 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
   override def afterEach(): Unit = {
     try {
       scheduler.stop()
+      dagEventProcessLoopTester.stop()
+      mapOutputTracker.stop()
+      broadcastManager.stop()
     } finally {
       super.afterEach()
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/cf1995a9/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala
index 12dfa56..634b94f 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala
@@ -17,7 +17,8 @@
 package org.apache.spark.scheduler
 
 import java.util.Properties
-import java.util.concurrent.atomic.AtomicBoolean
+import java.util.concurrent.{TimeoutException, TimeUnit}
+import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference}
 
 import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
 import scala.concurrent.{Await, Future}
@@ -32,7 +33,7 @@ import org.apache.spark._
 import org.apache.spark.TaskState._
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
-import org.apache.spark.util.{CallSite, Utils}
+import org.apache.spark.util.{CallSite, ThreadUtils, Utils}
 
 /**
  * Tests for the  entire scheduler code -- DAGScheduler, TaskSchedulerImpl, TaskSets,
@@ -55,6 +56,7 @@ abstract class SchedulerIntegrationSuite[T <: MockBackend: ClassTag] extends Spa
     }
     results.clear()
     failure = null
+    backendException.set(null)
     super.beforeEach()
   }
 
@@ -90,11 +92,6 @@ abstract class SchedulerIntegrationSuite[T <: MockBackend: ClassTag] extends Spa
     }
   }
 
-  // still a few races to work out in the blacklist tests, so ignore some tests
-  def ignoreScheduler(name: String, extraConfs: Seq[(String, String)])(testBody: => Unit): Unit = {
-    ignore(name)(testBody)
-  }
-
   /**
    * A map from partition -> results for all tasks of a job when you call this test framework's
    * [[submit]] method.  Two important considerations:
@@ -167,6 +164,7 @@ abstract class SchedulerIntegrationSuite[T <: MockBackend: ClassTag] extends Spa
       assert(failure != null)
     }
     assert(scheduler.activeJobs.isEmpty)
+    assert(backendException.get() == null)
   }
 
   /**
@@ -204,6 +202,8 @@ abstract class SchedulerIntegrationSuite[T <: MockBackend: ClassTag] extends Spa
     new MockRDD(sc, nParts, shuffleDeps)
   }
 
+  val backendException = new AtomicReference[Exception](null)
+
   /**
    * Helper which makes it a little easier to setup a test, which starts a mock backend in another
    * thread, responding to tasks with your custom function.  You also supply the "body" of your
@@ -218,7 +218,17 @@ abstract class SchedulerIntegrationSuite[T <: MockBackend: ClassTag] extends Spa
       override def run(): Unit = {
         while (backendContinue.get()) {
           if (backend.hasTasksWaitingToRun) {
-            backendFunc()
+            try {
+              backendFunc()
+            } catch {
+              case ex: Exception =>
+                // Try to do a little error handling around exceptions that might occur here --
+                // otherwise it can just look like a TimeoutException in the test itself.
+                logError("Exception in mock backend:", ex)
+                backendException.set(ex)
+                backendContinue.set(false)
+                throw ex
+            }
           } else {
             Thread.sleep(10)
           }
@@ -234,6 +244,25 @@ abstract class SchedulerIntegrationSuite[T <: MockBackend: ClassTag] extends Spa
     }
   }
 
+  /**
+   * Helper to do a little extra error checking while waiting for the job to terminate.  Primarily
+   * just does a little extra error handling if there is an exception from the backend.
+   */
+  def awaitJobTermination(jobFuture: Future[_], duration: Duration): Unit = {
+    try {
+      Await.ready(jobFuture, duration)
+    } catch {
+      case te: TimeoutException if backendException.get() != null =>
+        val msg = raw"""
+           | ----- Begin Backend Failure Msg -----
+           | ${Utils.exceptionString(backendException.get())}
+           | ----- End Backend Failure Msg ----
+        """.
+          stripMargin
+
+        fail(s"Future timed out after ${duration}, likely because of failure in backend: $msg")
+    }
+  }
 }
 
 /**
@@ -245,6 +274,17 @@ private[spark] abstract class MockBackend(
     conf: SparkConf,
     val taskScheduler: TaskSchedulerImpl) extends SchedulerBackend with Logging {
 
+  // Periodically revive offers to allow delay scheduling to work
+  private val reviveThread =
+    ThreadUtils.newDaemonSingleThreadScheduledExecutor("driver-revive-thread")
+  private val reviveIntervalMs = conf.getTimeAsMs("spark.scheduler.revive.interval", "10ms")
+
+  reviveThread.scheduleAtFixedRate(new Runnable {
+    override def run(): Unit = Utils.tryLogNonFatalError {
+      reviveOffers()
+    }
+  }, 0, reviveIntervalMs, TimeUnit.MILLISECONDS)
+
   /**
    * Test backends should call this to get a task that has been assigned to them by the scheduler.
    * Each task should be responded to with either [[taskSuccess]] or [[taskFailed]].
@@ -310,7 +350,9 @@ private[spark] abstract class MockBackend(
 
   override def start(): Unit = {}
 
-  override def stop(): Unit = {}
+  override def stop(): Unit = {
+    reviveThread.shutdown()
+  }
 
   val env = SparkEnv.get
 
@@ -334,8 +376,9 @@ private[spark] abstract class MockBackend(
   }
 
   /**
-   * This is called by the scheduler whenever it has tasks it would like to schedule.  It gets
-   * called in the scheduling thread, not the backend thread.
+   * This is called by the scheduler whenever it has tasks it would like to schedule, when a tasks
+   * completes (which will be in a result-getter thread), and by the reviveOffers thread for delay
+   * scheduling.
    */
   override def reviveOffers(): Unit = {
     val offers: Seq[WorkerOffer] = generateOffers()
@@ -484,7 +527,7 @@ class BasicSchedulerIntegrationSuite extends SchedulerIntegrationSuite[SingleCor
     withBackend(runBackend _) {
       val jobFuture = submit(new MockRDD(sc, 10, Nil), (0 until 10).toArray)
       val duration = Duration(1, SECONDS)
-      Await.ready(jobFuture, duration)
+      awaitJobTermination(jobFuture, duration)
     }
     assert(results === (0 until 10).map { _ -> 42 }.toMap)
     assertDataStructuresEmpty()
@@ -536,7 +579,7 @@ class BasicSchedulerIntegrationSuite extends SchedulerIntegrationSuite[SingleCor
     withBackend(runBackend _) {
       val jobFuture = submit(d, (0 until 30).toArray)
       val duration = Duration(1, SECONDS)
-      Await.ready(jobFuture, duration)
+      awaitJobTermination(jobFuture, duration)
     }
     assert(results === (0 until 30).map { idx => idx -> (4321 + idx) }.toMap)
     assertDataStructuresEmpty()
@@ -576,7 +619,7 @@ class BasicSchedulerIntegrationSuite extends SchedulerIntegrationSuite[SingleCor
     withBackend(runBackend _) {
       val jobFuture = submit(shuffledRdd, (0 until 10).toArray)
       val duration = Duration(1, SECONDS)
-      Await.ready(jobFuture, duration)
+      awaitJobTermination(jobFuture, duration)
     }
     assert(results === (0 until 10).map { idx => idx -> (42 + idx) }.toMap)
     assert(stageToAttempts === Map(0 -> Set(0, 1), 1 -> Set(0, 1)))
@@ -591,7 +634,7 @@ class BasicSchedulerIntegrationSuite extends SchedulerIntegrationSuite[SingleCor
     withBackend(runBackend _) {
       val jobFuture = submit(new MockRDD(sc, 10, Nil), (0 until 10).toArray)
       val duration = Duration(1, SECONDS)
-      Await.ready(jobFuture, duration)
+      awaitJobTermination(jobFuture, duration)
       failure.getMessage.contains("test task failure")
     }
     assertDataStructuresEmpty(noFailure = false)

