[ 
https://issues.apache.org/jira/browse/SPARK-41953?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17656480#comment-17656480
 ] 

Mridul Muralidharan edited comment on SPARK-41953 at 1/10/23 7:48 AM:
----------------------------------------------------------------------

A few things:

* Looking at SPARK-27637, we should revisit it - the {{IsExecutorAlive}} 
request does not make sense in case of dynamic resource allocation (DRA) when 
an external shuffle service (ESS) is enabled : we should not be making that 
call. Thoughts [~Ngone51] ? +CC [~csingh]
This also means, relying on ExecutorDeadException when DRA is enabled with ESS 
is configured wont be useful.

For rest of the proposal ...

For (2),  I am not sure about 'Make MapOutputTracker support fetch latest 
output without epoch provided.' - this could have nontrivial interaction with 
other things, and I will need to think through it. Not sure if we can model 
node decommission - where we have block moved from host A to host B without any 
other change - as not requiring an epoch update (or rather, flag the epoch's as 
'compatible' - if there are no interleaving updates), requires analysis ....

Assuming we sort out how to get updated state, (3) looks like a reasonable 
approach.





was (Author: mridulm80):

A few things:

* Looking at SPARK-27637, we should revisit it - the {{IsExecutorAlive}} 
request does not make sense in case of dynamic resource allocation (DRA) when 
an external shuffle service (ESS) is enabled : we should not be making that 
call. Thoughts [~Ngone51] ? +CC [~csingh]
This also means, relying on ExecutorDeadException when DRA is enabled with ESS 
is configured wont be useful.

For rest of the proposal ...

For (2),  I am not sure about 'Make MapOutputTracker support fetch latest 
output without epoch provided.' - this could have nontrivial interaction with 
other things, and I will need to think through it. Not sure if we can model 
node decommission - where we have block moved from host A to host B without any 
other change - as not requiring an epoch update, requires analysis ....

Assuming we sort out how to get updated state, (3) looks like a reasonable 
approach.




> Shuffle output location refetch during shuffle migration in decommission
> ------------------------------------------------------------------------
>
>                 Key: SPARK-41953
>                 URL: https://issues.apache.org/jira/browse/SPARK-41953
>             Project: Spark
>          Issue Type: Improvement
>          Components: Spark Core
>    Affects Versions: 3.3.1
>            Reporter: Zhongwei Zhu
>            Priority: Major
>
> When shuffle migration enabled during spark decommissionm, shuffle data will 
> be migrated into live executors, then update latest location to 
> MapOutputTracker. It has some issues:
>  # Executors only do map output location fetch in the beginning of the reduce 
> stage, so any shuffle output location change in the middle of reduce will 
> cause FetchFailed as reducer fetch from old location. Even stage retries 
> could solve this, this still cause lots of resource waste as all shuffle read 
> and compute happened before FetchFailed partition will be wasted.
>  # During stage retries, less running tasks cause more executors to be 
> decommissioned and shuffle data location keep changing. In the worst case, 
> stage could need lots of retries, further breaking SLA.
> So I propose to support refetch map output location during reduce phase if 
> shuffle migration is enabled and FetchFailed is caused by a decommissioned 
> dead executor. The detailed steps as below:
>  # When `BlockTransferService` fetch blocks failed from a decommissioned dead 
> executor, ExecutorDeadException(isDecommission as true) will be thrown.
>  # Make MapOutputTracker support fetch latest output without epoch provided.
>  # `ShuffleBlockFetcherIterator` will refetch latest output from 
> MapOutputTrackMaster. For all the shuffle blocks on this decommissioned, 
> there should be a new location on another executor. If not, throw exception 
> as current. If yes, create new local and remote requests to fetch these 
> migrated shuffle blocks. The flow will be similar as failback fetch when push 
> merged fetch failed.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to