[
https://issues.apache.org/jira/browse/SPARK-41192?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Mridul Muralidharan resolved SPARK-41192.
-----------------------------------------
Fix Version/s: 3.4.0
Resolution: Fixed
Issue resolved by pull request 38711
[https://github.com/apache/spark/pull/38711]
> Task finished before speculative task scheduled leads to holding idle
> executors
> -------------------------------------------------------------------------------
>
> Key: SPARK-41192
> URL: https://issues.apache.org/jira/browse/SPARK-41192
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 3.2.2, 3.3.1
> Reporter: Yazhi Wang
> Assignee: Yazhi Wang
> Priority: Minor
> Labels: dynamic_allocation
> Fix For: 3.4.0
>
> Attachments: dynamic-executors, dynamic-log
>
>
> When a task finishes before its speculative copy has been scheduled by the
> DAGScheduler, the speculative task is still counted as pending in the
> calculation of the number of needed executors, which leads to requesting
> more executors than needed.
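> The over-counting can be illustrated with a simplified sketch of the
> executor-sizing logic (the method name and structure here are illustrative,
> not the exact ExecutorAllocationManager internals):
> {code:java}
> // Sketch: a stale speculative task inflates the executor target
> def maxExecutorsNeeded(pendingTasks: Int, runningTasks: Int,
>     pendingSpeculative: Int, tasksPerExecutor: Int): Int = {
>   // A speculative task whose regular copy already finished is still
>   // counted in pendingSpeculative, inflating the result
>   val total = pendingTasks + runningTasks + pendingSpeculative
>   (total + tasksPerExecutor - 1) / tasksPerExecutor // ceiling division
> }
> {code}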
> h2. Background & Reproduce
> In one of our production jobs, we found that ExecutorAllocationManager was
> holding more executors than needed.
> The issue is difficult to reproduce in a test environment, so to reproduce
> and debug it reliably we temporarily commented out the speculative-task
> scheduling code in TaskSetManager (around line 363) to guarantee that the
> regular task completes before the speculative task is scheduled.
> {code:java}
> // Original code
> private def dequeueTask(
>     execId: String,
>     host: String,
>     maxLocality: TaskLocality.Value): Option[(Int, TaskLocality.Value, Boolean)] = {
>   // Tries to schedule a regular task first; if it returns None, then
>   // schedules a speculative task
>   dequeueTaskHelper(execId, host, maxLocality, false).orElse(
>     dequeueTaskHelper(execId, host, maxLocality, true))
> }
>
> // Modified so that the speculative task is never scheduled
> private def dequeueTask(
>     execId: String,
>     host: String,
>     maxLocality: TaskLocality.Value): Option[(Int, TaskLocality.Value, Boolean)] = {
>   // Only tries to schedule a regular task; the speculative branch is removed
>   dequeueTaskHelper(execId, host, maxLocality, false)
> } {code}
> Referring to the examples in SPARK-30511:
> When running the last task, we hold 38 executors (see attachment), which is
> exactly ceil((149 + 1) / 4) = 38. But there are actually only 2 tasks
> running, which requires only Math.max(20, ceil(2 / 4)) = 20 executors (the
> configured minimum).
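> The expected target can be computed directly (a simplified calculation,
> assuming only minExecutors and the running-task count matter at this point):
> {code:java}
> val minExecutors = 20      // spark.dynamicAllocation.minExecutors
> val runningTasks = 2
> val coresPerExecutor = 4   // spark.executor.cores
> val needed = math.max(minExecutors,
>   (runningTasks + coresPerExecutor - 1) / coresPerExecutor)
> // needed == 20, yet 38 executors are held because of the stale
> // speculative-task count
> {code}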
> {code:java}
> ./bin/spark-shell --master yarn --conf spark.speculation=true --conf
> spark.executor.cores=4 --conf spark.dynamicAllocation.enabled=true --conf
> spark.dynamicAllocation.minExecutors=20 --conf
> spark.dynamicAllocation.maxExecutors=1000 {code}
> {code:java}
> val n = 4000
> val someRDD = sc.parallelize(1 to n, n)
> someRDD.mapPartitionsWithIndex( (index: Int, it: Iterator[Int]) => {
>   if (index > 3998) {
>     Thread.sleep(1000 * 1000)
>   } else if (index > 3850) {
>     Thread.sleep(50 * 1000) // Fake long-running tasks
>   } else {
>     Thread.sleep(100)
>   }
>   Array.fill[Int](1)(1).iterator
> }).collect() {code}
>
> I will have a PR ready to fix this issue.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]