Repository: spark
Updated Branches:
  refs/heads/master 07f46afc7 -> fdf9f94f8


[SPARK-15865][CORE] Blacklist should not result in job hanging with less than 4 
executors

## What changes were proposed in this pull request?

Before this change, when you turn on blacklisting with 
`spark.scheduler.executorTaskBlacklistTime`, but you have fewer than 
`spark.task.maxFailures` executors, you can end up with a job "hung" after some 
task failures.

With this change, whenever a taskset is unable to schedule anything in 
resourceOfferSingleTaskSet, we check whether the last pending task can be 
scheduled on *any* known executor.  If not, the taskset (and any corresponding 
jobs) are failed.
* Worst case, this is O(maxTaskFailures + numTasks).  But unless many executors 
are bad, this should be small.
* This does not fail as fast as possible -- when a task becomes unschedulable, 
we keep scheduling other tasks.  This is to avoid an O(numPendingTasks * 
numExecutors) operation
* Also, it is conceivable this fails too quickly.  You may be 1 millisecond 
away from unblacklisting a place for a task to run, or acquiring a new executor.

## How was this patch tested?

Added unit test which failed before the change, ran new test 5k times manually, 
ran all scheduler tests manually, and the full suite via jenkins.

Author: Imran Rashid <[email protected]>

Closes #13603 from squito/progress_w_few_execs_and_blacklist.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fdf9f94f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fdf9f94f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fdf9f94f

Branch: refs/heads/master
Commit: fdf9f94f8c8861a00cd8415073f842b857c397f7
Parents: 07f46af
Author: Imran Rashid <[email protected]>
Authored: Thu Jun 30 13:36:06 2016 -0500
Committer: Imran Rashid <[email protected]>
Committed: Thu Jun 30 13:36:06 2016 -0500

----------------------------------------------------------------------
 .../org/apache/spark/scheduler/TaskInfo.scala   |  4 +
 .../spark/scheduler/TaskSchedulerImpl.scala     |  3 +
 .../apache/spark/scheduler/TaskSetManager.scala | 54 +++++++++++-
 .../apache/spark/executor/ExecutorSuite.scala   |  2 +-
 .../scheduler/BlacklistIntegrationSuite.scala   | 33 ++++++-
 .../org/apache/spark/scheduler/FakeTask.scala   |  5 +-
 .../org/apache/spark/scheduler/PoolSuite.scala  |  2 +-
 .../scheduler/SchedulerIntegrationSuite.scala   | 14 +--
 .../scheduler/TaskSchedulerImplSuite.scala      | 92 ++++++++++++++++++++
 9 files changed, 194 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/fdf9f94f/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala 
b/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala
index 2d89232..eeb7963 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala
@@ -30,6 +30,10 @@ import org.apache.spark.annotation.DeveloperApi
 @DeveloperApi
 class TaskInfo(
     val taskId: Long,
+    /**
+     * The index of this task within its task set. Not necessarily the same as 
the ID of the RDD
+     * partition that the task is computing.
+     */
     val index: Int,
     val attemptNumber: Int,
     val launchTime: Long,

http://git-wip-us.apache.org/repos/asf/spark/blob/fdf9f94f/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala 
b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 821e3ee..2ce49ca 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -279,6 +279,9 @@ private[spark] class TaskSchedulerImpl(
         }
       }
     }
