[
https://issues.apache.org/jira/browse/SPARK-30873?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17039884#comment-17039884
]
Saurabh Chawla commented on SPARK-30873:
----------------------------------------
We have raised the WIP PR for this.
cc [~holdenkarau] [~itskals] [~amargoor]
> Handling Node Decommissioning for Yarn cluster manager in Spark
> --------------------------------------------------------------
>
> Key: SPARK-30873
> URL: https://issues.apache.org/jira/browse/SPARK-30873
> Project: Spark
> Issue Type: Improvement
> Components: Spark Core, YARN
> Affects Versions: 3.0.0
> Reporter: Saurabh Chawla
> Priority: Major
>
> In many public cloud environments, node loss (as with AWS Spot loss, AWS
> Spot blocks and GCP preemptible VMs) is a planned and announced
> event.
> The cloud provider notifies the cluster manager of the impending loss of a
> node ahead of time. A few examples:
> a) AWS Spot loss (2 minutes before the event)
> b) GCP preemptible VM loss (30 seconds before the event)
> c) AWS Spot block loss, with the termination time known in advance (generally
> a few tens of minutes before decommissioning, as configured in YARN)
> This JIRA aims to make Spark use advance knowledge of node loss to adjust
> task scheduling and minimise the impact on the application.
> It is well known that when a host is lost, its executors, their running
> tasks and caches, and its shuffle data are all lost. This wastes compute
> and other resources.
> The focus here is to build a framework for YARN that can be extended to
> other cluster managers to handle such scenarios.
> The framework must handle one or more of the following:
> 1) Prevent new tasks from starting on any executor on a decommissioning node.
> 2) Decide whether to kill running tasks so that they can be restarted
> elsewhere (assuming they will not complete before the deadline), or allow
> them to continue in the hope that they finish before the deadline.
> 3) Clear the shuffle data entries for the decommissioning node's hostname
> from MapOutputTracker to prevent FetchFailed exceptions. The most significant
> advantage of unregistering shuffle outputs is that when Spark schedules the
> first re-attempt to compute the missing blocks, it notices all of the missing
> blocks from decommissioned nodes and recovers in a single attempt. This
> speeds up recovery significantly compared with the existing Spark
> behaviour, where stages might be rescheduled multiple times to recompute
> missing shuffle data from all nodes, and it prevents jobs from being stuck
> for hours failing and recomputing.
> 4) Prevent the stage from aborting due to FetchFailed exceptions caused by
> node decommissioning. Spark allows only a limited number of consecutive
> attempts before a stage is aborted; this is controlled by the config
> spark.stage.maxConsecutiveAttempts. Not counting fetch failures caused by
> decommissioned nodes towards stage failure improves the reliability of
> the system.
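Items 3 and 4 can be illustrated with a minimal Python sketch. It is a toy model under stated assumptions, not Spark's MapOutputTracker or DAGScheduler: `ShuffleOutputs`, `StageFailureCounter` and their methods are hypothetical names, and the abort rule is simplified to "abort once the counted failures reach the limit". The only value taken from Spark is 4, the default of spark.stage.maxConsecutiveAttempts.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Set

# Default value of spark.stage.maxConsecutiveAttempts in Spark.
MAX_CONSECUTIVE_ATTEMPTS = 4


@dataclass
class ShuffleOutputs:
    """Hypothetical stand-in for MapOutputTracker: map partition id -> host."""
    locations: Dict[int, str] = field(default_factory=dict)

    def unregister_host(self, host: str) -> List[int]:
        # Drop every map output stored on `host` in one pass, so the next
        # stage re-attempt sees all missing partitions at once.
        lost = sorted(p for p, h in self.locations.items() if h == host)
        for p in lost:
            del self.locations[p]
        return lost


@dataclass
class StageFailureCounter:
    """Hypothetical counter of failed attempts for a single stage."""
    decommissioned_hosts: Set[str]
    failures: int = 0

    def on_fetch_failure(self, host: str) -> bool:
        # Returns True when the stage should be aborted.
        if host in self.decommissioned_hosts:
            return False  # expected loss: not counted towards the limit
        self.failures += 1
        return self.failures >= MAX_CONSECUTIVE_ATTEMPTS


outputs = ShuffleOutputs({0: "host-a", 1: "host-b", 2: "host-a"})
print(outputs.unregister_host("host-a"))  # [0, 2]

counter = StageFailureCounter(decommissioned_hosts={"host-a"})
print(counter.on_fetch_failure("host-a"), counter.failures)  # False 0
print(counter.on_fetch_failure("host-b"), counter.failures)  # False 1
```

Both missing partitions on host-a are reported by a single call, which is the "recovers in one attempt" effect described in item 3.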
> Main components of the change:
> 1) Propagate cluster info updates from the Resource Manager -> Application
> Master -> Spark Driver.
> 2) DecommissionTracker, which resides inside the driver, tracks all the
> decommissioned nodes and performs the necessary actions and state transitions.
> 3) Based on the decommissioned node list, add hooks in the code to:
> a) Assign no new tasks to executors on those nodes
> b) Remove shuffle data mapping info for the node to be decommissioned from
> the MapOutputTracker
> c) Not count fetch failures from decommissioned nodes towards stage failure
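Hook (a) amounts to filtering resource offers before tasks are scheduled. The sketch below assumes the driver already holds the set of decommissioning hostnames (in YARN, the AM can learn about them from the updated node reports the ResourceManager returns on allocate calls). `Offer` and `usable_offers` are hypothetical names and do not mirror Spark's internal scheduler classes.

```python
from dataclasses import dataclass
from typing import Iterable, List, Set


@dataclass(frozen=True)
class Offer:
    """Hypothetical resource offer; Spark's internal offer type differs."""
    executor_id: str
    host: str
    free_cores: int


def usable_offers(offers: Iterable[Offer], decommissioning_hosts: Set[str]) -> List[Offer]:
    # Hook (a): executors on decommissioning hosts receive no new tasks.
    # Tasks already running there are untouched by this filter.
    return [o for o in offers if o.host not in decommissioning_hosts]


# Assumed: hostnames delivered via Resource Manager -> AM -> driver updates.
decommissioning = {"host-a"}
offers = [Offer("1", "host-a", 4), Offer("2", "host-b", 4), Offer("3", "host-a", 2)]
print([o.executor_id for o in usable_offers(offers, decommissioning)])  # ['2']
```

Filtering offers, rather than killing executors immediately, lets running tasks drain naturally, which matches the "usage slowly dies down" behaviour described next.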
> On receiving notice that a node is to be decommissioned, DecommissionTracker
> on the driver should perform the following actions:
> * Add an entry for the node in DecommissionTracker with its termination time
> and the node state "DECOMMISSIONING".
> * Stop assigning new tasks to executors on nodes that are candidates for
> decommissioning. This ensures that usage of the node gradually dies down as
> its tasks finish.
> * Kill all the executors on the decommissioning nodes after a configurable
> period of time, say "spark.graceful.decommission.executor.leasetimePct". This
> killing ensures two things. First, the task failures will be attributed to the
> job failure count. Second, it avoids generating more shuffle data on a node
> that will eventually be lost. The node state is set to
> "EXECUTOR_DECOMMISSIONED".
> * Mark shuffle data on the node as unavailable after
> "spark.qubole.graceful.decommission.shuffedata.leasetimePct" time. This
> ensures that missing shuffle partitions are recomputed early, rather than
> reducers failing with a time-consuming FetchFailure. The node state is set
> to "SHUFFLE_DECOMMISSIONED".
> * Mark the node as terminated after the termination time. The node state is
> now "TERMINATED".
> * Remove the node entry from DecommissionTracker if the same hostname is
> reused (this is not uncommon in many public cloud environments).
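The state transitions above can be sketched as a small state machine. Assumption: the two leasetimePct settings are read as percentages of the notice period (from notification to termination time), with executors killed no later than shuffle data is unregistered; the proposal does not pin this down. `DecommissionTracker` here is a hypothetical Python model, not the class from the PR, and `executor_lease_pct` / `shuffle_lease_pct` stand in for the two config keys.

```python
from enum import Enum
from typing import Dict, Tuple


class NodeState(Enum):
    DECOMMISSIONING = "DECOMMISSIONING"
    EXECUTOR_DECOMMISSIONED = "EXECUTOR_DECOMMISSIONED"
    SHUFFLE_DECOMMISSIONED = "SHUFFLE_DECOMMISSIONED"
    TERMINATED = "TERMINATED"


class DecommissionTracker:
    """Hypothetical sketch of the per-node transitions; not Spark code."""

    def __init__(self, executor_lease_pct: float, shuffle_lease_pct: float):
        # Assumption: both values are percentages of the notice period, and
        # executors are killed no later than shuffle data is unregistered.
        if not 0 <= executor_lease_pct <= shuffle_lease_pct <= 100:
            raise ValueError("need 0 <= executor pct <= shuffle pct <= 100")
        self.executor_lease_pct = executor_lease_pct
        self.shuffle_lease_pct = shuffle_lease_pct
        # host -> (notice time, termination time), in seconds
        self.nodes: Dict[str, Tuple[float, float]] = {}

    def add_node(self, host: str, now: float, termination_time: float) -> None:
        if termination_time <= now:
            raise ValueError("termination time must be in the future")
        self.nodes[host] = (now, termination_time)

    def state(self, host: str, now: float) -> NodeState:
        notice, end = self.nodes[host]
        if now >= end:
            return NodeState.TERMINATED
        elapsed_pct = 100.0 * (now - notice) / (end - notice)
        if elapsed_pct >= self.shuffle_lease_pct:
            return NodeState.SHUFFLE_DECOMMISSIONED
        if elapsed_pct >= self.executor_lease_pct:
            return NodeState.EXECUTOR_DECOMMISSIONED
        return NodeState.DECOMMISSIONING

    def on_host_reused(self, host: str) -> None:
        # A new node came up under the same hostname: drop the stale entry.
        self.nodes.pop(host, None)


# AWS Spot loss: 2-minute (120 s) notice; executors killed at 50 %,
# shuffle data unregistered at 90 % of the notice period.
tracker = DecommissionTracker(executor_lease_pct=50, shuffle_lease_pct=90)
tracker.add_node("host-a", now=0, termination_time=120)
for t in (30, 60, 110, 120):
    print(t, tracker.state("host-a", t).value)
```

With these assumed settings the node reports DECOMMISSIONING at 30 s, EXECUTOR_DECOMMISSIONED at 60 s, SHUFFLE_DECOMMISSIONED at 110 s and TERMINATED at 120 s. Deriving the state from timestamps on demand, rather than from timers, keeps the sketch deterministic and easy to test.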