Repository: spark
Updated Branches:
refs/heads/master 3ed91c9b8 -> fdd3bace1
[SPARK-22148][SPARK-15815][SCHEDULER] Acquire new executors to avoid hang because of blacklisting
## What changes were proposed in this pull request?
Every time a task becomes unschedulable because it is blacklisted on every available executor
(which can happen when the number of available executors is smaller than spark.task.maxFailures),
we currently abort the TaskSet, failing the job. This change instead tries to acquire a new
executor so that the job can complete successfully. We only attempt to acquire a new executor
when we can kill an existing idle blacklisted executor; if no idle executor can be found, we fall
back to the old behavior and abort the job.
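The window in which the scheduler waits for a replacement executor before aborting is controlled
by the new spark.scheduler.blacklist.unschedulableTaskSetTimeout setting introduced here (120s by
default). A minimal sketch of how a user might tune it, assuming blacklisting is otherwise
configured as usual (the app name and the 60s value below are illustrative, not part of this
patch):
```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical driver setup (master/deploy settings assumed to come from spark-submit).
// Blacklisting must be enabled for this code path to matter; the new timeout bounds how
// long a completely blacklisted TaskSet waits for a fresh executor before being aborted.
val conf = new SparkConf()
  .setAppName("unschedulable-taskset-timeout-example")
  .set("spark.blacklist.enabled", "true")
  .set("spark.scheduler.blacklist.unschedulableTaskSetTimeout", "60s")
val sc = new SparkContext(conf)
```
While the abort timer is armed the scheduler logs "Waiting for <timeout> ms for completely
blacklisted task to be schedulable again before aborting ...", which can be used to confirm the
new path is being exercised.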
## How was this patch tested?
I performed some manual tests to check and validate the behavior.
```scala
val rdd = sc.parallelize(Seq(1 to 10), 3)
import org.apache.spark.TaskContext
// Fail the first three attempts of the task for partition 2 so it gets blacklisted.
val mapped = rdd.mapPartitionsWithIndex { (index, iterator) =>
  if (index == 2) {
    Thread.sleep(30 * 1000)
    val attemptNum = TaskContext.get.attemptNumber
    if (attemptNum < 3) throw new Exception("Fail for blacklisting")
  }
  iterator.toList.map(x => x + " -> " + index).iterator
}
mapped.collect
```
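Note (inferred from the changes below): blacklisting has to be enabled for this scenario to arise
(spark.blacklist.enabled defaults to false), and the idle blacklisted executor is only actually
killed when an allocation client is available; otherwise BlacklistTracker just logs that it is not
attempting the kill and the abort timer alone decides whether the TaskSet is aborted.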
Closes #22288 from dhruve/bug/SPARK-22148.
Lead-authored-by: Dhruve Ashar <[email protected]>
Co-authored-by: Dhruve Ashar <[email protected]>
Co-authored-by: Tom Graves <[email protected]>
Signed-off-by: Thomas Graves <[email protected]>
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fdd3bace
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fdd3bace
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fdd3bace
Branch: refs/heads/master
Commit: fdd3bace1da01e5958fe0345c38e889e740ce25e
Parents: 3ed91c9
Author: Dhruve Ashar <[email protected]>
Authored: Tue Nov 6 08:25:32 2018 -0600
Committer: Thomas Graves <[email protected]>
Committed: Tue Nov 6 08:25:32 2018 -0600
----------------------------------------------------------------------
.../apache/spark/internal/config/package.scala | 8 +
.../spark/scheduler/BlacklistTracker.scala | 30 ++-
.../spark/scheduler/TaskSchedulerImpl.scala | 71 ++++++-
.../apache/spark/scheduler/TaskSetManager.scala | 41 ++--
.../scheduler/BlacklistIntegrationSuite.scala | 7 +-
.../scheduler/TaskSchedulerImplSuite.scala | 189 ++++++++++++++++++-
docs/configuration.md | 8 +
7 files changed, 318 insertions(+), 36 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/fdd3bace/core/src/main/scala/org/apache/spark/internal/config/package.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index c8993e1..2b3ba3c 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -622,6 +622,14 @@ package object config {
.checkValue(v => v > 0, "The value should be a positive time value.")
.createWithDefaultString("365d")
+ private[spark] val UNSCHEDULABLE_TASKSET_TIMEOUT =
+ ConfigBuilder("spark.scheduler.blacklist.unschedulableTaskSetTimeout")
+ .doc("The timeout in seconds to wait to acquire a new executor and
schedule a task " +
+ "before aborting a TaskSet which is unschedulable because of being
completely blacklisted.")
+ .timeConf(TimeUnit.SECONDS)
+ .checkValue(v => v >= 0, "The value should be a non negative time
value.")
+ .createWithDefault(120)
+
private[spark] val BARRIER_MAX_CONCURRENT_TASKS_CHECK_INTERVAL =
ConfigBuilder("spark.scheduler.barrier.maxConcurrentTasksCheck.interval")
.doc("Time in seconds to wait between a max concurrent tasks check
failure and the next " +
http://git-wip-us.apache.org/repos/asf/spark/blob/fdd3bace/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala b/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala
index 980fbbe..ef6d02d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala
@@ -146,21 +146,31 @@ private[scheduler] class BlacklistTracker (
nextExpiryTime = math.min(execMinExpiry, nodeMinExpiry)
}
+ private def killExecutor(exec: String, msg: String): Unit = {
+ allocationClient match {
+ case Some(a) =>
+ logInfo(msg)
+ a.killExecutors(Seq(exec), adjustTargetNumExecutors = false, countFailures = false,
+ force = true)
+ case None =>
+ logInfo(s"Not attempting to kill blacklisted executor id $exec " +
+ s"since allocation client is not defined.")
+ }
+ }
+
private def killBlacklistedExecutor(exec: String): Unit = {
if (conf.get(config.BLACKLIST_KILL_ENABLED)) {
- allocationClient match {
- case Some(a) =>
- logInfo(s"Killing blacklisted executor id $exec " +
- s"since ${config.BLACKLIST_KILL_ENABLED.key} is set.")
- a.killExecutors(Seq(exec), adjustTargetNumExecutors = false, countFailures = false,
- force = true)
- case None =>
- logWarning(s"Not attempting to kill blacklisted executor id $exec " +
- s"since allocation client is not defined.")
- }
+ killExecutor(exec,
+ s"Killing blacklisted executor id $exec since
${config.BLACKLIST_KILL_ENABLED.key} is set.")
}
}
+ private[scheduler] def killBlacklistedIdleExecutor(exec: String): Unit = {
+ killExecutor(exec,
+ s"Killing blacklisted idle executor id $exec because of task
unschedulability and trying " +
+ "to acquire a new executor.")
+ }
+
private def killExecutorsOnBlacklistedNode(node: String): Unit = {
if (conf.get(config.BLACKLIST_KILL_ENABLED)) {
allocationClient match {
http://git-wip-us.apache.org/repos/asf/spark/blob/fdd3bace/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 4f870e85..61556ea 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -35,7 +35,7 @@ import org.apache.spark.rpc.RpcEndpoint
import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
import org.apache.spark.scheduler.TaskLocality.TaskLocality
import org.apache.spark.storage.BlockManagerId
-import org.apache.spark.util.{AccumulatorV2, ThreadUtils, Utils}
+import org.apache.spark.util.{AccumulatorV2, SystemClock, ThreadUtils, Utils}
/**
* Schedules tasks for multiple types of clusters by acting through a SchedulerBackend.
@@ -117,6 +117,11 @@ private[spark] class TaskSchedulerImpl(
protected val executorIdToHost = new HashMap[String, String]
+ private val abortTimer = new Timer(true)
+ private val clock = new SystemClock
+ // Exposed for testing
+ val unschedulableTaskSetToExpiryTime = new HashMap[TaskSetManager, Long]
+
// Listener object to pass upcalls into
var dagScheduler: DAGScheduler = null
@@ -415,9 +420,53 @@ private[spark] class TaskSchedulerImpl(
launchedAnyTask |= launchedTaskAtCurrentMaxLocality
} while (launchedTaskAtCurrentMaxLocality)
}
+
if (!launchedAnyTask) {
- taskSet.abortIfCompletelyBlacklisted(hostToExecutors)
+ taskSet.getCompletelyBlacklistedTaskIfAny(hostToExecutors).foreach { taskIndex =>
+ // If the taskSet is unschedulable we try to find an existing idle blacklisted
+ // executor. If we cannot find one, we abort immediately. Else we kill the idle
+ // executor and kick off an abortTimer which, if it doesn't schedule a task within
+ // the timeout, will abort the taskSet if we were unable to schedule any task from the
+ // taskSet.
+ // Note 1: We keep track of schedulability on a per taskSet basis rather than on a per
+ // task basis.
+ // Note 2: The taskSet can still be aborted when there are more than one idle
+ // blacklisted executors and dynamic allocation is on. This can happen when a killed
+ // idle executor isn't replaced in time by ExecutorAllocationManager as it relies on
+ // pending tasks and doesn't kill executors on idle timeouts, resulting in the abort
+ // timer to expire and abort the taskSet.
+ executorIdToRunningTaskIds.find(x => !isExecutorBusy(x._1)) match {
+ case Some ((executorId, _)) =>
+ if (!unschedulableTaskSetToExpiryTime.contains(taskSet)) {
+ blacklistTrackerOpt.foreach(blt => blt.killBlacklistedIdleExecutor(executorId))
+
+ val timeout = conf.get(config.UNSCHEDULABLE_TASKSET_TIMEOUT) * 1000
+ unschedulableTaskSetToExpiryTime(taskSet) = clock.getTimeMillis() + timeout
+ logInfo(s"Waiting for $timeout ms for completely "
+ + s"blacklisted task to be schedulable again before aborting $taskSet.")
+ abortTimer.schedule(
+ createUnschedulableTaskSetAbortTimer(taskSet, taskIndex), timeout)
+ }
+ case None => // Abort Immediately
+ logInfo("Cannot schedule any task because of complete blacklisting. No idle" +
+ s" executors can be found to kill. Aborting $taskSet." )
+ taskSet.abortSinceCompletelyBlacklisted(taskIndex)
+ }
+ }
+ } else {
+ // We want to defer killing any taskSets as long as we have a non blacklisted executor
+ // which can be used to schedule a task from any active taskSets. This ensures that the
+ // job can make progress.
+ // Note: It is theoretically possible that a taskSet never gets scheduled on a
+ // non-blacklisted executor and the abort timer doesn't kick in because of a constant
+ // submission of new TaskSets. See the PR for more details.
+ if (unschedulableTaskSetToExpiryTime.nonEmpty) {
+ logInfo("Clearing the expiry times for all unschedulable taskSets as a task was " +
+ "recently scheduled.")
+ unschedulableTaskSetToExpiryTime.clear()
+ }
}
+
if (launchedAnyTask && taskSet.isBarrier) {
// Check whether the barrier tasks are partially launched.
// TODO SPARK-24818 handle the assert failure case (that can happen when some locality
@@ -453,6 +502,23 @@ private[spark] class TaskSchedulerImpl(
return tasks
}
+ private def createUnschedulableTaskSetAbortTimer(
+ taskSet: TaskSetManager,
+ taskIndex: Int): TimerTask = {
+ new TimerTask() {
+ override def run() {
+ if (unschedulableTaskSetToExpiryTime.contains(taskSet) &&
+ unschedulableTaskSetToExpiryTime(taskSet) <= clock.getTimeMillis()) {
+ logInfo("Cannot schedule any task because of complete blacklisting. " +
+ s"Wait time for scheduling expired. Aborting $taskSet.")
+ taskSet.abortSinceCompletelyBlacklisted(taskIndex)
+ } else {
+ this.cancel()
+ }
+ }
+ }
+ }
+
/**
* Shuffle offers around to avoid always placing tasks on the same workers. Exposed to allow
* overriding in tests, so it can be deterministic.
@@ -590,6 +656,7 @@ private[spark] class TaskSchedulerImpl(
barrierCoordinator.stop()
}
starvationTimer.cancel()
+ abortTimer.cancel()
}
override def defaultParallelism(): Int = backend.defaultParallelism()
http://git-wip-us.apache.org/repos/asf/spark/blob/fdd3bace/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index d5e85a1..6bf60dd 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -623,8 +623,8 @@ private[spark] class TaskSetManager(
*
* It is possible that this taskset has become impossible to schedule *anywhere* due to the
* blacklist. The most common scenario would be if there are fewer executors than
- * spark.task.maxFailures. We need to detect this so we can fail the task set, otherwise the job
- * will hang.
+ * spark.task.maxFailures. We need to detect this so we can avoid the job from being hung.
+ * We try to acquire new executor/s by killing an existing idle blacklisted executor.
*
* There's a tradeoff here: we could make sure all tasks in the task set are schedulable, but that
* would add extra time to each iteration of the scheduling loop. Here, we take the approach of
@@ -635,9 +635,9 @@ private[spark] class TaskSetManager(
* failures (this is because the method picks one unscheduled task, and then iterates through each
* executor until it finds one that the task isn't blacklisted on).
*/
- private[scheduler] def abortIfCompletelyBlacklisted(
- hostToExecutors: HashMap[String, HashSet[String]]): Unit = {
- taskSetBlacklistHelperOpt.foreach { taskSetBlacklist =>
+ private[scheduler] def getCompletelyBlacklistedTaskIfAny(
+ hostToExecutors: HashMap[String, HashSet[String]]): Option[Int] = {
+ taskSetBlacklistHelperOpt.flatMap { taskSetBlacklist =>
val appBlacklist = blacklistTracker.get
// Only look for unschedulable tasks when at least one executor has registered. Otherwise,
// task sets will be (unnecessarily) aborted in cases when no executors have registered yet.
@@ -658,11 +658,11 @@ private[spark] class TaskSetManager(
}
}
- pendingTask.foreach { indexInTaskSet =>
+ pendingTask.find { indexInTaskSet =>
// try to find some executor this task can run on. Its possible that some *other*
// task isn't schedulable anywhere, but we will discover that in some later call,
// when that unschedulable task is the last task remaining.
- val blacklistedEverywhere = hostToExecutors.forall { case (host, execsOnHost) =>
+ hostToExecutors.forall { case (host, execsOnHost) =>
// Check if the task can run on the node
val nodeBlacklisted =
appBlacklist.isNodeBlacklisted(host) ||
@@ -679,22 +679,27 @@ private[spark] class TaskSetManager(
}
}
}
- if (blacklistedEverywhere) {
- val partition = tasks(indexInTaskSet).partitionId
- abort(s"""
- |Aborting $taskSet because task $indexInTaskSet (partition $partition)
- |cannot run anywhere due to node and executor blacklist.
- |Most recent failure:
- |${taskSetBlacklist.getLatestFailureReason}
- |
- |Blacklisting behavior can be configured via spark.blacklist.*.
- |""".stripMargin)
- }
}
+ } else {
+ None
}
}
}
+ private[scheduler] def abortSinceCompletelyBlacklisted(indexInTaskSet: Int): Unit = {
+ taskSetBlacklistHelperOpt.foreach { taskSetBlacklist =>
+ val partition = tasks(indexInTaskSet).partitionId
+ abort(s"""
+ |Aborting $taskSet because task $indexInTaskSet (partition $partition)
+ |cannot run anywhere due to node and executor blacklist.
+ |Most recent failure:
+ |${taskSetBlacklist.getLatestFailureReason}
+ |
+ |Blacklisting behavior can be configured via spark.blacklist.*.
+ |""".stripMargin)
+ }
+ }
+
/**
* Marks the task as getting result and notifies the DAG Scheduler
*/
http://git-wip-us.apache.org/repos/asf/spark/blob/fdd3bace/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala
index fe22d70..29bb823 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala
@@ -96,15 +96,16 @@ class BlacklistIntegrationSuite extends SchedulerIntegrationSuite[MultiExecutorM
assertDataStructuresEmpty(noFailure = true)
}
- // Make sure that if we've failed on all executors, but haven't hit task.maxFailures yet, the job
- // doesn't hang
+ // Make sure that if we've failed on all executors, but haven't hit task.maxFailures yet, we try
+ // to acquire a new executor and if we aren't able to get one, the job doesn't hang and we abort
testScheduler(
"SPARK-15865 Progress with fewer executors than maxTaskFailures",
extraConfs = Seq(
config.BLACKLIST_ENABLED.key -> "true",
"spark.testing.nHosts" -> "2",
"spark.testing.nExecutorsPerHost" -> "1",
- "spark.testing.nCoresPerExecutor" -> "1"
+ "spark.testing.nCoresPerExecutor" -> "1",
+ "spark.scheduler.blacklist.unschedulableTaskSetTimeout" -> "0s"
)
) {
def runBackend(): Unit = {
http://git-wip-us.apache.org/repos/asf/spark/blob/fdd3bace/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
index 9e1d13e..29172b4 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
@@ -20,10 +20,12 @@ package org.apache.spark.scheduler
import java.nio.ByteBuffer
import scala.collection.mutable.HashMap
+import scala.concurrent.duration._
import org.mockito.Matchers.{anyInt, anyObject, anyString, eq => meq}
import org.mockito.Mockito.{atLeast, atMost, never, spy, times, verify, when}
import org.scalatest.BeforeAndAfterEach
+import org.scalatest.concurrent.Eventually
import org.scalatest.mockito.MockitoSugar
import org.apache.spark._
@@ -40,7 +42,7 @@ class FakeSchedulerBackend extends SchedulerBackend {
}
class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with BeforeAndAfterEach
- with Logging with MockitoSugar {
+ with Logging with MockitoSugar with Eventually {
var failedTaskSetException: Option[Throwable] = None
var failedTaskSetReason: String = null
@@ -82,10 +84,12 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
setupHelper()
}
- def setupSchedulerWithMockTaskSetBlacklist(): TaskSchedulerImpl = {
+ def setupSchedulerWithMockTaskSetBlacklist(confs: (String, String)*): TaskSchedulerImpl = {
blacklist = mock[BlacklistTracker]
val conf = new SparkConf().setMaster("local").setAppName("TaskSchedulerImplSuite")
conf.set(config.BLACKLIST_ENABLED, true)
+ confs.foreach { case (k, v) => conf.set(k, v) }
+
sc = new SparkContext(conf)
taskScheduler =
new TaskSchedulerImpl(sc, sc.conf.getInt("spark.task.maxFailures", 4)) {
@@ -466,7 +470,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
}
}
- test("abort stage when all executors are blacklisted") {
+ test("abort stage when all executors are blacklisted and we cannot acquire
new executor") {
taskScheduler = setupSchedulerWithMockTaskSetBlacklist()
val taskSet = FakeTask.createTaskSet(numTasks = 10, stageAttemptId = 0)
taskScheduler.submitTasks(taskSet)
@@ -503,6 +507,185 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
verify(tsm).abort(anyString(), anyObject())
}
+ test("SPARK-22148 abort timer should kick in when task is completely
blacklisted & no new " +
+ "executor can be acquired") {
+ // set the abort timer to fail immediately
+ taskScheduler = setupSchedulerWithMockTaskSetBlacklist(
+ config.UNSCHEDULABLE_TASKSET_TIMEOUT.key -> "0")
+
+ // We have only 1 task remaining with 1 executor
+ val taskSet = FakeTask.createTaskSet(numTasks = 1, stageAttemptId = 0)
+ taskScheduler.submitTasks(taskSet)
+ val tsm = stageToMockTaskSetManager(0)
+
+ // submit an offer with one executor
+ val firstTaskAttempts = taskScheduler.resourceOffers(IndexedSeq(
+ WorkerOffer("executor0", "host0", 1)
+ )).flatten
+
+ // Fail the running task
+ val failedTask = firstTaskAttempts.find(_.executorId == "executor0").get
+ taskScheduler.statusUpdate(failedTask.taskId, TaskState.FAILED, ByteBuffer.allocate(0))
+ // we explicitly call the handleFailedTask method here to avoid adding a sleep in the test suite
+ // Reason being - handleFailedTask is run by an executor service and there is a momentary delay
+ // before it is launched and this fails the assertion check.
+ tsm.handleFailedTask(failedTask.taskId, TaskState.FAILED, UnknownReason)
+ when(tsm.taskSetBlacklistHelperOpt.get.isExecutorBlacklistedForTask(
+ "executor0", failedTask.index)).thenReturn(true)
+
+ // make an offer on the blacklisted executor. We won't schedule anything, and set the abort
+ // timer to kick in immediately
+ assert(taskScheduler.resourceOffers(IndexedSeq(
+ WorkerOffer("executor0", "host0", 1)
+ )).flatten.size === 0)
+ // Wait for the abort timer to kick in. Even though we configure the timeout to be 0, there is a
+ // slight delay as the abort timer is launched in a separate thread.
+ eventually(timeout(500.milliseconds)) {
+ assert(tsm.isZombie)
+ }
+ }
+
+ test("SPARK-22148 try to acquire a new executor when task is unschedulable
with 1 executor") {
+ taskScheduler = setupSchedulerWithMockTaskSetBlacklist(
+ config.UNSCHEDULABLE_TASKSET_TIMEOUT.key -> "10")
+
+ // We have only 1 task remaining with 1 executor
+ val taskSet = FakeTask.createTaskSet(numTasks = 1, stageAttemptId = 0)
+ taskScheduler.submitTasks(taskSet)
+ val tsm = stageToMockTaskSetManager(0)
+
+ // submit an offer with one executor
+ val firstTaskAttempts = taskScheduler.resourceOffers(IndexedSeq(
+ WorkerOffer("executor0", "host0", 1)
+ )).flatten
+
+ // Fail the running task
+ val failedTask = firstTaskAttempts.head
+ taskScheduler.statusUpdate(failedTask.taskId, TaskState.FAILED, ByteBuffer.allocate(0))
+ // we explicitly call the handleFailedTask method here to avoid adding a sleep in the test suite
+ // Reason being - handleFailedTask is run by an executor service and there is a momentary delay
+ // before it is launched and this fails the assertion check.
+ tsm.handleFailedTask(failedTask.taskId, TaskState.FAILED, UnknownReason)
+ when(tsm.taskSetBlacklistHelperOpt.get.isExecutorBlacklistedForTask(
+ "executor0", failedTask.index)).thenReturn(true)
+
+ // make an offer on the blacklisted executor. We won't schedule anything, and set the abort
+ // timer to expire if no new executors could be acquired. We kill the existing idle blacklisted
+ // executor and try to acquire a new one.
+ assert(taskScheduler.resourceOffers(IndexedSeq(
+ WorkerOffer("executor0", "host0", 1)
+ )).flatten.size === 0)
+ assert(taskScheduler.unschedulableTaskSetToExpiryTime.contains(tsm))
+ assert(!tsm.isZombie)
+
+ // Offer a new executor which should be accepted
+ assert(taskScheduler.resourceOffers(IndexedSeq(
+ WorkerOffer("executor1", "host0", 1)
+ )).flatten.size === 1)
+ assert(taskScheduler.unschedulableTaskSetToExpiryTime.isEmpty)
+ assert(!tsm.isZombie)
+ }
+
+ // This is to test a scenario where we have two taskSets completely blacklisted and on acquiring
+ // a new executor we don't want the abort timer for the second taskSet to expire and abort the job
+ test("SPARK-22148 abort timer should clear unschedulableTaskSetToExpiryTime for all TaskSets") {
+ taskScheduler = setupSchedulerWithMockTaskSetBlacklist()
+
+ // We have 2 taskSets with 1 task remaining in each with 1 executor completely blacklisted
+ val taskSet1 = FakeTask.createTaskSet(numTasks = 1, stageId = 0, stageAttemptId = 0)
+ taskScheduler.submitTasks(taskSet1)
+ val taskSet2 = FakeTask.createTaskSet(numTasks = 1, stageId = 1, stageAttemptId = 0)
+ taskScheduler.submitTasks(taskSet2)
+ val tsm = stageToMockTaskSetManager(0)
+
+ // submit an offer with one executor
+ val firstTaskAttempts = taskScheduler.resourceOffers(IndexedSeq(
+ WorkerOffer("executor0", "host0", 1)
+ )).flatten
+
+ assert(taskScheduler.unschedulableTaskSetToExpiryTime.isEmpty)
+
+ // Fail the running task
+ val failedTask = firstTaskAttempts.head
+ taskScheduler.statusUpdate(failedTask.taskId, TaskState.FAILED, ByteBuffer.allocate(0))
+ tsm.handleFailedTask(failedTask.taskId, TaskState.FAILED, UnknownReason)
+ when(tsm.taskSetBlacklistHelperOpt.get.isExecutorBlacklistedForTask(
+ "executor0", failedTask.index)).thenReturn(true)
+
+ // make an offer. We will schedule the task from the second taskSet. Since a task was scheduled
+ // we do not kick off the abort timer for taskSet1
+ val secondTaskAttempts = taskScheduler.resourceOffers(IndexedSeq(
+ WorkerOffer("executor0", "host0", 1)
+ )).flatten
+
+ assert(taskScheduler.unschedulableTaskSetToExpiryTime.isEmpty)
+
+ val tsm2 = stageToMockTaskSetManager(1)
+ val failedTask2 = secondTaskAttempts.head
+ taskScheduler.statusUpdate(failedTask2.taskId, TaskState.FAILED, ByteBuffer.allocate(0))
+ tsm2.handleFailedTask(failedTask2.taskId, TaskState.FAILED, UnknownReason)
+ when(tsm2.taskSetBlacklistHelperOpt.get.isExecutorBlacklistedForTask(
+ "executor0", failedTask2.index)).thenReturn(true)
+
+ // make an offer on the blacklisted executor. We won't schedule anything, and set the abort
+ // timer for taskSet1 and taskSet2
+ assert(taskScheduler.resourceOffers(IndexedSeq(
+ WorkerOffer("executor0", "host0", 1)
+ )).flatten.size === 0)
+ assert(taskScheduler.unschedulableTaskSetToExpiryTime.contains(tsm))
+ assert(taskScheduler.unschedulableTaskSetToExpiryTime.contains(tsm2))
+ assert(taskScheduler.unschedulableTaskSetToExpiryTime.size == 2)
+
+ // Offer a new executor which should be accepted
+ assert(taskScheduler.resourceOffers(IndexedSeq(
+ WorkerOffer("executor1", "host1", 1)
+ )).flatten.size === 1)
+
+ // Check if all the taskSets are cleared
+ assert(taskScheduler.unschedulableTaskSetToExpiryTime.isEmpty)
+
+ assert(!tsm.isZombie)
+ }
+
+ // this test is to check that we don't abort a taskSet which is not being scheduled on other
+ // executors as it is waiting on locality timeout and not being aborted because it is still not
+ // completely blacklisted.
+ test("SPARK-22148 Ensure we don't abort the taskSet if we haven't been completely blacklisted") {
+ taskScheduler = setupSchedulerWithMockTaskSetBlacklist(
+ config.UNSCHEDULABLE_TASKSET_TIMEOUT.key -> "0",
+ // This is to avoid any potential flakiness in the test because of large pauses in jenkins
+ config.LOCALITY_WAIT.key -> "30s"
+ )
+
+ val preferredLocation = Seq(ExecutorCacheTaskLocation("host0",
"executor0"))
+ val taskSet1 = FakeTask.createTaskSet(numTasks = 1, stageId = 0,
stageAttemptId = 0,
+ preferredLocation)
+ taskScheduler.submitTasks(taskSet1)
+
+ val tsm = stageToMockTaskSetManager(0)
+
+ // submit an offer with one executor
+ var taskAttempts = taskScheduler.resourceOffers(IndexedSeq(
+ WorkerOffer("executor0", "host0", 1)
+ )).flatten
+
+ // Fail the running task
+ val failedTask = taskAttempts.head
+ taskScheduler.statusUpdate(failedTask.taskId, TaskState.FAILED, ByteBuffer.allocate(0))
+ tsm.handleFailedTask(failedTask.taskId, TaskState.FAILED, UnknownReason)
+ when(tsm.taskSetBlacklistHelperOpt.get.isExecutorBlacklistedForTask(
+ "executor0", failedTask.index)).thenReturn(true)
+
+ // make an offer but we won't schedule anything yet as scheduler locality is still PROCESS_LOCAL
+ assert(taskScheduler.resourceOffers(IndexedSeq(
+ WorkerOffer("executor1", "host0", 1)
+ )).flatten.isEmpty)
+
+ assert(taskScheduler.unschedulableTaskSetToExpiryTime.isEmpty)
+
+ assert(!tsm.isZombie)
+ }
+
/**
* Helper for performance tests. Takes the explicitly blacklisted nodes and executors; verifies
* that the blacklists are used efficiently to ensure scheduling is not O(numPendingTasks).
http://git-wip-us.apache.org/repos/asf/spark/blob/fdd3bace/docs/configuration.md
----------------------------------------------------------------------
diff --git a/docs/configuration.md b/docs/configuration.md
index 11ee7a9..f8937b0 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -1611,6 +1611,14 @@ Apart from these, the following properties are also available, and may be useful
</td>
</tr>
<tr>
+ <td><code>spark.scheduler.blacklist.unschedulableTaskSetTimeout</code></td>
+ <td>120s</td>
+ <td>
+ The timeout in seconds to wait to acquire a new executor and schedule a task before aborting a
+ TaskSet which is unschedulable because of being completely blacklisted.
+ </td>
+</tr>
+<tr>
<td><code>spark.blacklist.enabled</code></td>
<td>
false
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]