[ https://issues.apache.org/jira/browse/SPARK-21656?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Thomas Graves updated SPARK-21656:
----------------------------------
    Description: 
Right now, with dynamic allocation, Spark starts a stage by requesting the 
number of executors it needs to run all of that stage's tasks in parallel (or 
the configured maximum).  Once it has that number, it never acquires more 
unless an executor dies, an executor is explicitly killed by YARN, or the job 
moves on to the next stage.  The dynamic allocation manager has the concept of 
an idle timeout: if no task has been scheduled on an executor for a 
configurable amount of time (60 seconds by default), that executor is released. 
Note that after releasing an executor due to the idle timeout, it never goes 
back to check whether it should acquire more.
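
The idle-timeout behavior described above can be sketched as a toy model. This 
is not Spark's implementation; the names Executor, expire_idle and 
IDLE_TIMEOUT_S are made up for illustration, and the 60-second value only 
mirrors the default mentioned above (the setting is 
spark.dynamicAllocation.executorIdleTimeout).

```python
from dataclasses import dataclass

IDLE_TIMEOUT_S = 60.0  # assumption: mirrors the 60-second default described above


@dataclass
class Executor:
    executor_id: str
    last_task_scheduled_at: float  # seconds since the stage started


def expire_idle(executors, now, idle_timeout=IDLE_TIMEOUT_S):
    """Split executors into (kept, released) by time since their last task."""
    kept, released = [], []
    for ex in executors:
        if now - ex.last_task_scheduled_at >= idle_timeout:
            released.append(ex)
        else:
            kept.append(ex)
    return kept, released


executors = [
    Executor("exec-1", 0.0),
    Executor("exec-2", 50.0),
    Executor("exec-3", 5.0),
]
pending_tasks = 80_000  # plenty of work is still waiting

kept, released = expire_idle(executors, now=70.0)
# The decision never looks at pending_tasks, and nothing requests replacements
# for the released executors. That gap is the problem this issue describes.
```

In this model two of the three executors are released at t=70s even though 
tens of thousands of tasks are still pending.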

This is a problem for multiple reasons:
1. Unexpected things can happen in the system and cause delays, and Spark 
should be resilient to them. If the driver is GC'ing, there are network delays, 
etc., executors can idle timeout even though there are tasks to run on them; 
the scheduler just hasn't had time to start those tasks.  Note that in the 
worst case this allows the number of executors to go to 0, and we have a 
deadlock.

2. Internal Spark components have opposing requirements. The scheduler tries to 
achieve locality, but dynamic allocation doesn't know about this, and when it 
lets executors go it keeps the scheduler from doing what it was designed to do. 
For example, the scheduler first tries to schedule tasks node local, and during 
this time it can skip scheduling on some executors.  After a while, the 
scheduler falls back from node local to scheduling on rack local, and 
eventually on any node.  While the scheduler is doing node-local scheduling, 
the other executors can idle timeout.  This means that when the scheduler does 
fall back to rack or any locality, where it would have used those executors, we 
have already let them go and it can't schedule all the tasks it could, which 
can have a huge negative impact on job run time.
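
The interaction between locality fallback and the idle timeout can be sketched 
as follows. This is a deliberately simplified model: it assumes one fixed wait 
per locality level (wait_per_level_s, a hypothetical parameter), whereas the 
real scheduler is more involved. Both function names are made up for this 
example.

```python
LOCALITY_LEVELS = ["NODE_LOCAL", "RACK_LOCAL", "ANY"]


def allowed_level(elapsed_s, wait_per_level_s):
    """Most relaxed locality level the scheduler accepts after elapsed_s."""
    index = min(int(elapsed_s // wait_per_level_s), len(LOCALITY_LEVELS) - 1)
    return LOCALITY_LEVELS[index]


def survives_until_usable(executor_level, wait_per_level_s, idle_timeout_s):
    """True if an executor that can only take `executor_level` tasks is still
    alive when the scheduler relaxes to that level.

    Assumption: the executor receives no task before that point, so its idle
    clock runs from time 0.
    """
    usable_at = LOCALITY_LEVELS.index(executor_level) * wait_per_level_s
    return usable_at < idle_timeout_s
```

With a 90-second wait per level and a 60-second idle timeout, 
survives_until_usable("RACK_LOCAL", 90, 60) is False: the executor is released 
before the scheduler would have fallen back far enough to use it.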
 
In both of these cases, once the executors idle timeout we never go back to 
check whether we need more executors (until the next stage starts).  In the 
worst case you end up with 0 executors and a deadlock, but generally this shows 
up as the job going down to very few executors when there could be tens of 
thousands of tasks to run on them, which makes the job take far longer (I've 
seen jobs that should take minutes take hours because only a few executors 
were left).
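
A rough back-of-the-envelope calculation shows how losing executors turns 
minutes into hours. All numbers here are illustrative assumptions (80,000 
tasks, 10 seconds per task, 4 task slots per executor), not measurements from 
a real job, and estimated_runtime_s is a made-up helper.

```python
import math


def estimated_runtime_s(num_tasks, num_executors, slots_per_executor, task_time_s):
    """Estimate stage run time as (number of task waves) * (time per task)."""
    total_slots = num_executors * slots_per_executor
    if total_slots == 0:
        return math.inf  # no executors left: the stage can never finish
    waves = math.ceil(num_tasks / total_slots)
    return waves * task_time_s


full_cluster = estimated_runtime_s(80_000, 2_000, 4, 10)  # 10 waves
after_timeouts = estimated_runtime_s(80_000, 10, 4, 10)   # 2,000 waves
deadlocked = estimated_runtime_s(80_000, 0, 4, 10)
```

Under these assumptions the stage takes about 100 seconds with 2,000 executors 
and about 20,000 seconds (over five hours) with 10, and it never finishes with 
0.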

We should handle these situations in Spark.  The most straightforward approach 
would be to not allow executors to idle timeout when there are tasks that could 
run on them. This would allow the scheduler to do its job with locality 
scheduling.  It also fixes number 1 above, because you can never go into a 
deadlock: Spark keeps enough executors to run all the tasks.
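
The proposed rule can be sketched as a simple check. This is only an 
illustration of the idea, not the actual change to Spark; executors_needed and 
may_release_idle are hypothetical names, and tasks_per_executor stands in for 
however many tasks one executor can run at once.

```python
import math


def executors_needed(pending_tasks, running_tasks, tasks_per_executor):
    """Executors required to run every outstanding task in parallel."""
    return math.ceil((pending_tasks + running_tasks) / tasks_per_executor)


def may_release_idle(current_executors, pending_tasks, running_tasks,
                     tasks_per_executor):
    """Allow an idle timeout only if the remaining executors still cover
    all pending and running tasks."""
    needed = executors_needed(pending_tasks, running_tasks, tasks_per_executor)
    return current_executors - 1 >= needed
```

For example, may_release_idle(10, 80_000, 0, 4) is False because 20,000 
executors could be put to use, while may_release_idle(10, 0, 8, 4) is True 
because 2 executors are enough for the 8 running tasks.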

There are other approaches to fix this, such as explicitly preventing the 
executor count from going to 0.  That prevents a deadlock but can still cause 
the job to slow down greatly.  We could also change it at some point to 
re-check whether we should get more executors, but this adds extra logic and we 
would have to decide when to check.  There is also overhead in letting 
executors go and then re-acquiring them, which would cause some slowdown in the 
job because the executors aren't immediately there for the scheduler to place 
tasks on.

  was:
Right now Spark lets go of executors when they are idle for 60s (or a 
configurable time). I have seen Spark let them go when they were idle but 
really needed. I have seen this issue when the scheduler was waiting to get 
node locality but that took longer than the default idle timeout. In these jobs 
the number of executors drops very low (less than 10) while there are still 
around 80,000 tasks to run.
We should consider not allowing executors to idle timeout if they are still 
needed according to the number of tasks to be run.

There are multiple reasons:
1. Unexpected things can happen in the system and cause delays, and Spark 
should be resilient to them. If the driver is GC'ing, there are network delays, 
etc., executors can idle timeout even though there are tasks to run on them; 
the scheduler just hasn't had time to start those tasks. These delays just slow 
down the user's job, which the user does not want.

2. Internal Spark components have opposing requirements. The scheduler tries to 
achieve locality, but dynamic allocation doesn't know about this, and by giving 
away executors it keeps the scheduler from doing what it was designed to do.
Ideally we have enough executors to run all the tasks. If dynamic allocation 
allows those to idle timeout, the scheduler cannot make proper decisions. In 
the end this hurts users by affecting the job. A user should not have to mess 
with the configs to keep this basic behavior.


> spark dynamic allocation should not idle timeout executors when there are 
> enough tasks to run on them
> -----------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-21656
>                 URL: https://issues.apache.org/jira/browse/SPARK-21656
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.1.1
>            Reporter: Jong Yoon Lee
>   Original Estimate: 24h
>  Remaining Estimate: 24h



