[
https://issues.apache.org/jira/browse/SPARK-22339?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Apache Spark reassigned SPARK-22339:
------------------------------------
Assignee: (was: Apache Spark)
> Push epoch updates to executors on fetch failure to avoid fetch retries for
> missing executors
> ---------------------------------------------------------------------------------------------
>
> Key: SPARK-22339
> URL: https://issues.apache.org/jira/browse/SPARK-22339
> Project: Spark
> Issue Type: Improvement
> Components: Shuffle, Spark Core
> Affects Versions: 2.2.0
>                 Reporter: Juan Rodríguez Hortalá
> Attachments: push_epoch_update-WIP.diff
>
>
> When a task fails with a fetch error, DAGScheduler unregisters the shuffle
> blocks hosted by the serving executor (or even by all executors on the failing
> host, when external shuffle is used and
> spark.files.fetchFailure.unRegisterOutputOnHost is enabled) from the shuffle
> block directory stored by MapOutputTracker, which then increments its epoch as
> a result. This event is only signaled to the other executors when a task
> carrying the new epoch starts on each of them. Until then, executors reading
> from the failed executors keep retrying fetches of shuffle blocks from them,
> even though the driver already knows those executors are lost and those blocks
> are no longer available at those locations. This hurts job runtime, especially
> for long shuffles and for executor failures near the end of a stage, when the
> only pending tasks are shuffle reads.
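>
> Below is a minimal, self-contained Scala sketch of that behaviour (the class
> and field names are illustrative, not Spark's actual internals): the
> executor-side cache of shuffle block locations is only invalidated when a
> newer epoch reaches the executor, which today only happens when a new task
> starts.
>
>   import scala.collection.concurrent.TrieMap
>
>   // Simplified stand-in for an executor-side map output tracker.
>   class EpochTrackerSketch {
>     private var epoch: Long = 0L
>     // shuffleId -> cached map output locations (the "shuffle block directory")
>     val mapStatuses = new TrieMap[Int, Seq[String]]()
>
>     // Today this is only reached when a newly started task carries the
>     // driver's epoch; fetch retries in already-running tasks never get here.
>     def updateEpoch(newEpoch: Long): Unit = synchronized {
>       if (newEpoch > epoch) {
>         epoch = newEpoch
>         mapStatuses.clear()   // stale block locations are dropped only here
>       }
>     }
>   }
>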
> This could be improved by pushing the epoch update to the executors without
> having to wait for a new task. In the attached patch I sketch a possible
> solution that sends the updated epoch from the driver to the executors by
> piggybacking on the executor heartbeat response. ShuffleBlockFetcherIterator,
> RetryingBlockFetcher and BlockFetchingListener are modified so block locations
> are checked on each fetch retry. This doesn't introduce additional traffic, as
> MapOutputTrackerWorker.mapStatuses is shared by all tasks running on the same
> Executor, and the lookup of the new shuffle block directory was going to
> happen anyway once the new epoch is detected at the start of the next task.
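>
> As a rough sketch of the idea (hypothetical names, not the contents of the
> attached diff), the heartbeat response would carry the driver's current epoch,
> and the executor's heartbeat handler would feed it into the same update path
> as above, so that the next fetch retry re-resolves block locations instead of
> retrying against executors the driver already knows are gone:
>
>   // Hypothetical response type; in practice the existing heartbeat response
>   // would just gain an epoch field.
>   case class HeartbeatResponseSketch(reregisterBlockManager: Boolean,
>                                      epoch: Long)
>
>   // Reuses EpochTrackerSketch from the sketch above.
>   class HeartbeaterSketch(tracker: EpochTrackerSketch) {
>     // Called with each heartbeat response received from the driver.
>     def onHeartbeatResponse(response: HeartbeatResponseSketch): Unit = {
>       // No extra round trips: the heartbeat already happens periodically, and
>       // the refreshed block locations are only looked up lazily when a fetch
>       // retry needs them.
>       tracker.updateEpoch(response.epoch)
>     }
>   }
>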
> I would like to know the opinion of the community on this approach.
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]