[ https://issues.apache.org/jira/browse/SPARK-52090?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17953168#comment-17953168 ]
Enrico Minack commented on SPARK-52090:
---------------------------------------

I have experimented with executors updating their block address information: [https://github.com/G-Research/spark/commit/8bab3e940e5a7b2b706014c6e47daa1ec1e19296]

It works great, but I am worried that this creates a lot of network pressure on the driver during massive scale-down events / executor failures, as all affected executors update their shuffle block addresses. Further, the current implementation fetches the map output infos for the whole shuffle for each failed shuffle block; some per-shuffle TTL would be needed to improve this.

A deferred exit of a migrated executor only reduces the likelihood of an ExecutorDeadException, but there is no good value that guarantees no fetch failures / stage retries.
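For illustration, here is a minimal, Spark-agnostic sketch of such a per-shuffle TTL cache. The names (PerShuffleTtlCache, fetchMapOutputs, MapOutputInfo) are hypothetical placeholders rather than Spark APIs; the point is only that the first fetch failure for a shuffle pays the full map-output lookup, and later failures within the TTL reuse the cached result:

{code:scala}
import scala.collection.mutable
import scala.concurrent.duration._

// Hypothetical sketch (not Spark code): cache the map output info fetched for a
// whole shuffle, so repeated fetch failures for blocks of the same shuffle within
// `ttl` do not each trigger a full fetch from the driver.
class PerShuffleTtlCache[MapOutputInfo](
    fetchMapOutputs: Int => Seq[MapOutputInfo],  // stands in for the full-shuffle fetch
    ttl: FiniteDuration) {

  private case class Entry(value: Seq[MapOutputInfo], fetchedAt: Long)
  private val cache = mutable.Map.empty[Int, Entry]

  // Return the map output info for `shuffleId`, refetching only once the TTL has expired.
  def get(shuffleId: Int): Seq[MapOutputInfo] = synchronized {
    val now = System.nanoTime()
    cache.get(shuffleId) match {
      case Some(e) if now - e.fetchedAt < ttl.toNanos => e.value
      case _ =>
        val fetched = fetchMapOutputs(shuffleId)
        cache.update(shuffleId, Entry(fetched, now))
        fetched
    }
  }

  // Drop an entry early, e.g. once no reducer still needs this shuffle.
  def invalidate(shuffleId: Int): Unit = synchronized { cache.remove(shuffleId) }
}
{code}

A shorter TTL corrects stale addresses sooner after further migrations; a longer TTL puts less pressure on the driver during a large scale-down.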
> FetchFailedException causing stage failures when gracefully decommissioning executors
> --------------------------------------------------------------------------------------
>
>                 Key: SPARK-52090
>                 URL: https://issues.apache.org/jira/browse/SPARK-52090
>             Project: Spark
>          Issue Type: Bug
>          Components: Kubernetes
>    Affects Versions: 3.5.5
>            Reporter: Sam Wheating
>            Priority: Major
>
> We are running Spark 3.5.5 on Kubernetes, with the following configuration:
> |spark.decommission.enabled|true|
> |spark.storage.decommission.enabled|true|
> We also set a 600s terminationGracePeriod on the executor pods themselves.
>
> When an executor is removed (for example, using `kubectl delete pod`), I can see the graceful decommissioning working as expected. The pod stays in a terminating state for a few minutes while it finishes its running tasks and migrates shuffle data to other executors. See the relevant logs below:
>
> {code:java}
> 25/05/12 18:58:43 INFO BlockManager: Starting block manager decommissioning process...
> 25/05/12 18:58:43 INFO CoarseGrainedExecutorBackend: Will exit when finished decommissioning
> 25/05/12 18:58:43 INFO BlockManagerDecommissioner: Starting block migration
> 25/05/12 18:58:43 INFO BlockManagerDecommissioner: Attempting to migrate all RDD blocks
> 25/05/12 18:58:43 INFO BlockManagerDecommissioner: Attempting to migrate all shuffle blocks
> 25/05/12 18:58:43 INFO BlockManagerDecommissioner: Start refreshing migratable shuffle blocks
> ...
> 25/05/12 19:00:42 INFO BlockManagerDecommissioner: Migrated migrate_shuffle_18_15196 (size: 356.4 MiB) to BlockManagerId(26, 10.101.0.143, 36127, None) in 3750 ms
> ...
> 25/05/12 19:01:13 INFO BlockManagerDecommissioner: Finished current round refreshing migratable shuffle blocks, waiting for 30000ms before the next round refreshing.
> 25/05/12 19:01:14 INFO CoarseGrainedExecutorBackend: Checking to see if we can shutdown.
> 25/05/12 19:01:14 INFO CoarseGrainedExecutorBackend: No running tasks, checking migrations
> 25/05/12 19:01:14 INFO CoarseGrainedExecutorBackend: No running tasks, all blocks migrated, stopping.
> 25/05/12 19:01:14 INFO CoarseGrainedExecutorBackend: Executor self-exiting due to : Finished decommissioning
> 25/05/12 19:01:14 INFO CoarseGrainedExecutorBackend: Driver commanded a shutdown
> {code}
>
> However, as soon as the executor finishes decommissioning, we see tasks fail due to an ExecutorDeadException:
> {code:java}
> 25/05/12 19:01:17 WARN TaskSetManager: Lost task 6.0 in stage 63.0 (TID 16410) (10.97.126.203 executor 14): FetchFailed(BlockManagerId(25, 10.101.0.142, 36601, None), shuffleId=18, mapIndex=3129, mapId=14579, reduceId=6, message=2025-05-12 19:01:18.896 at org.apache.spark.errors.SparkCoreErrors$.fetchFailedError(SparkCoreErrors.scala:437)
> ...
> Caused by: org.apache.spark.ExecutorDeadException: [INTERNAL_ERROR_NETWORK] The relative remote executor(Id: 25), which maintains the block data to fetch is dead.
> {code}
>
> And then the driver logs the stage failure:
> {code:java}
> 25/05/12 19:01:17 INFO DAGScheduler: ResultStage 63 (parquet at SparkJob.scala:499) failed in 94.460 s due to org.apache.spark.shuffle.FetchFailedException
> {code}
>
> Based on the documentation, the loss of an executor should be recoverable without restarting the whole stage, assuming storage decommissioning is enabled, but it seems like this isn't the case in practice.
>
> When running larger Spark applications on Kubernetes, we often see multiple executor evictions within a single execution (as we are aggressively scaling our clusters), so having multiple stage restarts is significantly slowing down our jobs.
>
> Please let me know if we're missing any other configuration, or if there's any other information I can provide. Thanks!
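For reference, a minimal sketch of the decommissioning configuration quoted in the issue above, set programmatically. The 600s grace period itself is not a Spark setting but the terminationGracePeriodSeconds field on the executor pod spec, supplied e.g. through an executor pod template:

{code:scala}
import org.apache.spark.SparkConf

// The two Spark settings from the issue; the pod-level grace period is configured
// on the executor pod template (terminationGracePeriodSeconds), for example one
// referenced via spark.kubernetes.executor.podTemplateFile.
val conf = new SparkConf()
  .set("spark.decommission.enabled", "true")
  .set("spark.storage.decommission.enabled", "true")
{code}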