[ https://issues.apache.org/jira/browse/SPARK-52090?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17953168#comment-17953168 ]

Enrico Minack commented on SPARK-52090:
---------------------------------------

I have experimented with executors updating their block address information: 
[https://github.com/G-Research/spark/commit/8bab3e940e5a7b2b706014c6e47daa1ec1e19296]

Works great, I am just worried that this creates a lot of network pressure on the 
driver during massive scale-down events / executor failures, as all affected 
executors update their shuffle block addresses. Further, the current 
implementation fetches the map output info for the whole shuffle for each failed 
shuffle block. Some per-shuffle TTL would be needed to improve this.
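
To make the TTL idea concrete, here is a rough sketch (not a Spark API; 
`ShuffleRefreshThrottle` and the `refreshMapOutputs` callback are made up, and 
the real thing would have to hook into the fetch-failure path of the commit 
above) of refreshing map output info at most once per shuffle per TTL window:

{code:scala}
import scala.collection.mutable

// Illustrative only: throttle how often an executor re-fetches map output info
// from the driver after shuffle fetch failures, so a massive scale-down does not
// trigger one whole-shuffle refresh per failed block.
class ShuffleRefreshThrottle(ttlMs: Long, refreshMapOutputs: Int => Unit) {

  // shuffleId -> timestamp (ms) of the last refresh this executor performed
  private val lastRefresh = mutable.HashMap.empty[Int, Long]

  /** Refresh map output info for `shuffleId` at most once per TTL window.
   *  Returns true if a refresh was actually triggered. */
  def maybeRefresh(shuffleId: Int): Boolean = {
    val doRefresh = synchronized {
      val now = System.currentTimeMillis()
      val due = lastRefresh.get(shuffleId).forall(now - _ >= ttlMs)
      if (due) lastRefresh(shuffleId) = now
      due
    }
    if (doRefresh) refreshMapOutputs(shuffleId)
    doRefresh
  }
}
{code}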

A deferred exit of a migrated executor only reduces the likelihood of an 
ExecutorDeadException; there is no delay value that guarantees zero fetch 
failures / stage retries.
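
For illustration, a deferred exit would look roughly like the snippet below; 
neither the config key nor the hook exists in Spark today, and it only shrinks 
the window in which reducers still hold the old block address, it never closes it:

{code:scala}
import org.apache.spark.SparkConf

// Hypothetical sketch: delay the executor's self-exit after all blocks have been
// migrated. "spark.storage.decommission.exitDelay" is a made-up key; any fetch
// that starts after the delay expires still hits ExecutorDeadException.
def deferredExit(conf: SparkConf, exitExecutor: String => Unit): Unit = {
  val exitDelayMs = conf.getTimeAsMs("spark.storage.decommission.exitDelay", "60s")
  Thread.sleep(exitDelayMs)
  exitExecutor("Finished decommissioning (deferred exit)")
}
{code}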

> FetchFailedException causing stage failures when gracefully decommissioning 
> executors
> -------------------------------------------------------------------------------------
>
>                 Key: SPARK-52090
>                 URL: https://issues.apache.org/jira/browse/SPARK-52090
>             Project: Spark
>          Issue Type: Bug
>          Components: Kubernetes
>    Affects Versions: 3.5.5
>            Reporter: Sam Wheating
>            Priority: Major
>
> We are running Spark 3.5.5 on Kubernetes, with the following configuration:
> |spark.decommission.enabled|true|
> |spark.storage.decommission.enabled|true|
> As well as a 600s terminationGracePeriod on the executor pods themselves.
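> Expressed as a SparkConf, that is roughly the sketch below (the two 
> per-block-type migration flags are shown explicitly for clarity, although I 
> believe they default to true once storage decommissioning is enabled):
> {code:scala}
> import org.apache.spark.SparkConf
> 
> val conf = new SparkConf()
>   .set("spark.decommission.enabled", "true")
>   .set("spark.storage.decommission.enabled", "true")
>   .set("spark.storage.decommission.shuffleBlocks.enabled", "true")
>   .set("spark.storage.decommission.rddBlocks.enabled", "true")
> {code}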
> When an executor is removed (for example, using `kubectl delete pod`), I can 
> see the graceful decommissioning working as expected. The pod stays in a 
> terminating state for a few minutes while it finishes its running tasks and 
> migrates shuffle data to other executors. See relevant logs below:
>  
> {code:java}
> 25/05/12 18:58:43 INFO BlockManager: Starting block manager decommissioning 
> process...
> 25/05/12 18:58:43 INFO CoarseGrainedExecutorBackend: Will exit when finished 
> decommissioning
> 25/05/12 18:58:43 INFO BlockManagerDecommissioner: Starting block migration
> 25/05/12 18:58:43 INFO BlockManagerDecommissioner: Attempting to migrate all 
> RDD blocks
> 25/05/12 18:58:43 INFO BlockManagerDecommissioner: Attempting to migrate all 
> shuffle blocks
> 25/05/12 18:58:43 INFO BlockManagerDecommissioner: Start refreshing 
> migratable shuffle blocks
> ...
> 25/05/12 19:00:42 INFO BlockManagerDecommissioner: Migrated 
> migrate_shuffle_18_15196 (size: 356.4 MiB) to BlockManagerId(26, 
> 10.101.0.143, 36127, None) in 3750 ms
> ...
> 25/05/12 19:01:13 INFO BlockManagerDecommissioner: Finished current round 
> refreshing migratable shuffle blocks, waiting for 30000ms before the next 
> round refreshing.
> 25/05/12 19:01:14 INFO CoarseGrainedExecutorBackend: Checking to see if we 
> can shutdown.
> 25/05/12 19:01:14 INFO CoarseGrainedExecutorBackend: No running tasks, 
> checking migrations
> 25/05/12 19:01:14 INFO CoarseGrainedExecutorBackend: No running tasks, all 
> blocks migrated, stopping.
> 25/05/12 19:01:14 INFO CoarseGrainedExecutorBackend: Executor self-exiting 
> due to : Finished decommissioning
> 25/05/12 19:01:14 INFO CoarseGrainedExecutorBackend: Driver commanded a 
> shutdown{code}
> However, as soon as the executor finishes decommissioning, tasks fail due 
> to an ExecutorDeadException:
> {code:java}
> 25/05/12 19:01:17 WARN TaskSetManager: Lost task 6.0 in stage 63.0 (TID 
> 16410) (10.97.126.203 executor 14): FetchFailed(BlockManagerId(25, 
> 10.101.0.142, 36601, None), shuffleId=18, mapIndex=3129, mapId=14579, 
> reduceId=6, message=2025-05-12 19:01:18.896  at 
> org.apache.spark.errors.SparkCoreErrors$.fetchFailedError(SparkCoreErrors.scala:437)
>  
> ...
> Caused by: org.apache.spark.ExecutorDeadException: [INTERNAL_ERROR_NETWORK] 
> The relative remote executor(Id: 25), which maintains the block data to fetch 
> is dead.{code}
> And then the driver logs the stage failure:
> {code:java}
> 25/05/12 19:01:17 INFO DAGScheduler: ResultStage 63 (parquet at 
> SparkJob.scala:499) failed in 94.460 s due to 
> org.apache.spark.shuffle.FetchFailedException {code}
> Based on the documentation, the loss of an executor should be recoverable 
> without restarting the whole stage, assuming storage decommissioning is 
> enabled, but it seems like this isn't the case in practice.
> When running larger Spark applications on Kubernetes, we often see multiple 
> executor evictions within a single execution (as we are aggressively scaling 
> our clusters), so having multiple stage restarts significantly slows 
> down our jobs.
> Please let me know if we're missing any other configuration, or if there's 
> any other information I can provide. Thanks!


