[ https://issues.apache.org/jira/browse/SPARK-22148?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17077674#comment-17077674 ]

Venkata krishnan Sowrirajan edited comment on SPARK-22148 at 4/8/20, 12:04 AM:
-------------------------------------------------------------------------------

Thanks for responding [~tgraves]. That's right.

Let's say all the executors are busy running tasks and one of the tasks fails:
we then abort the stage because there is no idle blacklisted executor available
to kill and replace. But with dynamic allocation enabled, we could instead have
requested more executors and retried the task.
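
To make this concrete, here is a rough sketch of the decision I have in mind;
the names are hypothetical and illustrative only, not actual TaskSetManager
internals:

{code:java}
// Illustrative sketch only; these names are hypothetical, not Spark internals.
object BlacklistDecisionSketch {
  sealed trait Action
  case object KillIdleBlacklistedExecutor extends Action // current behavior when an idle one exists
  case object RequestMoreExecutors extends Action         // what dynamic allocation could allow
  case object AbortStage extends Action                    // current fallback, which this issue hits

  def decide(hasIdleBlacklistedExecutor: Boolean, dynamicAllocationEnabled: Boolean): Action =
    if (hasIdleBlacklistedExecutor) KillIdleBlacklistedExecutor
    else if (dynamicAllocationEnabled) RequestMoreExecutors
    else AbortStage

  def main(args: Array[String]): Unit = {
    // All executors busy and blacklisted for the failed task, dynamic allocation on:
    println(decide(hasIdleBlacklistedExecutor = false, dynamicAllocationEnabled = true))
    // prints RequestMoreExecutors
  }
}
{code}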

In fact, I can reproduce this with min executors set to 1 and max set to some
larger number. In that case the application doesn't scale up immediately, and
the first task failure fails the whole stage, because the only available
executor is blacklisted for the task and is also busy running another task at
that time.

Note: the example below fails intentionally, since casting an Int to a String
throws a ClassCastException at runtime; it is only meant to trigger task
failures.

{code:java}
def test(a: Int) = { a.asInstanceOf[String] }
sc.parallelize(1 to 10, 10).map(x => test(x)).collect 
{code}
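
For reference, the kind of configuration used for this reproduction is sketched
below; the exact values are assumptions for illustration, not copied from the
actual job:

{code:java}
import org.apache.spark.SparkConf

// Assumed reproduction setup (values are illustrative): dynamic allocation
// starting from a single executor, with task blacklisting enabled.
val conf = new SparkConf()
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.shuffle.service.enabled", "true")   // needed for dynamic allocation on YARN
  .set("spark.dynamicAllocation.minExecutors", "1")
  .set("spark.dynamicAllocation.maxExecutors", "10")
  .set("spark.blacklist.enabled", "true")
  .set("spark.blacklist.task.maxTaskAttemptsPerExecutor", "1")
{code}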


However, if more executors are available, the task is retried. Other similar
cases are possible.


> TaskSetManager.abortIfCompletelyBlacklisted should not abort when all current 
> executors are blacklisted but dynamic allocation is enabled
> -----------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-22148
>                 URL: https://issues.apache.org/jira/browse/SPARK-22148
>             Project: Spark
>          Issue Type: Bug
>          Components: Scheduler, Spark Core
>    Affects Versions: 2.2.0
>            Reporter: Juan Rodríguez Hortalá
>            Assignee: Dhruve Ashar
>            Priority: Major
>             Fix For: 2.4.1, 3.0.0
>
>         Attachments: SPARK-22148_WIP.diff
>
>
> Currently TaskSetManager.abortIfCompletelyBlacklisted aborts the TaskSet and 
> the whole Spark job with `task X (partition Y) cannot run anywhere due to 
> node and executor blacklist. Blacklisting behavior can be configured via 
> spark.blacklist.*.` when all the available executors are blacklisted for a 
> pending Task or TaskSet. This makes sense for static allocation, where the 
> set of executors is fixed for the duration of the application, but this might 
> lead to unnecessary job failures when dynamic allocation is enabled. For 
> example, in a Spark application with a single job at a time, when a node 
> fails at the end of a stage attempt, all other executors will complete their 
> tasks, but the tasks running in the executors of the failing node will be 
> pending. Spark will keep waiting for those tasks for 2 minutes by default 
> (spark.network.timeout) until the heartbeat timeout is triggered, and then it 
> will blacklist those executors for that stage. At that point in time, other 
> executors would have been released after being idle for 1 minute by default 
> (spark.dynamicAllocation.executorIdleTimeout), because the next stage hasn't 
> started yet and so there are no more tasks available (assuming the default of 
> spark.speculation = false). So Spark will fail because the only executors 
> available are blacklisted for that stage. 
> An alternative is requesting more executors from the cluster manager in this 
> situation. This could be retried a configurable number of times after a 
> configurable wait time between request attempts, so if the cluster manager 
> fails to provide a suitable executor then the job is aborted like in the 
> previous case. 
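
The timing interaction described above hinges on two settings; the sketch below
just restates their defaults for clarity (it is not a recommended
configuration):

{code:java}
import org.apache.spark.SparkConf

// Defaults the scenario above relies on: idle executors are released after
// 60s, while a failed node's executors are only detected (and blacklisted)
// after the 120s network timeout, so by then only blacklisted executors remain.
val conf = new SparkConf()
  .set("spark.network.timeout", "120s")                       // default heartbeat/network timeout
  .set("spark.dynamicAllocation.executorIdleTimeout", "60s")  // default idle release timeout
{code}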



