[ https://issues.apache.org/jira/browse/SPARK-30873?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Junfan Zhang updated SPARK-30873:
---------------------------------
    Summary: Handling Node Decommissioning for Yarn cluster manager in Spark  (was: Handling Node Decommissioning for Yarn cluster manAger in Spark)

> Handling Node Decommissioning for Yarn cluster manager in Spark
> ---------------------------------------------------------------
>
>                 Key: SPARK-30873
>                 URL: https://issues.apache.org/jira/browse/SPARK-30873
>             Project: Spark
>          Issue Type: Improvement
>          Components: Spark Core, YARN
>    Affects Versions: 3.1.0
>            Reporter: Saurabh Chawla
>            Priority: Major
>
> In many public cloud environments, node loss (AWS Spot loss, Spot Blocks, and GCP preemptible VMs) is a planned and informed activity: the cloud provider intimates the cluster manager about the possible loss of a node ahead of time. A few examples:
> a) Spot loss in AWS (2 minutes before the event)
> b) GCP preemptible VM loss (30 seconds before the event)
> c) AWS Spot Block loss with info on the termination time (generally a few tens of minutes before decommission, as configured in Yarn)
> This JIRA tries to make Spark leverage this advance knowledge of node loss and adjust the scheduling of tasks to minimise the impact on the application.
> It is well known that when a host is lost, its executors, their running tasks, their caches and the shuffle data on the host are lost too. This can waste compute and other resources.
> The focus here is to build a framework for YARN that can be extended to other cluster managers to handle such scenarios. The framework must handle one or more of the following:
> 1) Prevent new tasks from starting on any executors on decommissioning nodes.
> 2) Decide whether to kill the running tasks so that they can be restarted elsewhere (assuming they will not complete within the deadline), or allow them to continue in the hope that they finish within the deadline.
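Requirements 1) and 2) can be illustrated with a small sketch. All names here (the tracker class, its methods) are illustrative assumptions, not Spark's actual internals: a tracker records decommissioning hosts so the scheduler can skip them when offering new tasks, and can decide whether a running task is likely to finish before the node is lost.

```scala
// Toy sketch of requirements 1) and 2); class and method names are
// illustrative and do not correspond to Spark's real scheduler API.
object DecommissionSchedulingSketch {
  /** Per-host notice received from the cluster manager. */
  final case class DecommissionInfo(terminationTimeMs: Long)

  class DecommissionTracker(clock: () => Long) {
    private val nodes = scala.collection.mutable.Map.empty[String, DecommissionInfo]

    def addDecommissioningNode(host: String, terminationTimeMs: Long): Unit =
      nodes(host) = DecommissionInfo(terminationTimeMs)

    /** Requirement 1: never offer new tasks to a decommissioning host. */
    def canScheduleOn(host: String): Boolean = !nodes.contains(host)

    /** Requirement 2: kill a running task only if its estimated remaining
      * time overshoots the node's termination deadline; otherwise let it
      * run in the hope that it finishes in time. */
    def shouldKillRunningTask(host: String, estRemainingMs: Long): Boolean =
      nodes.get(host).exists(info => clock() + estRemainingMs > info.terminationTimeMs)
  }
}
```

For example, with an AWS Spot notice 2 minutes (120 000 ms) ahead, a task estimated to need 3 more minutes would be killed and rescheduled elsewhere, while one needing 1 more minute would be allowed to finish.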
> 3) Clear the shuffle data entries for the decommissioned node's hostname from the MapOutputTracker, to prevent FetchFailed exceptions. The most significant advantage of unregistering the shuffle outputs is that when Spark schedules the first re-attempt to compute the missing blocks, it notices all of the missing blocks from decommissioned nodes and recovers in only one attempt. This speeds up the recovery process significantly over the current Spark implementation, where stages might be rescheduled multiple times to recompute missing shuffles from all nodes, and it prevents jobs from being stuck for hours failing and recomputing.
> 4) Prevent the stage from aborting due to FetchFailed exceptions caused by the decommissioning of nodes. Spark allows a number of consecutive stage attempts before a stage is aborted; this is controlled by the config spark.stage.maxConsecutiveAttempts. Not counting fetch failures caused by the decommissioning of nodes towards stage failure improves the reliability of the system.
> Main components of the change:
> 1) Get the cluster-info update from the Resource Manager -> Application Master -> Spark Driver.
> 2) A DecommissionTracker, residing inside the driver, that tracks all the decommissioned nodes and takes the necessary actions and state transitions.
> 3) Based on the decommissioned-node list, hooks in the code to achieve:
> a) No new tasks on the affected executors
> b) Removal of the shuffle data mapping info for the node to be decommissioned from the MapOutputTracker
> c) Not counting FetchFailures from decommissioned nodes towards stage failure
> On receiving the info that a node is to be decommissioned, the below actions are performed by the DecommissionTracker on the driver:
> * Add an entry for the node in the DecommissionTracker with its termination time and the node state "DECOMMISSIONING".
> * Stop assigning any new tasks to executors on the nodes that are candidates for decommissioning. This makes sure that, as the running tasks finish, the usage of the node slowly dies down.
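Points 3) and 4) above amount to two small hooks. The sketch below models them with a toy map-output registry; the names are hypothetical stand-ins, not the real MapOutputTracker API.

```scala
// Toy model of hooks 3) and 4); MapOutputRegistry is a stand-in for the
// real MapOutputTracker, and all names here are illustrative.
object ShuffleDecommissionSketch {
  /** shuffleId -> (mapIndex -> host that holds the map output). */
  class MapOutputRegistry {
    private var outputs = Map.empty[Int, Map[Int, String]]

    def register(shuffleId: Int, mapIndex: Int, host: String): Unit =
      outputs = outputs.updated(shuffleId,
        outputs.getOrElse(shuffleId, Map.empty) + (mapIndex -> host))

    /** Hook 3): proactively drop every map output on a decommissioned host,
      * so the first re-attempt already sees all missing blocks at once. */
    def unregisterHost(host: String): Unit =
      outputs = outputs.map { case (id, m) => id -> m.filter { case (_, h) => h != host } }

    def availableMaps(shuffleId: Int): Set[Int] =
      outputs.getOrElse(shuffleId, Map.empty).keySet
  }

  /** Hook 4): fetch failures from decommissioned hosts do not count towards
    * the spark.stage.maxConsecutiveAttempts limit. */
  def shouldAbortStage(consecutiveFetchFailures: Int,
                       maxConsecutiveAttempts: Int,
                       failedHostDecommissioned: Boolean): Boolean =
    !failedHostDecommissioned && consecutiveFetchFailures >= maxConsecutiveAttempts
}
```

Unregistering the whole host at once is what makes recovery single-pass: the first recomputation already targets every missing map output, instead of discovering them one FetchFailure at a time.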
> * Kill all the executors on the decommissioning nodes after a configurable period of time, say "spark.graceful.decommission.executor.leasetimePct". This killing ensures two things. Firstly, the resulting task failures are not attributed towards the job failure count. Secondly, it avoids generating more shuffle data on a node that will eventually be lost. The node state is set to "EXECUTOR_DECOMMISSIONED".
> * Mark the shuffle data on the node as unavailable after "spark.graceful.decommission.shuffledata.leasetimePct" time. This ensures that the recomputation of missing shuffle partitions is done early, rather than reducers failing with a time-consuming FetchFailure. The node state is set to "SHUFFLEDATA_DECOMMISSIONED".
> * Mark the node as terminated after the termination time. The state of the node is now "TERMINATED".
> * Remove the node's entry from the DecommissionTracker if the same hostname is reused (this is not uncommon in many public cloud environments).
> This is the life cycle of a decommissioned node:
> DECOMMISSIONING -> EXECUTOR_DECOMMISSIONED -> SHUFFLEDATA_DECOMMISSIONED -> TERMINATED
> *Why do we decommission the executors before the shuffle data?* There are two reasons for killing the executors before marking the shuffle data unavailable:
> a) As per the current logic, whenever we receive the node-decommissioning notice we stop assigning new tasks to the executors running on that node, and we give the tasks already running on those executors some time to complete before killing the executors. If we kept the executors running till the end, they could generate more shuffle data that would eventually be lost, triggering recomputes in the future. This approach minimises the recomputation of shuffle data while maximising the usage of the shuffle data already on the node, by keeping it available till the end.
> b) We want to keep the shuffle data until the node is about to be lost, so that any tasks that depend on that shuffle data can still complete, and so that we never have to recompute the shuffle data at all if no task ends up needing it.

--
This message was sent by Atlassian Jira
(v8.20.7#820007)
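Taken together, the proposed life cycle (DECOMMISSIONING -> EXECUTOR_DECOMMISSIONED -> SHUFFLEDATA_DECOMMISSIONED -> TERMINATED) can be sketched as a small timed state machine. The lease-percentage semantics and the default values below are assumptions for illustration only; the parameter names merely echo the configs proposed in the issue.

```scala
// Toy timed state machine for the proposed node life cycle; the
// lease-percentage interpretation and the defaults are illustrative.
object NodeLifecycleSketch {
  sealed trait NodeState
  case object Decommissioning extends NodeState
  case object ExecutorDecommissioned extends NodeState
  case object ShuffledataDecommissioned extends NodeState
  case object Terminated extends NodeState

  /** State of a node at `nowMs`, given when the decommission notice arrived
    * and when the node will actually be lost. Executors are killed after
    * `executorLeasePct` of the remaining lifetime; shuffle data is marked
    * unavailable after `shuffleLeasePct`. */
  def stateAt(nowMs: Long, noticeMs: Long, terminationMs: Long,
              executorLeasePct: Double = 0.50, // cf. spark.graceful.decommission.executor.leasetimePct
              shuffleLeasePct: Double = 0.90   // cf. spark.graceful.decommission.shuffledata.leasetimePct
             ): NodeState = {
    val window = terminationMs - noticeMs
    val executorDeadline = noticeMs + (window * executorLeasePct).toLong
    val shuffleDeadline = noticeMs + (window * shuffleLeasePct).toLong
    if (nowMs >= terminationMs) Terminated
    else if (nowMs >= shuffleDeadline) ShuffledataDecommissioned
    else if (nowMs >= executorDeadline) ExecutorDecommissioned
    else Decommissioning
  }
}
```

With a 2-minute Spot notice (window = 120 000 ms) and these illustrative defaults, executors would be killed at t+60 s and the shuffle outputs unregistered at t+108 s, keeping the shuffle data serveable for most of the node's remaining life, which is exactly the ordering argued for in a) and b) above.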