This is an automated email from the ASF dual-hosted git repository.
tgraves pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new e7fb67c [SPARK-31418][SCHEDULER] Request more executors in case of
dynamic allocation is enabled and a task becomes unschedulable due to spark's
blacklisting feature
e7fb67c is described below
commit e7fb67cd880511452b94c2077429868e72998c05
Author: Venkata krishnan Sowrirajan <[email protected]>
AuthorDate: Thu Jul 23 12:33:22 2020 -0500
[SPARK-31418][SCHEDULER] Request more executors in case of dynamic
allocation is enabled and a task becomes unschedulable due to spark's
blacklisting feature
### What changes were proposed in this pull request?
With this change, when dynamic allocation is enabled, Spark no longer aborts
immediately when a taskset becomes unschedulable due to blacklisting. Instead,
the scheduler posts a `SparkListenerUnschedulableTaskSetAdded` event.
`ExecutorAllocationManager` handles this event by requesting the additional
executors needed to schedule the unschedulable blacklisted tasks. Once the event
is sent, we start the abort timer, similar to [SPARK-22148][SPARK-15815], so that
the taskset is still aborted if no new executors are launched, either due [...]
### Why are the changes needed?
This is an improvement. When dynamic allocation is enabled, Spark now requests
more executors to schedule the unschedulable tasks instead of aborting the stage
without even retrying up to `spark.task.maxFailures` times (in some cases
without retrying at all). Aborting this early is a potential issue for Spark's
fault tolerance.
### Does this PR introduce any user-facing change?
No
### How was this patch tested?
Added unit tests in both ExecutorAllocationManagerSuite and
TaskSchedulerImplSuite.
Closes #28287 from venkata91/SPARK-31418.
Authored-by: Venkata krishnan Sowrirajan <[email protected]>
Signed-off-by: Thomas Graves <[email protected]>
---
.../org/apache/spark/SparkFirehoseListener.java | 10 ++
.../apache/spark/ExecutorAllocationManager.scala | 55 ++++++-
.../org/apache/spark/scheduler/DAGScheduler.scala | 38 +++++
.../apache/spark/scheduler/DAGSchedulerEvent.scala | 8 +
.../org/apache/spark/scheduler/SparkListener.scala | 30 ++++
.../apache/spark/scheduler/SparkListenerBus.scala | 4 +
.../apache/spark/scheduler/TaskSchedulerImpl.scala | 55 +++++--
.../spark/ExecutorAllocationManagerSuite.scala | 175 ++++++++++++++++++++-
.../spark/scheduler/TaskSchedulerImplSuite.scala | 36 +++++
9 files changed, 392 insertions(+), 19 deletions(-)
diff --git a/core/src/main/java/org/apache/spark/SparkFirehoseListener.java
b/core/src/main/java/org/apache/spark/SparkFirehoseListener.java
index 579e7ff..c0e72b5 100644
--- a/core/src/main/java/org/apache/spark/SparkFirehoseListener.java
+++ b/core/src/main/java/org/apache/spark/SparkFirehoseListener.java
@@ -162,6 +162,16 @@ public class SparkFirehoseListener implements
SparkListenerInterface {
onEvent(speculativeTask);
}
+ public void onUnschedulableTaskSetAdded(
+ SparkListenerUnschedulableTaskSetAdded unschedulableTaskSetAdded) {
+ onEvent(unschedulableTaskSetAdded);
+ }
+
+ public void onUnschedulableTaskSetRemoved(
+ SparkListenerUnschedulableTaskSetRemoved unschedulableTaskSetRemoved) {
+ onEvent(unschedulableTaskSetRemoved);
+ }
+
@Override
public void onResourceProfileAdded(SparkListenerResourceProfileAdded event) {
onEvent(event);
diff --git
a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index 620a6fe..85409d5 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -281,6 +281,7 @@ private[spark] class ExecutorAllocationManager(
private def maxNumExecutorsNeededPerResourceProfile(rpId: Int): Int = {
val pending = listener.totalPendingTasksPerResourceProfile(rpId)
val pendingSpeculative =
listener.pendingSpeculativeTasksPerResourceProfile(rpId)
+ val unschedulableTaskSets =
listener.pendingUnschedulableTaskSetsPerResourceProfile(rpId)
val running = listener.totalRunningTasksPerResourceProfile(rpId)
val numRunningOrPendingTasks = pending + running
val rp = resourceProfileManager.resourceProfileFromId(rpId)
@@ -289,13 +290,27 @@ private[spark] class ExecutorAllocationManager(
s" tasksperexecutor: $tasksPerExecutor")
val maxNeeded = math.ceil(numRunningOrPendingTasks *
executorAllocationRatio /
tasksPerExecutor).toInt
- if (tasksPerExecutor > 1 && maxNeeded == 1 && pendingSpeculative > 0) {
+
+ val maxNeededWithSpeculationLocalityOffset =
+ if (tasksPerExecutor > 1 && maxNeeded == 1 && pendingSpeculative > 0) {
// If we have pending speculative tasks and only need a single executor,
allocate one more
// to satisfy the locality requirements of speculation
maxNeeded + 1
} else {
maxNeeded
}
+
+ if (unschedulableTaskSets > 0) {
+ // Request additional executors to account for task sets having tasks
that are unschedulable
+ // due to blacklisting when the active executor count has already
reached the max needed
+ // which we would normally get.
+ val maxNeededForUnschedulables = math.ceil(unschedulableTaskSets *
executorAllocationRatio /
+ tasksPerExecutor).toInt
+ math.max(maxNeededWithSpeculationLocalityOffset,
+ executorMonitor.executorCountWithResourceProfile(rpId) +
maxNeededForUnschedulables)
+ } else {
+ maxNeededWithSpeculationLocalityOffset
+ }
}
private def totalRunningTasksPerResourceProfile(id: Int): Int = synchronized
{
@@ -622,6 +637,12 @@ private[spark] class ExecutorAllocationManager(
private val resourceProfileIdToStageAttempt =
new mutable.HashMap[Int, mutable.Set[StageAttempt]]
+ // Keep track of unschedulable task sets due to blacklisting. This is a
Set of StageAttempt's
+ // because we'll only take the last unschedulable task in a taskset
although there can be more.
+ // This is done in order to avoid costly loops in the scheduling.
+ // Check TaskSetManager#getCompletelyBlacklistedTaskIfAny for more details.
+ private val unschedulableTaskSets = new mutable.HashSet[StageAttempt]
+
// stageAttempt to tuple (the number of task with locality preferences, a
map where each pair
// is a node and the number of tasks that would like to be scheduled on
that node, and
// the resource profile id) map,
@@ -789,6 +810,28 @@ private[spark] class ExecutorAllocationManager(
}
}
+ override def onUnschedulableTaskSetAdded(
+ unschedulableTaskSetAdded: SparkListenerUnschedulableTaskSetAdded):
Unit = {
+ val stageId = unschedulableTaskSetAdded.stageId
+ val stageAttemptId = unschedulableTaskSetAdded.stageAttemptId
+ val stageAttempt = StageAttempt(stageId, stageAttemptId)
+ allocationManager.synchronized {
+ unschedulableTaskSets.add(stageAttempt)
+ allocationManager.onSchedulerBacklogged()
+ }
+ }
+
+ override def onUnschedulableTaskSetRemoved(
+ unschedulableTaskSetRemoved:
SparkListenerUnschedulableTaskSetRemoved): Unit = {
+ val stageId = unschedulableTaskSetRemoved.stageId
+ val stageAttemptId = unschedulableTaskSetRemoved.stageAttemptId
+ val stageAttempt = StageAttempt(stageId, stageAttemptId)
+ allocationManager.synchronized {
+ // Clear unschedulableTaskSets since atleast one task becomes
schedulable now
+ unschedulableTaskSets.remove(stageAttempt)
+ }
+ }
+
/**
* An estimate of the total number of pending tasks remaining for
currently running stages. Does
* not account for tasks which may have failed and been resubmitted.
@@ -829,6 +872,16 @@ private[spark] class ExecutorAllocationManager(
numTotalTasks - numRunning
}
+ /**
+ * Currently we only know when a task set has an unschedulable task, we
don't know
+ * the exact number and since the allocation manager isn't tied closely
with the scheduler,
+ * we use the number of tasks sets that are unschedulable as a heuristic
to add more executors.
+ */
+ def pendingUnschedulableTaskSetsPerResourceProfile(rp: Int): Int = {
+ val attempts = resourceProfileIdToStageAttempt.getOrElse(rp,
Set.empty).toSeq
+ attempts.filter(attempt => unschedulableTaskSets.contains(attempt)).size
+ }
+
def hasPendingTasks: Boolean = {
hasPendingSpeculativeTasks || hasPendingRegularTasks
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 73c95d1..2503ae0 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -332,6 +332,26 @@ private[spark] class DAGScheduler(
eventProcessLoop.post(SpeculativeTaskSubmitted(task))
}
+ /**
+ * Called by the TaskSetManager when a taskset becomes unschedulable due to
blacklisting and
+ * dynamic allocation is enabled.
+ */
+ def unschedulableTaskSetAdded(
+ stageId: Int,
+ stageAttemptId: Int): Unit = {
+ eventProcessLoop.post(UnschedulableTaskSetAdded(stageId, stageAttemptId))
+ }
+
+ /**
+ * Called by the TaskSetManager when an unschedulable taskset becomes
schedulable and dynamic
+ * allocation is enabled.
+ */
+ def unschedulableTaskSetRemoved(
+ stageId: Int,
+ stageAttemptId: Int): Unit = {
+ eventProcessLoop.post(UnschedulableTaskSetRemoved(stageId, stageAttemptId))
+ }
+
private[scheduler]
def getCacheLocs(rdd: RDD[_]): IndexedSeq[Seq[TaskLocation]] =
cacheLocs.synchronized {
// Note: this doesn't use `getOrElse()` because this method is called
O(num tasks) times
@@ -1035,6 +1055,18 @@ private[spark] class DAGScheduler(
listenerBus.post(SparkListenerSpeculativeTaskSubmitted(task.stageId,
task.stageAttemptId))
}
+ private[scheduler] def handleUnschedulableTaskSetAdded(
+ stageId: Int,
+ stageAttemptId: Int): Unit = {
+ listenerBus.post(SparkListenerUnschedulableTaskSetAdded(stageId,
stageAttemptId))
+ }
+
+ private[scheduler] def handleUnschedulableTaskSetRemoved(
+ stageId: Int,
+ stageAttemptId: Int): Unit = {
+ listenerBus.post(SparkListenerUnschedulableTaskSetRemoved(stageId,
stageAttemptId))
+ }
+
private[scheduler] def handleTaskSetFailed(
taskSet: TaskSet,
reason: String,
@@ -2321,6 +2353,12 @@ private[scheduler] class
DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler
case SpeculativeTaskSubmitted(task) =>
dagScheduler.handleSpeculativeTaskSubmitted(task)
+ case UnschedulableTaskSetAdded(stageId, stageAttemptId) =>
+ dagScheduler.handleUnschedulableTaskSetAdded(stageId, stageAttemptId)
+
+ case UnschedulableTaskSetRemoved(stageId, stageAttemptId) =>
+ dagScheduler.handleUnschedulableTaskSetRemoved(stageId, stageAttemptId)
+
case GettingResultEvent(taskInfo) =>
dagScheduler.handleGetTaskResult(taskInfo)
diff --git
a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
index 78d4583..d226fe8 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
@@ -97,3 +97,11 @@ private[scheduler] case object ResubmitFailedStages extends
DAGSchedulerEvent
private[scheduler]
case class SpeculativeTaskSubmitted(task: Task[_]) extends DAGSchedulerEvent
+private[scheduler]
+case class UnschedulableTaskSetAdded(stageId: Int, stageAttemptId: Int)
+ extends DAGSchedulerEvent
+
+private[scheduler]
+case class UnschedulableTaskSetRemoved(stageId: Int, stageAttemptId: Int)
+ extends DAGSchedulerEvent
+
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
index 62d54f3..8119215 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
@@ -158,6 +158,16 @@ case class SparkListenerNodeUnblacklisted(time: Long,
hostId: String)
extends SparkListenerEvent
@DeveloperApi
+case class SparkListenerUnschedulableTaskSetAdded(
+ stageId: Int,
+ stageAttemptId: Int) extends SparkListenerEvent
+
+@DeveloperApi
+case class SparkListenerUnschedulableTaskSetRemoved(
+ stageId: Int,
+ stageAttemptId: Int) extends SparkListenerEvent
+
+@DeveloperApi
case class SparkListenerBlockUpdated(blockUpdatedInfo: BlockUpdatedInfo)
extends SparkListenerEvent
/**
@@ -340,6 +350,20 @@ private[spark] trait SparkListenerInterface {
def onNodeUnblacklisted(nodeUnblacklisted: SparkListenerNodeUnblacklisted):
Unit
/**
+ * Called when a taskset becomes unschedulable due to blacklisting and
dynamic allocation
+ * is enabled.
+ */
+ def onUnschedulableTaskSetAdded(
+ unschedulableTaskSetAdded: SparkListenerUnschedulableTaskSetAdded): Unit
+
+ /**
+ * Called when an unschedulable taskset becomes schedulable and dynamic
allocation
+ * is enabled.
+ */
+ def onUnschedulableTaskSetRemoved(
+ unschedulableTaskSetRemoved: SparkListenerUnschedulableTaskSetRemoved):
Unit
+
+ /**
* Called when the driver receives a block update info.
*/
def onBlockUpdated(blockUpdated: SparkListenerBlockUpdated): Unit
@@ -425,6 +449,12 @@ abstract class SparkListener extends
SparkListenerInterface {
override def onNodeUnblacklisted(
nodeUnblacklisted: SparkListenerNodeUnblacklisted): Unit = { }
+ override def onUnschedulableTaskSetAdded(
+ unschedulableTaskSetAdded: SparkListenerUnschedulableTaskSetAdded): Unit
= { }
+
+ override def onUnschedulableTaskSetRemoved(
+ unschedulableTaskSetRemoved: SparkListenerUnschedulableTaskSetRemoved):
Unit = { }
+
override def onBlockUpdated(blockUpdated: SparkListenerBlockUpdated): Unit =
{ }
override def onSpeculativeTaskSubmitted(
diff --git
a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
index 3d316c9..13e65f4 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
@@ -79,6 +79,10 @@ private[spark] trait SparkListenerBus
listener.onBlockUpdated(blockUpdated)
case speculativeTaskSubmitted: SparkListenerSpeculativeTaskSubmitted =>
listener.onSpeculativeTaskSubmitted(speculativeTaskSubmitted)
+ case unschedulableTaskSetAdded: SparkListenerUnschedulableTaskSetAdded =>
+ listener.onUnschedulableTaskSetAdded(unschedulableTaskSetAdded)
+ case unschedulableTaskSetRemoved:
SparkListenerUnschedulableTaskSetRemoved =>
+ listener.onUnschedulableTaskSetRemoved(unschedulableTaskSetRemoved)
case resourceProfileAdded: SparkListenerResourceProfileAdded =>
listener.onResourceProfileAdded(resourceProfileAdded)
case _ => listener.onOtherEvent(event)
diff --git
a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 28e138e..510318a 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -637,10 +637,9 @@ private[spark] class TaskSchedulerImpl(
if (!launchedAnyTask) {
taskSet.getCompletelyBlacklistedTaskIfAny(hostToExecutors).foreach {
taskIndex =>
// If the taskSet is unschedulable we try to find an existing
idle blacklisted
- // executor. If we cannot find one, we abort immediately. Else
we kill the idle
- // executor and kick off an abortTimer which if it doesn't
schedule a task within the
- // the timeout will abort the taskSet if we were unable to
schedule any task from the
- // taskSet.
+ // executor and kill the idle executor and kick off an
abortTimer which if it doesn't
+ // schedule a task within the the timeout will abort the taskSet
if we were unable to
+ // schedule any task from the taskSet.
// Note 1: We keep track of schedulability on a per taskSet
basis rather than on a per
// task basis.
// Note 2: The taskSet can still be aborted when there are more
than one idle
@@ -648,22 +647,33 @@ private[spark] class TaskSchedulerImpl(
// idle executor isn't replaced in time by
ExecutorAllocationManager as it relies on
// pending tasks and doesn't kill executors on idle timeouts,
resulting in the abort
// timer to expire and abort the taskSet.
+ //
+ // If there are no idle executors and dynamic allocation is
enabled, then we would
+ // notify ExecutorAllocationManager to allocate more executors
to schedule the
+ // unschedulable tasks else we will abort immediately.
executorIdToRunningTaskIds.find(x => !isExecutorBusy(x._1))
match {
case Some ((executorId, _)) =>
if (!unschedulableTaskSetToExpiryTime.contains(taskSet)) {
blacklistTrackerOpt.foreach(blt =>
blt.killBlacklistedIdleExecutor(executorId))
-
- val timeout =
conf.get(config.UNSCHEDULABLE_TASKSET_TIMEOUT) * 1000
- unschedulableTaskSetToExpiryTime(taskSet) =
clock.getTimeMillis() + timeout
- logInfo(s"Waiting for $timeout ms for completely "
- + s"blacklisted task to be schedulable again before
aborting $taskSet.")
- abortTimer.schedule(
- createUnschedulableTaskSetAbortTimer(taskSet,
taskIndex), timeout)
+
updateUnschedulableTaskSetTimeoutAndStartAbortTimer(taskSet, taskIndex)
+ }
+ case None =>
+ // Notify ExecutorAllocationManager about the unschedulable
task set,
+ // in order to provision more executors to make them
schedulable
+ if (Utils.isDynamicAllocationEnabled(conf)) {
+ if (!unschedulableTaskSetToExpiryTime.contains(taskSet)) {
+ logInfo(s"Notifying ExecutorAllocationManager to
allocate more executors to" +
+ s" schedule the unschedulable task before aborting
$taskSet.")
+
dagScheduler.unschedulableTaskSetAdded(taskSet.taskSet.stageId,
+ taskSet.taskSet.stageAttemptId)
+
updateUnschedulableTaskSetTimeoutAndStartAbortTimer(taskSet, taskIndex)
+ }
+ } else {
+ // Abort Immediately
+ logInfo("Cannot schedule any task because of complete
blacklisting. No idle" +
+ s" executors can be found to kill. Aborting $taskSet.")
+ taskSet.abortSinceCompletelyBlacklisted(taskIndex)
}
- case None => // Abort Immediately
- logInfo("Cannot schedule any task because of complete
blacklisting. No idle" +
- s" executors can be found to kill. Aborting $taskSet." )
- taskSet.abortSinceCompletelyBlacklisted(taskIndex)
}
}
} else {
@@ -676,6 +686,10 @@ private[spark] class TaskSchedulerImpl(
if (unschedulableTaskSetToExpiryTime.nonEmpty) {
logInfo("Clearing the expiry times for all unschedulable taskSets
as a task was " +
"recently scheduled.")
+ // Notify ExecutorAllocationManager as well as other subscribers
that a task now
+ // recently becomes schedulable
+ dagScheduler.unschedulableTaskSetRemoved(taskSet.taskSet.stageId,
+ taskSet.taskSet.stageAttemptId)
unschedulableTaskSetToExpiryTime.clear()
}
}
@@ -722,6 +736,17 @@ private[spark] class TaskSchedulerImpl(
return tasks.map(_.toSeq)
}
+ private def updateUnschedulableTaskSetTimeoutAndStartAbortTimer(
+ taskSet: TaskSetManager,
+ taskIndex: Int): Unit = {
+ val timeout = conf.get(config.UNSCHEDULABLE_TASKSET_TIMEOUT) * 1000
+ unschedulableTaskSetToExpiryTime(taskSet) = clock.getTimeMillis() + timeout
+ logInfo(s"Waiting for $timeout ms for completely " +
+ s"blacklisted task to be schedulable again before aborting $taskSet.")
+ abortTimer.schedule(
+ createUnschedulableTaskSetAbortTimer(taskSet, taskIndex), timeout)
+ }
+
private def createUnschedulableTaskSetAbortTimer(
taskSet: TaskSetManager,
taskIndex: Int): TimerTask = {
diff --git
a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
index 8037f4a..ea6e010 100644
--- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
@@ -21,15 +21,15 @@ import java.util.concurrent.TimeUnit
import scala.collection.mutable
-import org.mockito.ArgumentMatchers.{any, eq => meq}
-import org.mockito.Mockito.{mock, never, times, verify, when}
+import org.mockito.ArgumentMatchers.any
+import org.mockito.Mockito._
import org.scalatest.PrivateMethodTester
import org.apache.spark.executor.ExecutorMetrics
import org.apache.spark.internal.config
import org.apache.spark.internal.config.Tests.TEST_SCHEDULE_INTERVAL
import org.apache.spark.metrics.MetricsSystem
-import org.apache.spark.resource.{ExecutorResourceRequests, ResourceProfile,
ResourceProfileBuilder, ResourceProfileManager, TaskResourceRequests}
+import org.apache.spark.resource._
import org.apache.spark.resource.ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID
import org.apache.spark.scheduler._
import org.apache.spark.scheduler.cluster.ExecutorInfo
@@ -501,6 +501,175 @@ class ExecutorAllocationManagerSuite extends
SparkFunSuite {
assert(numExecutorsToAddForDefaultProfile(manager) === 1)
}
+ test("SPARK-31418: one stage being unschedulable") {
+ val clock = new ManualClock()
+ val conf = createConf(0, 5, 0).set(config.EXECUTOR_CORES, 2)
+ val manager = createManager(conf, clock = clock)
+ val updatesNeeded =
+ new mutable.HashMap[ResourceProfile,
ExecutorAllocationManager.TargetNumUpdates]
+
+ post(SparkListenerStageSubmitted(createStageInfo(0, 2)))
+
+ assert(addExecutorsToTargetForDefaultProfile(manager, updatesNeeded) === 1)
+ doUpdateRequest(manager, updatesNeeded.toMap, clock.getTimeMillis())
+ assert(addExecutorsToTargetForDefaultProfile(manager, updatesNeeded) === 0)
+
+ onExecutorAddedDefaultProfile(manager, "0")
+ val t1 = createTaskInfo(0, 0, executorId = s"0")
+ val t2 = createTaskInfo(1, 1, executorId = s"0")
+ post(SparkListenerTaskStart(0, 0, t1))
+ post(SparkListenerTaskStart(0, 0, t2))
+
+ assert(numExecutorsTarget(manager, defaultProfile.id) === 1)
+ assert(maxNumExecutorsNeededPerResourceProfile(manager, defaultProfile) ==
1)
+
+ // Stage 0 becomes unschedulable due to blacklisting
+ post(SparkListenerUnschedulableTaskSetAdded(0, 0))
+ clock.advance(1000)
+ manager invokePrivate _updateAndSyncNumExecutorsTarget(clock.nanoTime())
+ // Assert that we are getting additional executor to schedule
unschedulable tasks
+ assert(numExecutorsTarget(manager, defaultProfile.id) === 2)
+ assert(maxNumExecutorsNeededPerResourceProfile(manager, defaultProfile) ==
2)
+
+ // Add a new executor
+ onExecutorAddedDefaultProfile(manager, "1")
+ // Now once the task becomes schedulable, clear the unschedulableTaskSets
+ post(SparkListenerUnschedulableTaskSetRemoved(0, 0))
+ clock.advance(1000)
+ manager invokePrivate _updateAndSyncNumExecutorsTarget(clock.nanoTime())
+ assert(numExecutorsTarget(manager, defaultProfile.id) === 1)
+ assert(maxNumExecutorsNeededPerResourceProfile(manager, defaultProfile) ==
1)
+ }
+
+ test("SPARK-31418: multiple stages being unschedulable") {
+ val clock = new ManualClock()
+ val conf = createConf(0, 10, 0).set(config.EXECUTOR_CORES, 2)
+ val manager = createManager(conf, clock = clock)
+ val updatesNeeded =
+ new mutable.HashMap[ResourceProfile,
ExecutorAllocationManager.TargetNumUpdates]
+
+ post(SparkListenerStageSubmitted(createStageInfo(0, 2)))
+ post(SparkListenerStageSubmitted(createStageInfo(1, 2)))
+ post(SparkListenerStageSubmitted(createStageInfo(2, 2)))
+
+ assert(addExecutorsToTargetForDefaultProfile(manager, updatesNeeded) === 1)
+ doUpdateRequest(manager, updatesNeeded.toMap, clock.getTimeMillis())
+ assert(addExecutorsToTargetForDefaultProfile(manager, updatesNeeded) === 2)
+ doUpdateRequest(manager, updatesNeeded.toMap, clock.getTimeMillis())
+ assert(addExecutorsToTargetForDefaultProfile(manager, updatesNeeded) === 0)
+
+ // Add necessary executors
+ (0 to 2).foreach(execId => onExecutorAddedDefaultProfile(manager,
execId.toString))
+
+ // Start all the tasks
+ (0 to 2).foreach {
+ i =>
+ val t1Info = createTaskInfo(0, (i * 2) + 1, executorId = s"${i / 2}")
+ val t2Info = createTaskInfo(1, (i * 2) + 2, executorId = s"${i / 2}")
+ post(SparkListenerTaskStart(i, 0, t1Info))
+ post(SparkListenerTaskStart(i, 0, t2Info))
+ }
+ assert(numExecutorsTarget(manager, defaultProfile.id) === 3)
+ assert(maxNumExecutorsNeededPerResourceProfile(manager, defaultProfile) ==
3)
+
+ // Complete the stage 0 tasks.
+ val t1Info = createTaskInfo(0, 0, executorId = s"0")
+ val t2Info = createTaskInfo(1, 1, executorId = s"0")
+ post(SparkListenerTaskEnd(0, 0, null, Success, t1Info, new
ExecutorMetrics, null))
+ post(SparkListenerTaskEnd(0, 0, null, Success, t2Info, new
ExecutorMetrics, null))
+ post(SparkListenerStageCompleted(createStageInfo(0, 2)))
+
+ // Stage 1 and 2 becomes unschedulable now due to blacklisting
+ post(SparkListenerUnschedulableTaskSetAdded(1, 0))
+ post(SparkListenerUnschedulableTaskSetAdded(2, 0))
+
+ clock.advance(1000)
+ manager invokePrivate _updateAndSyncNumExecutorsTarget(clock.nanoTime())
+ // Assert that we are getting additional executor to schedule
unschedulable tasks
+ assert(numExecutorsTarget(manager, defaultProfile.id) === 4)
+ assert(maxNumExecutorsNeededPerResourceProfile(manager, defaultProfile) ==
4)
+
+ // Add a new executor
+ onExecutorAddedDefaultProfile(manager, "3")
+
+ // Now once the task becomes schedulable, clear the unschedulableTaskSets
+ post(SparkListenerUnschedulableTaskSetRemoved(1, 0))
+ clock.advance(1000)
+ manager invokePrivate _updateAndSyncNumExecutorsTarget(clock.nanoTime())
+ assert(numExecutorsTarget(manager, defaultProfile.id) === 4)
+ assert(maxNumExecutorsNeededPerResourceProfile(manager, defaultProfile) ==
5)
+ }
+
+ test("SPARK-31418: remove executors after unschedulable tasks end") {
+ val clock = new ManualClock()
+ val stage = createStageInfo(0, 10)
+ val conf = createConf(0, 6, 0).set(config.EXECUTOR_CORES, 2)
+ val manager = createManager(conf, clock = clock)
+ val updatesNeeded =
+ new mutable.HashMap[ResourceProfile,
ExecutorAllocationManager.TargetNumUpdates]
+
+ post(SparkListenerStageSubmitted(stage))
+ assert(addExecutorsToTargetForDefaultProfile(manager, updatesNeeded) === 1)
+ doUpdateRequest(manager, updatesNeeded.toMap, clock.getTimeMillis())
+ assert(addExecutorsToTargetForDefaultProfile(manager, updatesNeeded) === 2)
+ doUpdateRequest(manager, updatesNeeded.toMap, clock.getTimeMillis())
+ assert(addExecutorsToTargetForDefaultProfile(manager, updatesNeeded) === 2)
+ doUpdateRequest(manager, updatesNeeded.toMap, clock.getTimeMillis())
+ assert(addExecutorsToTargetForDefaultProfile(manager, updatesNeeded) === 0)
+ doUpdateRequest(manager, updatesNeeded.toMap, clock.getTimeMillis())
+
+ (0 to 4).foreach(execId => onExecutorAddedDefaultProfile(manager,
execId.toString))
+ (0 to 9).map { i => createTaskInfo(i, i, executorId = s"${i / 2}")
}.foreach {
+ info => post(SparkListenerTaskStart(0, 0, info))
+ }
+ assert(numExecutorsTarget(manager, defaultProfile.id) === 5)
+ assert(maxNumExecutorsNeededPerResourceProfile(manager, defaultProfile) ==
5)
+
+ // 8 tasks (0 - 7) finished
+ (0 to 7).map { i => createTaskInfo(i, i, executorId = s"${i / 2}")
}.foreach {
+ info => post(SparkListenerTaskEnd(0, 0, null, Success, info, new
ExecutorMetrics, null))
+ }
+ clock.advance(1000)
+ manager invokePrivate _updateAndSyncNumExecutorsTarget(clock.nanoTime())
+ assert(numExecutorsTarget(manager, defaultProfile.id) === 1)
+ assert(maxNumExecutorsNeededPerResourceProfile(manager, defaultProfile) ==
1)
+ (0 to 3).foreach { i => assert(removeExecutorDefaultProfile(manager,
i.toString)) }
+ (0 to 3).foreach { i => onExecutorRemoved(manager, i.toString) }
+
+ // Now due to blacklisting, the task becomes unschedulable
+ post(SparkListenerUnschedulableTaskSetAdded(0, 0))
+ clock.advance(1000)
+ manager invokePrivate _updateAndSyncNumExecutorsTarget(clock.nanoTime())
+ assert(numExecutorsTarget(manager, defaultProfile.id) === 2)
+ assert(maxNumExecutorsNeededPerResourceProfile(manager, defaultProfile) ==
2)
+
+ // New executor got added
+ onExecutorAddedDefaultProfile(manager, "5")
+
+ // Now once the task becomes schedulable, clear the unschedulableTaskSets
+ post(SparkListenerUnschedulableTaskSetRemoved(0, 0))
+ clock.advance(1000)
+ manager invokePrivate _updateAndSyncNumExecutorsTarget(clock.nanoTime())
+ assert(numExecutorsTarget(manager, defaultProfile.id) === 1)
+ assert(maxNumExecutorsNeededPerResourceProfile(manager, defaultProfile) ==
1)
+ post(SparkListenerTaskEnd(0, 0, null, Success,
+ createTaskInfo(9, 9, "4"), new ExecutorMetrics, null))
+ // Unschedulable task successfully ran on the new executor provisioned
+ post(SparkListenerTaskEnd(0, 0, null, Success,
+ createTaskInfo(8, 8, "5"), new ExecutorMetrics, null))
+ clock.advance(1000)
+ manager invokePrivate _updateAndSyncNumExecutorsTarget(clock.nanoTime())
+ post(SparkListenerStageCompleted(stage))
+ clock.advance(1000)
+ manager invokePrivate _updateAndSyncNumExecutorsTarget(clock.nanoTime())
+ assert(numExecutorsTarget(manager, defaultProfile.id) === 0)
+ assert(maxNumExecutorsNeededPerResourceProfile(manager, defaultProfile) ==
0)
+ assert(removeExecutorDefaultProfile(manager, "4"))
+ onExecutorRemoved(manager, "4")
+ assert(removeExecutorDefaultProfile(manager, "5"))
+ onExecutorRemoved(manager, "5")
+ }
+
test("SPARK-30511 remove executors when speculative tasks end") {
val clock = new ManualClock()
val stage = createStageInfo(0, 40)
diff --git
a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
index e43be60..9ca3ce9 100644
---
a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
+++
b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
@@ -1000,6 +1000,42 @@ class TaskSchedulerImplSuite extends SparkFunSuite with
LocalSparkContext with B
assert(!tsm.isZombie)
}
+ test("SPARK-31418 abort timer should kick in when task is completely
blacklisted &" +
+ "allocation manager could not acquire a new executor before the timeout") {
+ // set the abort timer to fail immediately
+ taskScheduler = setupSchedulerWithMockTaskSetBlacklist(
+ config.UNSCHEDULABLE_TASKSET_TIMEOUT.key -> "0",
+ config.DYN_ALLOCATION_ENABLED.key -> "true")
+
+ // We have 2 tasks remaining with 1 executor
+ val taskSet = FakeTask.createTaskSet(numTasks = 2)
+ taskScheduler.submitTasks(taskSet)
+ val tsm = stageToMockTaskSetManager(0)
+
+ // submit an offer with one executor
+ taskScheduler.resourceOffers(IndexedSeq(WorkerOffer("executor0", "host0",
2))).flatten
+
+ // Fail the running task
+ failTask(0, TaskState.FAILED, UnknownReason, tsm)
+ when(tsm.taskSetBlacklistHelperOpt.get.isExecutorBlacklistedForTask(
+ "executor0", 0)).thenReturn(true)
+
+ // If the executor is busy, then dynamic allocation should kick in and try
+ // to acquire additional executors to schedule the blacklisted task
+ assert(taskScheduler.isExecutorBusy("executor0"))
+
+ // make an offer on the blacklisted executor. We won't schedule anything,
and set the abort
+ // timer to kick in immediately
+ assert(taskScheduler.resourceOffers(IndexedSeq(
+ WorkerOffer("executor0", "host0", 1)
+ )).flatten.size === 0)
+ // Wait for the abort timer to kick in. Even though we configure the
timeout to be 0, there is a
+ // slight delay as the abort timer is launched in a separate thread.
+ eventually(timeout(500.milliseconds)) {
+ assert(tsm.isZombie)
+ }
+ }
+
/**
* Helper for performance tests. Takes the explicitly blacklisted nodes and
executors; verifies
* that the blacklists are used efficiently to ensure scheduling is not
O(numPendingTasks).
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]