[ https://issues.apache.org/jira/browse/SPARK-19326?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Tejas Patil updated SPARK-19326:
--------------------------------
Description:
Speculated copies of tasks do not get launched in some cases.
Examples:
- All the running executors have no CPU slots left to accommodate a speculated
copy of the task(s). If all the running executors reside on a set of slow /
bad hosts, they will keep the job running for a long time.
- `spark.task.cpus` > 1 and the running executor has not filled up all its CPU
slots. Those free slots cannot host the speculated copy, since the [speculated copies of tasks should run on a different
host|https://github.com/apache/spark/blob/2e139eed3194c7b8814ff6cf007d4e8a874c1e4d/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala#L283]
and not the host where the first copy was launched.
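The host restriction in the second example can be illustrated with a minimal sketch. This is not the actual `TaskSetManager` code; the object name, the function `canLaunchSpeculativeCopy`, and the `attemptHosts` map are hypothetical and only model the rule that a speculated copy is not placed on a host that already has an attempt of the same task.

```scala
object SpeculationPlacementSketch {
  // Hypothetical model: attemptHosts maps a task index to the hosts that
  // already have an attempt of that task.
  // A speculated copy is allowed only on a host with no existing attempt.
  def canLaunchSpeculativeCopy(
      attemptHosts: Map[Int, Set[String]],
      taskIndex: Int,
      offeredHost: String): Boolean =
    !attemptHosts.getOrElse(taskIndex, Set.empty[String]).contains(offeredHost)
}
```

Under this model, if every running executor sits on the host of the first copy, no resource offer can ever satisfy the check, no matter how many CPU slots are free.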
In both these cases, `ExecutorAllocationManager` does not know about pending
speculation task attempts and thinks that all the resource demands are well
taken care of. ([relevant
code|https://github.com/apache/spark/blob/6ee28423ad1b2e6089b82af64a31d77d3552bb38/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala#L265])
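To make the gap concrete, here is a simplified sketch of an executor target computed from task demand. It is an illustration under stated assumptions, not the real `ExecutorAllocationManager`: the `Demand` case class and both `target...` functions are hypothetical names, and the only logic modelled is a ceiling division of task count by task slots per executor.

```scala
object ExecutorDemandSketch {
  // Hypothetical snapshot of task demand at one point in time.
  final case class Demand(
      pendingTasks: Int,
      runningTasks: Int,
      pendingSpeculativeTasks: Int)

  // Ceiling division: executors needed to host `tasks` tasks.
  def executorsNeeded(tasks: Int, tasksPerExecutor: Int): Int =
    (tasks + tasksPerExecutor - 1) / tasksPerExecutor

  // Models the behaviour described in this issue: speculated attempts
  // that are waiting for a slot are not part of the demand.
  def targetIgnoringSpeculation(d: Demand, tasksPerExecutor: Int): Int =
    executorsNeeded(d.pendingTasks + d.runningTasks, tasksPerExecutor)

  // Models a demand count that also includes pending speculated attempts.
  def targetCountingSpeculation(d: Demand, tasksPerExecutor: Int): Int =
    executorsNeeded(
      d.pendingTasks + d.runningTasks + d.pendingSpeculativeTasks,
      tasksPerExecutor)
}
```

For example, with 4 task slots per executor, 4 running tasks and 1 pending speculated attempt, the first function asks for 1 executor while the second asks for 2. In the first case the existing executor is full, so the speculated copy has nowhere to run.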
This adds variation in job completion times and, more importantly, causes SLA misses
:( In prod, with a large number of jobs, I see this happening more often than
one would think. Chasing the bad hosts or the reason for slowness doesn't scale.
Here is a tiny repro. Note that you need to launch this in Mesos, YARN, or
standalone deploy mode, along with `--conf spark.speculation=true --conf
spark.executor.cores=4 --conf spark.dynamicAllocation.maxExecutors=100`
{code}
val someRDD = sc.parallelize(1 to 8, 8)
someRDD.mapPartitionsWithIndex( (index: Int, it: Iterator[Int]) => {
  if (index == 7) {
    Thread.sleep(Long.MaxValue) // fake long running task(s)
  }
  it.toList.map(x => index + ", " + x).iterator
}).collect
{code}
was:
Speculated copies of tasks do not get launched in some cases.
Examples:
- All the running executors have no CPU slots left to accommodate a speculated
copy of the task(s). If the all running executors reside over a set of slow /
bad hosts, they will keep the job running for long time
- `spark.task.cpus` > 1 and the running executor has not filled up all its CPU
slots. Since the [speculated copies of tasks should run on different
host|https://github.com/apache/spark/blob/2e139eed3194c7b8814ff6cf007d4e8a874c1e4d/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala#L283]
and not the host where the first copy was launched.
In both these cases, `ExecutorAllocationManager` does not know about pending
speculation task attempts and thinks that all the resource demands are well
taken care of. ([relevant
code|https://github.com/apache/spark/blob/6ee28423ad1b2e6089b82af64a31d77d3552bb38/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala#L265])
This adds variation in the job completion times and more importantly SLA misses
:( In prod, with a large number of jobs, I see this happening more often than
one would think. Chasing the bad hosts or reason for slowness doesn't scale.
Here is a tiny repro. Note that you need to launch this with (Mesos or YARN or
standalone deploy mode) along with `spark.speculation=true`
{code}
val someRDD = sc.parallelize(1 to 8, 8)
someRDD.mapPartitionsWithIndex( (index: Int, it: Iterator[Int]) => {
if (index == 7) {
Thread.sleep(Long.MaxValue) // fake long running task(s)
}
it.toList.map(x => index + ", " + x).iterator
}).collect
{code}
> Speculated task attempts do not get launched in few scenarios
> -------------------------------------------------------------
>
> Key: SPARK-19326
> URL: https://issues.apache.org/jira/browse/SPARK-19326
> Project: Spark
> Issue Type: Bug
> Components: Scheduler
> Affects Versions: 2.0.2, 2.1.0
> Reporter: Tejas Patil