[
https://issues.apache.org/jira/browse/SPARK-33418?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
dingbei updated SPARK-33418:
----------------------------
Attachment: (was: 4_2.png)
> TaskSchedulerImpl: Check pending tasks in advance when resource offers
> ----------------------------------------------------------------------
>
> Key: SPARK-33418
> URL: https://issues.apache.org/jira/browse/SPARK-33418
> Project: Spark
> Issue Type: Improvement
> Components: Spark Core
> Affects Versions: 3.0.1
> Reporter: dingbei
> Priority: Major
> Attachments: 1.png, 2.png, 3_1.png, 3_2.png, 3_3.png, 3_4.png, 6_2.png
>
>
> It begins with the need to start a large number of Spark Streaming receivers (custom
> receivers). *The launch time gets very long once there are more than 300 receivers.*
> *Test preparation*
> environment: YARN
> spark.executor.cores: 2
> spark.executor.instances: first 200, then 500
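> For reference, the same setup expressed as a minimal SparkConf sketch (the app name is a
> placeholder, not from the original report):
> {code:java}
> import org.apache.spark.SparkConf
>
> val conf = new SparkConf()
>   .setAppName("receiver-launch-test")     // hypothetical name
>   .set("spark.executor.cores", "2")
>   .set("spark.executor.instances", "500") // first run used 200
> {code}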
> *Tests and data*
> At first we set the number of executors to 200, i.e. 200 receivers to start, and
> everything went well: it took about 50s to launch all receivers.
> ({color:#ff0000}pic 1{color})
> Then we set the number of executors to 500, i.e. 500 receivers to start, and the launch
> time grew to around 5 minutes. ({color:#ff0000}pic 2{color})
> *Dig into source code*
> I used thread dumps to find which methods take a relatively long
> time ({color:#ff0000}pic 3{color}), then added logs around these methods. In the
> end I found that the loop in
> {color:#00875a}TaskSchedulerImpl.resourceOffers{color} accounts for most of the
> duration ({color:#de350b}red color{color}).
> {color:#00875a}org.apache.spark.scheduler.TaskSchedulerImpl{color}
>
> {code:java}
> def resourceOffers(offers: IndexedSeq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
>   // Mark each slave as alive and remember its hostname
>   // Also track if new executor is added
>   var newExecAvail = false
>   for (o <- offers) {
>     if (!hostToExecutors.contains(o.host)) {
>       hostToExecutors(o.host) = new HashSet[String]()
>     }
>     if (!executorIdToRunningTaskIds.contains(o.executorId)) {
>       hostToExecutors(o.host) += o.executorId
>       executorAdded(o.executorId, o.host)
>       executorIdToHost(o.executorId) = o.host
>       executorIdToRunningTaskIds(o.executorId) = HashSet[Long]()
>       newExecAvail = true
>     }
>   }
>   val hosts = offers.map(_.host).distinct
>   for ((host, Some(rack)) <- hosts.zip(getRacksForHosts(hosts))) {
>     hostsByRack.getOrElseUpdate(rack, new HashSet[String]()) += host
>   }
>
>   // Before making any offers, remove any nodes from the blacklist whose blacklist has
>   // expired. Do this here to avoid a separate thread and added synchronization overhead,
>   // and also because updating the blacklist is only relevant when task offers are being made.
>   blacklistTrackerOpt.foreach(_.applyBlacklistTimeout())
>
>   val filteredOffers = blacklistTrackerOpt.map { blacklistTracker =>
>     offers.filter { offer =>
>       !blacklistTracker.isNodeBlacklisted(offer.host) &&
>         !blacklistTracker.isExecutorBlacklisted(offer.executorId)
>     }
>   }.getOrElse(offers)
>
>   val shuffledOffers = shuffleOffers(filteredOffers)
>   // Build a list of tasks to assign to each worker.
>   val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores / CPUS_PER_TASK))
>   val availableResources = shuffledOffers.map(_.resources).toArray
>   val availableCpus = shuffledOffers.map(o => o.cores).toArray
>   val sortedTaskSets = rootPool.getSortedTaskSetQueue.filterNot(_.isZombie)
>   for (taskSet <- sortedTaskSets) {
>     logDebug("parentName: %s, name: %s, runningTasks: %s".format(
>       taskSet.parent.name, taskSet.name, taskSet.runningTasks))
>     if (newExecAvail) {
>       taskSet.executorAdded()
>     }
>   }
>
>   // Take each TaskSet in our scheduling order, and then offer it each node in increasing order
>   // of locality levels so that it gets a chance to launch local tasks on all of them.
>   // NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY
>   for (taskSet <- sortedTaskSets) {
>     // we only need to calculate available slots if using barrier scheduling, otherwise the
>     // value is -1
>     val availableSlots = if (taskSet.isBarrier) {
>       val availableResourcesAmount = availableResources.map { resourceMap =>
>         // note that the addresses here have been expanded according to the numParts
>         resourceMap.map { case (name, addresses) => (name, addresses.length) }
>       }
>       calculateAvailableSlots(this, availableCpus, availableResourcesAmount)
>     } else {
>       -1
>     }
>     // Skip the barrier taskSet if the available slots are less than the number of pending tasks.
>     if (taskSet.isBarrier && availableSlots < taskSet.numTasks) {
>       // Skip the launch process.
>       // TODO SPARK-24819 If the job requires more slots than available (both busy and free
>       // slots), fail the job on submit.
>       logInfo(s"Skip current round of resource offers for barrier stage ${taskSet.stageId} " +
>         s"because the barrier taskSet requires ${taskSet.numTasks} slots, while the total " +
>         s"number of available slots is $availableSlots.")
>     } else {
>       var launchedAnyTask = false
>       // Record all the executor IDs assigned barrier tasks on.
>       val addressesWithDescs = ArrayBuffer[(String, TaskDescription)]()
>       for (currentMaxLocality <- taskSet.myLocalityLevels) {
>         var launchedTaskAtCurrentMaxLocality = false
>         do {
>           launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet(taskSet,
>             currentMaxLocality, shuffledOffers, availableCpus, availableResources, tasks,
>             addressesWithDescs)
>           launchedAnyTask |= launchedTaskAtCurrentMaxLocality
>         } while (launchedTaskAtCurrentMaxLocality)
>       }
>       if (!launchedAnyTask) {
>         taskSet.getCompletelyBlacklistedTaskIfAny(hostToExecutors).foreach { taskIndex =>
>           // If the taskSet is unschedulable we try to find an existing idle blacklisted
>           // executor. If we cannot find one, we abort immediately. Else we kill the idle
>           // executor and kick off an abortTimer which if it doesn't schedule a task within
>           // the timeout will abort the taskSet if we were unable to schedule any task from
>           // the taskSet.
>           // Note 1: We keep track of schedulability on a per taskSet basis rather than on a
>           // per task basis.
>           // Note 2: The taskSet can still be aborted when there are more than one idle
>           // blacklisted executors and dynamic allocation is on. This can happen when a killed
>           // idle executor isn't replaced in time by ExecutorAllocationManager as it relies on
>           // pending tasks and doesn't kill executors on idle timeouts, resulting in the abort
>           // timer to expire and abort the taskSet.
>           executorIdToRunningTaskIds.find(x => !isExecutorBusy(x._1)) match {
>             case Some((executorId, _)) =>
>               if (!unschedulableTaskSetToExpiryTime.contains(taskSet)) {
>                 blacklistTrackerOpt.foreach(blt => blt.killBlacklistedIdleExecutor(executorId))
>                 val timeout = conf.get(config.UNSCHEDULABLE_TASKSET_TIMEOUT) * 1000
>                 unschedulableTaskSetToExpiryTime(taskSet) = clock.getTimeMillis() + timeout
>                 logInfo(s"Waiting for $timeout ms for completely "
>                   + s"blacklisted task to be schedulable again before aborting $taskSet.")
>                 abortTimer.schedule(
>                   createUnschedulableTaskSetAbortTimer(taskSet, taskIndex), timeout)
>               }
>             case None => // Abort Immediately
>               logInfo("Cannot schedule any task because of complete blacklisting. No idle" +
>                 s" executors can be found to kill. Aborting $taskSet.")
>               taskSet.abortSinceCompletelyBlacklisted(taskIndex)
>           }
>         }
>       } else {
>         // We want to defer killing any taskSets as long as we have a non blacklisted executor
>         // which can be used to schedule a task from any active taskSets. This ensures that
>         // the job can make progress.
>         // Note: It is theoretically possible that a taskSet never gets scheduled on a
>         // non-blacklisted executor and the abort timer doesn't kick in because of a constant
>         // submission of new TaskSets. See the PR for more details.
>         if (unschedulableTaskSetToExpiryTime.nonEmpty) {
>           logInfo("Clearing the expiry times for all unschedulable taskSets as a task was " +
>             "recently scheduled.")
>           unschedulableTaskSetToExpiryTime.clear()
>         }
>       }
>       if (launchedAnyTask && taskSet.isBarrier) {
>         // Check whether the barrier tasks are partially launched.
>         // TODO SPARK-24818 handle the assert failure case (that can happen when some locality
>         // requirements are not fulfilled, and we should revert the launched tasks).
>         if (addressesWithDescs.size != taskSet.numTasks) {
>           val errorMsg =
>             s"Fail resource offers for barrier stage ${taskSet.stageId} because only " +
>               s"${addressesWithDescs.size} out of a total number of ${taskSet.numTasks}" +
>               s" tasks got resource offers. This happens because barrier execution currently " +
>               s"does not work gracefully with delay scheduling. We highly recommend you to " +
>               s"disable delay scheduling by setting spark.locality.wait=0 as a workaround if " +
>               s"you see this error frequently."
>           logWarning(errorMsg)
>           taskSet.abort(errorMsg)
>           throw new SparkException(errorMsg)
>         }
>         // materialize the barrier coordinator.
>         maybeInitBarrierCoordinator()
>         // Update the taskInfos into all the barrier task properties.
>         val addressesStr = addressesWithDescs
>           // Addresses ordered by partitionId
>           .sortBy(_._2.partitionId)
>           .map(_._1)
>           .mkString(",")
>         addressesWithDescs.foreach(_._2.properties.setProperty("addresses", addressesStr))
>         logInfo(s"Successfully scheduled all the ${addressesWithDescs.size} tasks for barrier " +
>           s"stage ${taskSet.stageId}.")
>       }
>     }
>   }
>
>   // TODO SPARK-24823 Cancel a job that contains barrier stage(s) if the barrier tasks don't
>   // get launched within a configured time.
>   if (tasks.nonEmpty) {
>     hasLaunchedTask = true
>   }
>   return tasks
> }
> {code}
>
> *Explanation and Solution*
> The loop in {color:#00875a}TaskSchedulerImpl.resourceOffers{color} iterates over all
> non-zombie TaskSetManagers in the Pool's queue. Normally this queue stays small, because a
> TaskSetManager is removed once all of its tasks are done. But in Spark Streaming, as we all
> know, each receiver is wrapped as a never-ending job, so its TaskSetManager stays in the
> queue until the application finishes. For example, when the 10th receiver is being launched
> the queue holds 10 TaskSetManagers and the loop iterates 10 times; when the 500th receiver
> is being launched it iterates 500 times. Yet 499 of those iterations are unnecessary,
> because those tasks are already running.
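> As a back-of-the-envelope sketch (not Spark code, and assuming just one resourceOffers
> round per receiver launch; the real scheduler runs many more rounds, which amplifies the
> effect), the total number of TaskSetManager scans grows quadratically with the number of
> receivers:
> {code:java}
> // Receiver i is launched while i TaskSetManagers sit in the queue,
> // so each resourceOffers round scans all of them.
> def totalScans(numReceivers: Int): Long =
>   (1L to numReceivers).sum // 1 + 2 + ... + N = N * (N + 1) / 2
>
> totalScans(200) // 20,100
> totalScans(500) // 125,250 -- roughly 6x the work for 2.5x the receivers
> {code}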
> Digging deeper into the code, I found that whether a TaskSetManager still has pending
> tasks is only decided inside {color:#00875a}TaskSetManager.dequeueTaskFromList{color},
> which is far away from the loop in {color:#00875a}TaskSchedulerImpl.resourceOffers{color}.
> {code:java}
> private def dequeueTaskFromList(
>     execId: String,
>     host: String,
>     list: ArrayBuffer[Int],
>     speculative: Boolean = false): Option[Int] = {
>   var indexOffset = list.size
>   while (indexOffset > 0) {
>     indexOffset -= 1
>     val index = list(indexOffset)
>     if (!isTaskBlacklistedOnExecOrNode(index, execId, host) &&
>         !(speculative && hasAttemptOnHost(index, host))) {
>       // This should almost always be list.trimEnd(1) to remove tail
>       list.remove(indexOffset)
>       // Speculatable task should only be launched when at most one copy of the
>       // original task is running
>       if (!successful(index)) {
>         if (copiesRunning(index) == 0) {
>           return Some(index)
>         } else if (speculative && copiesRunning(index) == 1) {
>           return Some(index)
>         }
>       }
>     }
>   }
>   None
> }
> {code}
> So I moved the pending-tasks check up into the loop in
> {color:#00875a}TaskSchedulerImpl.resourceOffers{color}, also taking speculation into
> account. This keeps TaskSetManagers with nothing left to schedule out of the loop, which
> saves a lot of unnecessary iterations. Only the changed parts are shown below; the rest of
> the method is identical to the original.
> {code:java}
> def resourceOffers(offers: IndexedSeq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
>   // ... identical to the original method above ...
>
>   // Take each TaskSet in our scheduling order, and then offer it each node in increasing order
>   // of locality levels so that it gets a chance to launch local tasks on all of them.
>   // NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY
>   // CHANGED: skip TaskSetManagers that have no tasks left to schedule.
>   for (taskSet <- sortedTaskSets if hasPendingTasksToSched(taskSet)) {
>     // ... loop body identical to the original method above ...
>   }
>
>   // ... identical to the original method above ...
> }
>
> // NEW: a TaskSet still needs offers if any task has neither finished nor has a running
> // copy, or if it has speculatable tasks waiting for a second copy.
> def hasPendingTasksToSched(taskSet: TaskSetManager): Boolean = {
>   taskSet.tasks.zipWithIndex.exists { t =>
>     (!taskSet.successful(t._2) && taskSet.copiesRunning(t._2) == 0) ||
>       taskSet.speculatableTasks.contains(t._2)
>   }
> }
> {code}
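> One design note on the helper: it rescans every task in the TaskSet on each offer round,
> i.e. O(numTasks) per TaskSetManager. That is cheap here because each receiver TaskSet holds
> a single task, but for large TaskSets the same check could be kept O(1) by maintaining a
> counter updated on launch/finish. A minimal illustration of that idea (hypothetical, not
> part of the proposed patch):
> {code:java}
> // Hypothetical O(1) variant: count schedulable tasks instead of rescanning them.
> class PendingTaskCounter(numTasks: Int) {
>   private var schedulable = numTasks          // tasks with no running or successful copy
>   def taskLaunched(): Unit = schedulable -= 1 // a copy started running
>   def taskFailed(): Unit = schedulable += 1   // a failed task becomes schedulable again
>   def hasPendingTasksToSched: Boolean = schedulable > 0
> }
> {code}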
> *Conclusion*
> With this change we managed to reduce the launch time of all 500 receivers back to a
> stable ~50s. I think the Spark contributors had not considered a scenario where this many
> jobs run at the same time, which I know is unusual, but the check is still a good
> complement.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]