[ https://issues.apache.org/jira/browse/SPARK-30873?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17071953#comment-17071953 ]

Thomas Graves commented on SPARK-30873:
---------------------------------------

so I think this is a dup of SPARK-30835.

> Handling Node Decommissioning for Yarn cluster manager in Spark
> --------------------------------------------------------------
>
>                 Key: SPARK-30873
>                 URL: https://issues.apache.org/jira/browse/SPARK-30873
>             Project: Spark
>          Issue Type: Improvement
>          Components: Spark Core, YARN
>    Affects Versions: 3.1.0
>            Reporter: Saurabh Chawla
>            Priority: Major
>
> In many public cloud environments, node loss (as in the case of AWS Spot 
> loss, Spot blocks and GCP preemptible VMs) is a planned and announced 
> event. The cloud provider informs the cluster manager about the upcoming 
> loss of a node ahead of time. A few examples are listed here:
> a) Spot loss in AWS (2 minutes before the event)
> b) GCP preemptible VM loss (30 seconds before the event)
> c) AWS Spot block loss with info on the termination time (generally a few 
> tens of minutes before decommission, as configured in YARN)
> This JIRA tries to make Spark leverage this advance knowledge of node loss 
> and adjust the scheduling of tasks to minimise the impact on the 
> application.
> It is well known that when a host is lost, its executors, their running 
> tasks, their caches and also the shuffle data on that host are lost. This 
> can result in wasted compute and other resources.
> The focus here is to build a framework for YARN that can be extended to 
> other cluster managers to handle such scenarios.
> The framework must handle one or more of the following:
> 1) Prevent new tasks from starting on any executors on decommissioning 
> nodes.
> 2) Decide whether to kill the running tasks so that they can be restarted 
> elsewhere (assuming they will not complete within the deadline), or allow 
> them to continue in the hope that they will finish within the deadline.
> 3) Clear the shuffle data entries for the decommissioned node's hostname 
> from the MapOutputTracker, to prevent shuffle FetchFailed exceptions. The 
> most significant advantage of unregistering the shuffle outputs is that 
> when Spark schedules the first re-attempt to compute the missing blocks, it 
> notices all of the missing blocks from decommissioned nodes at once and 
> recovers in only one attempt. This speeds up the recovery process 
> significantly over the stock Spark behaviour, where stages might be 
> rescheduled multiple times to recompute missing shuffle output from all 
> nodes, and it prevents jobs from being stuck for hours failing and 
> recomputing.
> 4) Prevent the stage from being aborted due to FetchFailed exceptions 
> caused by the decommissioning of a node. Spark allows a limited number of 
> consecutive stage attempts before a stage is aborted; this is controlled by 
> the config spark.stage.maxConsecutiveAttempts. Not counting fetch failures 
> caused by decommissioning nodes towards stage failure improves the 
> reliability of the system (see the sketch after this list).
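> To make item 4 concrete, here is a minimal, self-contained sketch (not the 
> actual Spark scheduler code) of counting fetch failures against a stage 
> abort limit while exempting failures that come from decommissioning nodes. 
> The limit mirrors spark.stage.maxConsecutiveAttempts; the isDecommissioning 
> hook is an assumed callback into the DecommissionTracker described below.
> {code:scala}
> // Illustrative sketch only, not Spark internals.
> class StageFailureTracker(
>     maxConsecutiveAttempts: Int,            // mirrors spark.stage.maxConsecutiveAttempts
>     isDecommissioning: String => Boolean) { // assumed DecommissionTracker hook
> 
>   private var consecutiveFetchFailures = 0
> 
>   /** Returns true if the stage should be aborted after this fetch failure. */
>   def onFetchFailure(failedHost: String): Boolean = {
>     if (isDecommissioning(failedHost)) {
>       // The failure is expected because the node is going away: do not count
>       // it towards the abort limit; the missing output is simply recomputed.
>       false
>     } else {
>       consecutiveFetchFailures += 1
>       consecutiveFetchFailures >= maxConsecutiveAttempts
>     }
>   }
> 
>   /** Reset the counter once a stage attempt succeeds. */
>   def onStageSuccess(): Unit = consecutiveFetchFailures = 0
> }
> {code}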
> Main components of change
> 1) Get the ClusterInfo update from the Resource Manager -> Application 
> Master -> Spark Driver.
> 2) A DecommissionTracker, residing inside the driver, tracks all the 
> decommissioned nodes, takes the necessary actions and drives the state 
> transitions.
> 3) Based on the decommissioned node list, add hooks in the code to achieve 
> the following (a minimal tracker sketch follows this list):
>  a) No new tasks on executors of decommissioning nodes
>  b) Removal of the shuffle data mapping info for the node to be 
> decommissioned from the MapOutputTracker
>  c) Not counting fetch failures from decommissioned nodes towards stage 
> failure
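> A minimal sketch of what such a driver-side DecommissionTracker could look 
> like is below. It is only illustrative: the names (NodeState, 
> addNodeToDecommission, isNodeDecommissioning) and the in-memory map are 
> assumptions, not the proposed implementation.
> {code:scala}
> // Illustrative driver-side tracker for decommissioning nodes (sketch only).
> object NodeState extends Enumeration {
>   val Decommissioning, ExecutorDecommissioned, ShuffledataDecommissioned, Terminated = Value
> }
> 
> class DecommissionTracker {
>   import NodeState._
> 
>   // hostname -> (current state, absolute termination time in epoch millis)
>   private val nodes =
>     scala.collection.concurrent.TrieMap.empty[String, (NodeState.Value, Long)]
> 
>   /** Called when the cluster manager announces an upcoming node loss. */
>   def addNodeToDecommission(host: String, terminationTimeMs: Long): Unit =
>     nodes.put(host, (Decommissioning, terminationTimeMs))
> 
>   /** Hook (a): the scheduler asks this before launching a task on a host. */
>   def isNodeDecommissioning(host: String): Boolean = nodes.contains(host)
> 
>   /** Hooks (b) and (c) consult the current state before unregistering
>     * shuffle output or exempting a fetch failure from the abort count. */
>   def currentState(host: String): Option[NodeState.Value] = nodes.get(host).map(_._1)
> 
>   def updateState(host: String, newState: NodeState.Value): Unit =
>     nodes.get(host).foreach { case (_, term) => nodes.put(host, (newState, term)) }
> 
>   /** If the hostname is reused by a fresh node, drop the stale entry. */
>   def removeNode(host: String): Unit = nodes.remove(host)
> }
> {code}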
> On receiving the information that a node is to be decommissioned, the 
> DecommissionTracker on the driver performs the actions below:
>  * Add an entry for the node in the DecommissionTracker with its 
> termination time and node state set to "DECOMMISSIONING".
>  * Stop assigning any new tasks to executors on the nodes which are 
> candidates for decommissioning. This makes sure that, as the running tasks 
> finish, the usage of the node slowly dies down.
>  * Kill all the executors on the decommissioning nodes after a configurable 
> period of time, say "spark.graceful.decommission.executor.leasetimePct". 
> This killing ensures two things: first, that the resulting task failures 
> are accounted for in the job failure count, and second, that no more 
> shuffle data is generated on a node that will eventually be lost. The node 
> state is set to "EXECUTOR_DECOMMISSIONED".
>  * Mark the shuffle data on the node as unavailable after 
> "spark.qubole.graceful.decommission.shuffedata.leasetimePct" of the time 
> has elapsed. This ensures that recomputation of missing shuffle partitions 
> is done early, rather than reducers failing later with a time-consuming 
> FetchFailure. The node state is set to "SHUFFLEDATA_DECOMMISSIONED".
>  * Mark the node as terminated after the termination time. The state of 
> the node is now "TERMINATED".
>  * Remove the node entry from the DecommissionTracker if the same hostname 
> is reused. (This is not uncommon in many public cloud environments.)
> This is the life cycle of a node which is decommissioned:
> DECOMMISSIONING -> EXECUTOR_DECOMMISSIONED -> SHUFFLEDATA_DECOMMISSIONED -> 
> TERMINATED.
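> As a sketch of how the two lease-time configs could drive this life cycle, 
> the transition timestamps can be derived from the moment the decommission 
> notice is received and the termination time. The helper and parameter names 
> below are assumptions for illustration, not the proposed implementation.
> {code:scala}
> // Illustrative derivation of the per-node transition times (sketch only).
> case class DecommissionTimeline(
>     killExecutorsAtMs: Long,      // move to EXECUTOR_DECOMMISSIONED
>     unregisterShuffleAtMs: Long,  // move to SHUFFLEDATA_DECOMMISSIONED
>     terminatedAtMs: Long)         // move to TERMINATED
> 
> object DecommissionTimes {
>   def buildTimeline(
>       noticeTimeMs: Long,
>       terminationTimeMs: Long,
>       executorLeasePct: Double,    // e.g. spark.graceful.decommission.executor.leasetimePct
>       shuffleDataLeasePct: Double  // e.g. the shuffle-data leasetimePct config
>     ): DecommissionTimeline = {
>     val window = terminationTimeMs - noticeTimeMs
>     DecommissionTimeline(
>       killExecutorsAtMs     = noticeTimeMs + (window * executorLeasePct / 100).toLong,
>       unregisterShuffleAtMs = noticeTimeMs + (window * shuffleDataLeasePct / 100).toLong,
>       terminatedAtMs        = terminationTimeMs)
>   }
> }
> {code}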
> *Why do we exit the executors decommission before the shuffle decommission 
> service?* There are two reasons why we exit the executors before the 
> shuffle service:
> a) As per the current logic, whenever we receive the node decommissioning 
> notice we stop assigning new tasks to the executors running on that node. 
> We give the tasks already running on those executors some time to complete 
> before killing the executors. If we keep the executors running until the 
> end, there is a chance of generating more shuffle data which will 
> eventually be lost, triggering a recompute in the future. This approach 
> minimises the recomputation of shuffle data and maximises the usage of the 
> shuffle data already on the node by keeping it available until the end.
> b) We want to keep the shuffle data until the node is about to be lost, so 
> that any tasks that depend on that shuffle data can complete, and we do not 
> have to recompute the shuffle data at all if no task needs it anymore.
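> As a purely illustrative, numeric example of this ordering (the percentages 
> are made-up values, not proposed defaults): with a 2-minute AWS Spot style 
> notice, an executor lease of 50% and a shuffle-data lease of 90% would kill 
> the executors at 60 seconds but keep the shuffle data registered until 108 
> seconds, so reducers can keep consuming it for most of the remaining window.
> {code:scala}
> // Using the illustrative buildTimeline sketch above with made-up values.
> val t = DecommissionTimes.buildTimeline(
>   noticeTimeMs = 0L, terminationTimeMs = 120000L,
>   executorLeasePct = 50.0, shuffleDataLeasePct = 90.0)
> // t.killExecutorsAtMs     == 60000   -> executors killed at 60 s
> // t.unregisterShuffleAtMs == 108000  -> shuffle output unregistered at 108 s
> // t.terminatedAtMs        == 120000  -> node gone at 120 s
> {code}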


