[
https://issues.apache.org/jira/browse/SPARK-30873?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17401282#comment-17401282
]
Jason Xu commented on SPARK-30873:
----------------------------------
Hi Saurabh, I found your work useful for dealing with YARN node
decommissioning. I have one question: to get the "DECOMMISSIONING" node
state update from the RM, does it require a YARN / Hadoop version later than
3.0.1? It seems support was added in
https://issues.apache.org/jira/browse/YARN-6483.
> Handling Node Decommissioning for Yarn cluster manger in Spark
> --------------------------------------------------------------
>
> Key: SPARK-30873
> URL: https://issues.apache.org/jira/browse/SPARK-30873
> Project: Spark
> Issue Type: Improvement
> Components: Spark Core, YARN
> Affects Versions: 3.1.0
> Reporter: Saurabh Chawla
> Priority: Major
>
> In many public cloud environments, node loss (as with AWS Spot loss,
> Spot blocks and GCP preemptible VMs) is a planned and informed
> activity.
> The cloud provider notifies the cluster manager about the possible loss of
> a node ahead of time. A few examples are listed here:
> a) Spot loss in AWS (2 minutes before the event)
> b) GCP preemptible VM loss (30 seconds before the event)
> c) AWS Spot block loss with info on termination time (generally a few tens
> of minutes before decommission, as configured in YARN)
> This JIRA tries to make Spark use this advance knowledge of node loss and
> adjust the scheduling of tasks to minimise the impact on the application.
> When a host is lost, its executors, their running tasks and caches, and the
> shuffle data on that host are all lost. This can waste compute and other
> resources.
> The focus here is to build a framework for YARN that can be extended to
> other cluster managers to handle such scenarios.
> The framework must handle one or more of the following:
> 1) Prevent new tasks from starting on any executors on decommissioning nodes.
> 2) Decide whether to kill the running tasks so that they can be restarted
> elsewhere (assuming they will not complete within the deadline), or allow
> them to continue in the hope that they finish within the deadline.
> 3) Clear the shuffle data entries for the decommissioned node's hostname
> from MapOutputTracker to prevent the shuffle fetchfailed exception. The most
> significant advantage of unregistering shuffle outputs is that when Spark
> schedules the first re-attempt to compute the missing blocks, it notices all
> of the missing blocks from decommissioned nodes and recovers in only one
> attempt. This speeds up the recovery process significantly over the scheduled
> Spark implementation, where stages might be rescheduled multiple times to
> recompute missing shuffles from all nodes, and it prevents jobs from being
> stuck for hours failing and recomputing.
> 4) Prevent the stage from aborting due to the fetchfailed exception when a
> node is decommissioned. Spark allows a limited number of consecutive stage
> attempts before a stage is aborted. This is controlled by the config
> spark.stage.maxConsecutiveAttempts. Not counting fetch failures caused by
> node decommissioning towards stage failure improves the reliability of
> the system.
> Main components of change
> 1) Get the ClusterInfo update from the Resource Manager -> Application Master
> -> Spark Driver.
> 2) DecommissionTracker, which resides inside the driver, tracks all the
> decommissioned nodes and takes the necessary actions and state transitions.
> 3) Based on the list of decommissioned nodes, add hooks in the code to achieve:
> a) No new tasks on the executor
> b) Remove shuffle data mapping info for the node to be decommissioned from
> the mapOutputTracker
> c) Do not count fetchFailure from decommissioned nodes towards stage failure
> On receiving the information that a node is to be decommissioned, the
> following actions need to be performed by DecommissionTracker on the driver:
> * Add an entry for the node in DecommissionTracker with its termination time
> and the node state "DECOMMISSIONING".
> * Stop assigning new tasks to executors on nodes that are candidates
> for decommission. This ensures that usage of the node gradually dies down
> as its running tasks finish.
> * Kill all the executors on the decommissioning nodes after a configurable
> period of time, say "spark.graceful.decommission.executor.leasetimePct". This
> killing ensures two things. First, the task failure is attributed to the
> job failure count. Second, it avoids generating more shuffle data on a node
> that will eventually be lost. The node state is set to
> "EXECUTOR_DECOMMISSIONED".
> * Mark shuffle data on the node as unavailable after
> "spark.qubole.graceful.decommission.shuffedata.leasetimePct" time. This
> ensures that recomputation of missing shuffle partitions is done early, rather
> than reducers failing with a time-consuming FetchFailure. The node state is
> set to "SHUFFLEDATA_DECOMMISSIONED".
> * Mark the node as terminated after the termination time. The state of the
> node is now "TERMINATED".
> * Remove the node entry from DecommissionTracker if the same host name is
> reused. (This is not uncommon in many public cloud environments.)
> This is the life cycle of a node that is being decommissioned:
> DECOMMISSIONING -> EXECUTOR_DECOMMISSIONED -> SHUFFLEDATA_DECOMMISSIONED ->
> TERMINATED.
> *Why do we decommission the executors before the shuffle service?*
> There are two reasons why we exit the executors before the shuffle service:
> a) As per the current logic, whenever we receive a node decommissioning
> notice, we stop assigning new tasks to the executors running on that node.
> We give the tasks already running on those executors some time to complete
> before killing the executors. If we kept the executors running till the end,
> they could generate more shuffle data that would eventually be lost,
> triggering a recompute in future. This approach minimises the recomputation
> of shuffle data and maximises the use of the shuffle data on the node
> by keeping it available till the end.
> b) We want to keep the shuffle data until the node is about to be lost, so
> that any tasks that depend on that shuffle data can complete, and we don't
> have to recompute the shuffle data if no task requires it.
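
The node life cycle described in the quoted issue can be sketched as a small time-driven state function. This is only an illustration in Python, not code from Spark or from the patch. It assumes each lease-time setting is a percentage of the window between the decommission notice and the termination time. The names NodeEntry, node_state and can_schedule_on, and the default values 50 and 90, are hypothetical.

```python
from dataclasses import dataclass
from enum import Enum


class NodeState(Enum):
    # State names follow the life cycle listed in the issue description.
    DECOMMISSIONING = 1
    EXECUTOR_DECOMMISSIONED = 2
    SHUFFLEDATA_DECOMMISSIONED = 3
    TERMINATED = 4


@dataclass
class NodeEntry:
    notified_at: float    # seconds; when the driver learned of the decommission
    terminates_at: float  # seconds; termination time reported for the node


def node_state(entry, now, executor_lease_pct=50, shuffle_lease_pct=90):
    """Return the life-cycle state of a node at time `now`.

    Assumption: each percentage is applied to the window between the
    notice and the termination time. The defaults are hypothetical.
    """
    window = entry.terminates_at - entry.notified_at
    executor_deadline = entry.notified_at + window * executor_lease_pct / 100
    shuffle_deadline = entry.notified_at + window * shuffle_lease_pct / 100
    if now >= entry.terminates_at:
        return NodeState.TERMINATED
    if now >= shuffle_deadline:
        return NodeState.SHUFFLEDATA_DECOMMISSIONED
    if now >= executor_deadline:
        return NodeState.EXECUTOR_DECOMMISSIONED
    return NodeState.DECOMMISSIONING


def can_schedule_on(host, tracked):
    """No new tasks go to executors on a host that the tracker knows about."""
    return host not in tracked


# Example: a node with a two-minute notice.
tracked = {"host-1": NodeEntry(notified_at=0.0, terminates_at=120.0)}
print(node_state(tracked["host-1"], now=70.0).name)
print(can_schedule_on("host-1", tracked))
```

With these assumed percentages, executors on a node given a two-minute notice would be killed at 60 seconds and its shuffle data marked unavailable at 108 seconds, which leaves the shuffle data readable for most of the window, as point b) above argues.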
--
This message was sent by Atlassian Jira
(v8.3.4#803005)