+    if (!launchedTask) {
+      taskSet.abortIfCompletelyBlacklisted(executorIdToHost.keys)
+    }
     return launchedTask
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/fdf9f94f/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala 
b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 2eedd20..2fef447 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -83,7 +83,7 @@ private[spark] class TaskSetManager(
   val copiesRunning = new Array[Int](numTasks)
   val successful = new Array[Boolean](numTasks)
   private val numFailures = new Array[Int](numTasks)
-  // key is taskId, value is a Map of executor id to when it failed
+  // key is taskId (aka TaskInfo.index), value is a Map of executor id to when 
it failed
   private val failedExecutors = new HashMap[Int, HashMap[String, Long]]()
 
   val taskAttempts = Array.fill[List[TaskInfo]](numTasks)(Nil)
@@ -270,7 +270,7 @@ private[spark] class TaskSetManager(
    * Is this re-execution of a failed task on an executor it already failed in 
before
    * EXECUTOR_TASK_BLACKLIST_TIMEOUT has elapsed ?
    */
-  private def executorIsBlacklisted(execId: String, taskId: Int): Boolean = {
+  private[scheduler] def executorIsBlacklisted(execId: String, taskId: Int): 
Boolean = {
     if (failedExecutors.contains(taskId)) {
       val failed = failedExecutors.get(taskId).get
 
@@ -576,6 +576,56 @@ private[spark] class TaskSetManager(
   }
 
   /**
+   * Check whether the given task set has been blacklisted to the point that 
it can't run anywhere.
+   *
+   * It is possible that this taskset has become impossible to schedule 
*anywhere* due to the
+   * blacklist.  The most common scenario would be if there are fewer 
executors than
+   * spark.task.maxFailures. We need to detect this so we can fail the task 
set, otherwise the job
+   * will hang.
+   *
+   * There's a tradeoff here: we could make sure all tasks in the task set are 
schedulable, but that
+   * would add extra time to each iteration of the scheduling loop. Here, we 
take the approach of
+   * making sure at least one of the unscheduled tasks is schedulable. This 
means we may not detect
+   * the hang as quickly as we could have, but we'll always detect the hang 
eventually, and the
+   * method is faster in the typical case. In the worst case, this method can 
take
+   * O(maxTaskFailures + numTasks) time, but it will be faster when there 
haven't been any task
+   * failures (this is because the method picks one unscheduled task, and then 
iterates through each
+   * executor until it finds one that the task hasn't failed on already).
+   */
+  private[scheduler] def abortIfCompletelyBlacklisted(executors: 
Iterable[String]): Unit = {
+
+    val pendingTask: Option[Int] = {
+      // usually this will just take the last pending task, but because of the 
lazy removal
+      // from each list, we may need to go deeper in the list.  We poll from 
the end because
+      // failed tasks are put back at the end of allPendingTasks, so we're 
more likely to find
+      // an unschedulable task this way.
+      val indexOffset = allPendingTasks.lastIndexWhere { indexInTaskSet =>
+        copiesRunning(indexInTaskSet) == 0 && !successful(indexInTaskSet)
+      }
+      if (indexOffset == -1) {
+        None
+      } else {
+        Some(allPendingTasks(indexOffset))
+      }
+    }
+
+    // If no executors have registered yet, don't abort the stage, just wait.  
We probably
+    // got here because a task set was added before the executors registered.
+    if (executors.nonEmpty) {
+      // take any task that needs to be scheduled, and see if we can find some 
executor it *could*
+      // run on
+      pendingTask.foreach { taskId =>
+        if (executors.forall(executorIsBlacklisted(_, taskId))) {
+          val execs = executors.toIndexedSeq.sorted.mkString("(", ",", ")")
+          val partition = tasks(taskId).partitionId
+          abort(s"Aborting ${taskSet} because task $taskId (partition 
$partition)" +
+            s" has already failed on executors $execs, and no other executors 
are available.")
+        }
+      }
+    }
+  }
+
+  /**
    * Marks the task as getting result and notifies the DAG Scheduler
    */
   def handleTaskGettingResult(tid: Long): Unit = {

http://git-wip-us.apache.org/repos/asf/spark/blob/fdf9f94f/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala 
b/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
index 3e69894..683eeee 100644
--- a/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
@@ -53,7 +53,7 @@ class ExecutorSuite extends SparkFunSuite {
     when(mockEnv.closureSerializer).thenReturn(serializer)
     val serializedTask =
       Task.serializeWithDependencies(
-        new FakeTask(0),
+        new FakeTask(0, 0),
         HashMap[String, Long](),
         HashMap[String, Long](),
         serializer.newInstance())

http://git-wip-us.apache.org/repos/asf/spark/blob/fdf9f94f/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala
 
b/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala
index 8ba2697..14c8b66 100644
--- 
a/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala
@@ -57,7 +57,8 @@ class BlacklistIntegrationSuite extends 
SchedulerIntegrationSuite[MultiExecutorM
   testScheduler(
     "With blacklist on, job will still fail if there are too many bad 
executors on bad host",
     extraConfs = Seq(
-      // just set this to something much longer than the test duration
+      // set this to something much longer than the test duration so that 
executors don't get
+      // removed from the blacklist during the test
       ("spark.scheduler.executorTaskBlacklistTime", "10000000")
     )
   ) {
@@ -74,7 +75,8 @@ class BlacklistIntegrationSuite extends 
SchedulerIntegrationSuite[MultiExecutorM
   testScheduler(
     "Bad node with multiple executors, job will still succeed with the right 
confs",
     extraConfs = Seq(
-      // just set this to something much longer than the test duration
+      // set this to something much longer than the test duration so that 
executors don't get
+      // removed from the blacklist during the test
       ("spark.scheduler.executorTaskBlacklistTime", "10000000"),
       // this has to be higher than the number of executors on the bad host
       ("spark.task.maxFailures", "5"),
@@ -91,6 +93,33 @@ class BlacklistIntegrationSuite extends 
SchedulerIntegrationSuite[MultiExecutorM
     assertDataStructuresEmpty(noFailure = true)
   }
 
+  // Make sure that if we've failed on all executors, but haven't hit 
task.maxFailures yet, the job
+  // doesn't hang
+  testScheduler(
+    "SPARK-15865 Progress with fewer executors than maxTaskFailures",
+    extraConfs = Seq(
+      // set this to something much longer than the test duration so that 
executors don't get
+      // removed from the blacklist during the test
+      "spark.scheduler.executorTaskBlacklistTime" -> "10000000",
+      "spark.testing.nHosts" -> "2",
+      "spark.testing.nExecutorsPerHost" -> "1",
+      "spark.testing.nCoresPerExecutor" -> "1"
+    )
+  ) {
+    def runBackend(): Unit = {
+      val (taskDescription, _) = backend.beginTask()
+      backend.taskFailed(taskDescription, new RuntimeException("test task 
failure"))
+    }
+    withBackend(runBackend _) {
+      val jobFuture = submit(new MockRDD(sc, 10, Nil), (0 until 10).toArray)
+      Await.ready(jobFuture, duration)
+      val pattern = ("Aborting TaskSet 0.0 because task .* " +
+        "already failed on executors \\(.*\\), and no other executors are 
available").r
+      assert(pattern.findFirstIn(failure.getMessage).isDefined,
+        s"Couldn't find $pattern in ${failure.getMessage()}")
+    }
+    assertDataStructuresEmpty(noFailure = false)
+  }
 }
 
 class MultiExecutorMockBackend(

http://git-wip-us.apache.org/repos/asf/spark/blob/fdf9f94f/core/src/test/scala/org/apache/spark/scheduler/FakeTask.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/FakeTask.scala 
b/core/src/test/scala/org/apache/spark/scheduler/FakeTask.scala
index 4fe705b..87600fe 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/FakeTask.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/FakeTask.scala
@@ -21,7 +21,8 @@ import org.apache.spark.TaskContext
 
 class FakeTask(
     stageId: Int,
-    prefLocs: Seq[TaskLocation] = Nil) extends Task[Int](stageId, 0, 0) {
+    partitionId: Int,
+    prefLocs: Seq[TaskLocation] = Nil) extends Task[Int](stageId, 0, 
partitionId) {
   override def runTask(context: TaskContext): Int = 0
   override def preferredLocations: Seq[TaskLocation] = prefLocs
 }
@@ -40,7 +41,7 @@ object FakeTask {
       throw new IllegalArgumentException("Wrong number of task locations")
     }
     val tasks = Array.tabulate[Task[_]](numTasks) { i =>
-      new FakeTask(i, if (prefLocs.size != 0) prefLocs(i) else Nil)
+      new FakeTask(0, i, if (prefLocs.size != 0) prefLocs(i) else Nil)
     }
     new TaskSet(tasks, 0, stageAttemptId, 0, null)
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/fdf9f94f/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala 
b/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala
index 467796d..00e1c44 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala
@@ -30,7 +30,7 @@ class PoolSuite extends SparkFunSuite with LocalSparkContext {
   def createTaskSetManager(stageId: Int, numTasks: Int, taskScheduler: 
TaskSchedulerImpl)
     : TaskSetManager = {
     val tasks = Array.tabulate[Task[_]](numTasks) { i =>
-      new FakeTask(i, Nil)
+      new FakeTask(stageId, i, Nil)
     }
     new TaskSetManager(taskScheduler, new TaskSet(tasks, stageId, 0, 0, null), 
0)
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/fdf9f94f/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala
 
b/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala
index 634b94f..14f52a6 100644
--- 
a/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala
@@ -279,12 +279,6 @@ private[spark] abstract class MockBackend(
     ThreadUtils.newDaemonSingleThreadScheduledExecutor("driver-revive-thread")
   private val reviveIntervalMs = 
conf.getTimeAsMs("spark.scheduler.revive.interval", "10ms")
 
-  reviveThread.scheduleAtFixedRate(new Runnable {
-    override def run(): Unit = Utils.tryLogNonFatalError {
-      reviveOffers()
-    }
-  }, 0, reviveIntervalMs, TimeUnit.MILLISECONDS)
-
   /**
    * Test backends should call this to get a task that has been assigned to 
them by the scheduler.
    * Each task should be responded to with either [[taskSuccess]] or 
[[taskFailed]].
@@ -348,7 +342,13 @@ private[spark] abstract class MockBackend(
     assignedTasksWaitingToRun.nonEmpty
   }
 
-  override def start(): Unit = {}
+  override def start(): Unit = {
+    reviveThread.scheduleAtFixedRate(new Runnable {
+      override def run(): Unit = Utils.tryLogNonFatalError {
+        reviveOffers()
+      }
+    }, 0, reviveIntervalMs, TimeUnit.MILLISECONDS)
+  }
 
   override def stop(): Unit = {
     reviveThread.shutdown()

http://git-wip-us.apache.org/repos/asf/spark/blob/fdf9f94f/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala 
b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
index 34b8d55..100b157 100644
--- 
a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
@@ -281,6 +281,98 @@ class TaskSchedulerImplSuite extends SparkFunSuite with 
LocalSparkContext with B
     assert(!failedTaskSet)
   }
 
+  test("abort stage if executor loss results in unschedulability from 
previously failed tasks") {
+    // Make sure we can detect when a taskset becomes unschedulable from a 
blacklisting.  This
+    // test explores a particular corner case -- you may have one task fail, 
but still be
+    // schedulable on another executor.  However, that executor may fail later 
on, leaving the
+    // first task with no place to run.
+    val taskScheduler = setupScheduler(
+      // set this to something much longer than the test duration so that 
executors don't get
+      // removed from the blacklist during the test
+      "spark.scheduler.executorTaskBlacklistTime" -> "10000000"
+    )
+
+    val taskSet = FakeTask.createTaskSet(2)
+    taskScheduler.submitTasks(taskSet)
+    val tsm = taskScheduler.taskSetManagerForAttempt(taskSet.stageId, 
taskSet.stageAttemptId).get
+
+    val firstTaskAttempts = taskScheduler.resourceOffers(Seq(
+      new WorkerOffer("executor0", "host0", 1),
+      new WorkerOffer("executor1", "host1", 1)
+    )).flatten
+    assert(Set("executor0", "executor1") === 
firstTaskAttempts.map(_.executorId).toSet)
+
+    // fail one of the tasks, but leave the other running
+    val failedTask = firstTaskAttempts.find(_.executorId == "executor0").get
+    taskScheduler.handleFailedTask(tsm, failedTask.taskId, TaskState.FAILED, 
TaskResultLost)
+    // at this point, our failed task could run on the other executor, so 
don't give up the task
+    // set yet.
+    assert(!failedTaskSet)
+
+    // Now we fail our second executor.  The other task can still run on 
executor0, so make an offer
+    // on that executor, and make sure that the other task (not the failed 
one) is assigned there
+    taskScheduler.executorLost("executor1", SlaveLost("oops"))
+    val nextTaskAttempts =
+      taskScheduler.resourceOffers(Seq(new WorkerOffer("executor0", "host0", 
1))).flatten
+    // Note: It's OK if some future change makes this already realize the 
taskset has become
+    // unschedulable at this point (though in the current implementation, 
we're sure it will not)
+    assert(nextTaskAttempts.size === 1)
+    assert(nextTaskAttempts.head.executorId === "executor0")
+    assert(nextTaskAttempts.head.attemptNumber === 1)
+    assert(nextTaskAttempts.head.index != failedTask.index)
+
+    // now we should definitely realize that our task set is unschedulable, 
because the only
+    // task left can't be scheduled on any executors due to the blacklist
+    taskScheduler.resourceOffers(Seq(new WorkerOffer("executor0", "host0", 1)))
+    sc.listenerBus.waitUntilEmpty(100000)
+    assert(tsm.isZombie)
+    assert(failedTaskSet)
+    val idx = failedTask.index
+    assert(failedTaskSetReason == s"Aborting TaskSet 0.0 because task $idx 
(partition $idx) has " +
+      s"already failed on executors (executor0), and no other executors are 
available.")
+  }
+
+  test("don't abort if there is an executor available, though it hasn't had 
scheduled tasks yet") {
+    // interaction of SPARK-15865 & SPARK-16106
+    // if we have a small number of tasks, we might be able to schedule them 
all on the first
+    // executor.  But if those tasks fail, we should still realize there is 
another executor
+    // available and not bail on the job
+
+    val taskScheduler = setupScheduler(
+      // set this to something much longer than the test duration so that 
executors don't get
+      // removed from the blacklist during the test
+      "spark.scheduler.executorTaskBlacklistTime" -> "10000000"
+    )
+
+    val taskSet = FakeTask.createTaskSet(2, (0 until 2).map { _ => 
Seq(TaskLocation("host0")) }: _*)
+    taskScheduler.submitTasks(taskSet)
+    val tsm = taskScheduler.taskSetManagerForAttempt(taskSet.stageId, 
taskSet.stageAttemptId).get
+
+    val offers = Seq(
+      // each offer has more than enough free cores for the entire task set, 
so when combined
+      // with the locality preferences, we schedule all tasks on one executor
+      new WorkerOffer("executor0", "host0", 4),
+      new WorkerOffer("executor1", "host1", 4)
+    )
+    val firstTaskAttempts = taskScheduler.resourceOffers(offers).flatten
+    assert(firstTaskAttempts.size == 2)
+    firstTaskAttempts.foreach { taskAttempt => assert("executor0" === 
taskAttempt.executorId) }
+
+    // fail all the tasks on the bad executor
+    firstTaskAttempts.foreach { taskAttempt =>
+      taskScheduler.handleFailedTask(tsm, taskAttempt.taskId, 
TaskState.FAILED, TaskResultLost)
+    }
+
+    // Here is the main check of this test -- we have the same offers again, 
and we schedule it
+    // successfully.  Because the scheduler first tries to schedule with 
locality in mind, at first
+    // it won't schedule anything on executor1.  But despite that, we don't 
abort the job.  Then the
+    // scheduler tries for ANY locality, and successfully schedules tasks on 
executor1.
+    val secondTaskAttempts = taskScheduler.resourceOffers(offers).flatten
+    assert(secondTaskAttempts.size == 2)
+    secondTaskAttempts.foreach { taskAttempt => assert("executor1" === 
taskAttempt.executorId) }
+    assert(!failedTaskSet)
+  }
+
   test("SPARK-16106 locality levels updated if executor added to existing 
host") {
     val taskScheduler = setupScheduler()
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to