Hi all,

I have a prototype for "Keep track of nodes which are going to be shut down & avoid scheduling new tasks" (https://issues.apache.org/jira/browse/SPARK-20628) that I would like to discuss with the community. I added a WIP PR for it at https://github.com/apache/spark/pull/19267. The basic idea is to implement a mechanism similar to YARN's graceful decommission, but for Spark. There is a design document at https://github.com/apache/spark/files/1349653/Spark_Blacklisting_on_decommissioning-Scope.pdf. I would like to know the opinion of the list on this approach.
*More details about this proposal*

In the PR we define a HostState type to represent the state of the cluster nodes, and take actions in CoarseGrainedSchedulerBackend.handleUpdatedHostState when a node transitions into a state where it becomes partially or totally unavailable. Just like YARN and Mesos, we propose a decommission mechanism with two phases: first a drain phase where the node is still running but not accepting further work (DECOMMISSIONING in YARN, DRAIN in Mesos), followed by a second phase where the executors on the node are forcibly shut down (DECOMMISSIONED in YARN, DOWN in Mesos). In this PR we focus only on YARN, and only on the actions taken when a node transitions into the DECOMMISSIONING state: blacklisting the node when it enters DECOMMISSIONING, and un-blacklisting it when it gets back to the normal healthy RUNNING state.

The decommissioning process would not be initiated by Spark, but by an operator or an automated system (e.g. the cloud environment where YARN is running), in response to some relevant event (e.g. a cluster resize event), and it would consist of calling the YARN administrative command yarn rmadmin -refreshNodes -g for the affected node. Spark would just react to the node state transition events it receives from the cluster manager.

To make this extensible to other cluster managers besides YARN, we define the HostState type in Spark, and keep the interaction with the specifics of each cluster manager in the corresponding packages. For example, for YARN, when YarnAllocator gets a node state transition event, it converts the event from the YARN-specific NodeState into a HostState, wraps it into a HostStatusUpdate message, and sends it to the CoarseGrainedSchedulerBackend, which then performs the required actions for that node (a rough sketch of these pieces is included at the end of this mail).

This code works on a modified version of Hadoop 2.7.3 with patches to support YARN-4676 (basic graceful decommission), and an approximation to YARN-3224 (when a node transitions into the DECOMMISSIONING state, the resource manager notifies each relevant application master by adding the node to the list of updated nodes available in the AllocateResponse returned by the RM in response to the AM heartbeat). For these reasons, this code won't work as-is on vanilla Hadoop.

The main problem is that the decommissioning mechanism for YARN is not completely implemented (see YARN-914), and some of the parts that are implemented are only available in YARN 2.9.0 (see YARN-4676). To cope with this, we propose implementing an administrative command to send node transitions directly to the Spark driver, as HostStatusUpdate messages addressed to the CoarseGrainedSchedulerBackend. This command would be similar to yarn rmadmin -refreshNodes -g, which is currently used for decommissioning nodes in YARN. When YARN-914 is complete, this command could still be used as a secondary interface for decommissioning nodes, so node transitions could be signaled either by the cluster manager or through the administrative command (either manually or through some automation implemented by the cloud environment).

We would like to get some feedback on this approach in general, and on the administrative command solution in particular. If that sounds good, we will work on modifying this PR so it works on vanilla Hadoop 2.7, and on implementing the administrative command.

Thanks,

Juan Rodriguez Hortala
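PS: to make the discussion above more concrete, here is a minimal sketch of the types and message flow described in this mail. The names mirror the PR (HostState, HostStatusUpdate, handleUpdatedHostState, the YarnAllocator conversion), but the bodies are only an approximation and not the actual PR code; in particular, the blacklist/un-blacklist calls are replaced by log-style placeholders, and the example host name is made up.

// Rough sketch only: names mirror the PR, bodies are placeholders.
object DecommissionSketch {

  // Cluster-manager-agnostic view of a node's availability.
  sealed trait HostState
  object HostState {
    case object Running extends HostState          // healthy, accepts new tasks
    case object Decommissioning extends HostState  // draining: running tasks finish, no new ones scheduled
    case object Decommissioned extends HostState   // executors on the node are shut down
  }

  // Message sent to CoarseGrainedSchedulerBackend when a node changes state.
  case class HostStatusUpdate(host: String, state: HostState)

  // Shape of the conversion done in YarnAllocator: map the name of the YARN
  // NodeState reported in the AM heartbeat (AllocateResponse.getUpdatedNodes)
  // to the Spark-side HostState.
  def toHostState(yarnNodeState: String): Option[HostState] = yarnNodeState match {
    case "RUNNING"         => Some(HostState.Running)
    case "DECOMMISSIONING" => Some(HostState.Decommissioning)
    case "DECOMMISSIONED"  => Some(HostState.Decommissioned)
    case _                 => None // other node states are not relevant to this mechanism
  }

  // Shape of CoarseGrainedSchedulerBackend.handleUpdatedHostState: blacklist the
  // node while it drains, un-blacklist it when it is healthy again.
  def handleUpdatedHostState(update: HostStatusUpdate): Unit = update.state match {
    case HostState.Decommissioning =>
      println(s"blacklisting ${update.host}: no new tasks will be scheduled there")
    case HostState.Running =>
      println(s"un-blacklisting ${update.host}: node is healthy again")
    case HostState.Decommissioned =>
      println(s"${update.host} is down: executors running there are lost")
  }

  def main(args: Array[String]): Unit = {
    // Example: the cluster manager reports that a node has started draining.
    toHostState("DECOMMISSIONING").foreach { state =>
      handleUpdatedHostState(HostStatusUpdate("host-1.example.com", state))
    }
  }
}

The proposed administrative command would just be another producer of these HostStatusUpdate messages, sending them directly to the driver instead of deriving them from the cluster manager's heartbeat.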