[ 
https://issues.apache.org/jira/browse/SPARK-27005?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16779503#comment-16779503
 ] 

Xingbo Jiang edited comment on SPARK-27005 at 3/4/19 4:44 PM:
--------------------------------------------------------------

*API Changes [draft pending design discussion]* 

{code:java}
class RDD[T] {

    /** Indicate resources requirement on computing the RDD. */

    def requireResources(numCores: Int, accelerators: Map[String, Int]): RDD[T] 
= ???

}

class TaskContext[T] {

    /** Indexes of accelerators allocated to this task. **/

    def accelerators(): Seq[Int] = ???

}

/** Resource requirements for each task. **/
 case class TaskResourceRequirements(
     numCores: Int,
     accelerators: Map[String, Int] = Map.empty)

{code}

*Design Sketch*

*Task Resource Requirements*
 We use a case class TaskResourceRequirements to represent resource 
requirements for each task, it contains the messages that the number of cores 
required and a map of accelerator resources requirements. Users can change the 
message from RDD API, then the class is generated from the RDD chain in 
DAGScheduler, and finally passed to TaskScheduler.

*spark.task.cpus and spark.task.accelerator.gpus*
 Add a new config spark.task.accelerator.gpus to specify the default number of 
GPUs required per task. This config is used similar to spark.task.cpus, if user 
doesn’t specify task resource requirements from RDD/PandasUDF API, then 
spark.task.cpus and spark.task.accelerator.gpus are used as default value.

CPUS_PER_TASK(spark.task.cpus) is a global config with int value to specify the 
number of cores each task shall be assigned. Since we make task resource 
requirement a per-stage config, to keep backward compatibility of 
CPUS_PER_TASK, we shall change its default value to 1 core and empty 
accelerator resources, and make it the default resource requirements for each 
RDD unless override or user specifies.

*Expand RDD/Stage to support accelerator*
 Recursive search for accelerator requirements in RDD chains in the same stage, 
put the requirements into Stage/Task.

*Expand SchedulerBackend to manage resources*
 Update the RegisterExecutor message to carry accelerator resources an executor 
provides, thus SchedulerBackend can init the ExecutorData correctly. 
SchedulerBackend can allocate and recycle resources according to Task status 
updates it receives.

*Manage accelerator resources in Worker*
 Since we assume homogeneous work resources, the accelerator resources info can 
be read from a global conf file. The Worker can use a map to store available 
accelerator resources internally. Similar to 
`allocateWorkerResourceToExecutors()`, it can assign accelerator resources to 
executors. The accelerator resources map shall get updated on message 
LaunchExecutor and ExecutorStateChanged.

*Expand TaskScheduler to support accelerator*
 We shall keep a separated queue to store the pending tasks that have non-empty 
accelerator resources requirements in TaskSetManager, thus when the WorkOffers 
contains accelerator resources, we can match the offers with the special task 
queue first, thus we can avoid allocate tasks that only require CPUs on a node 
with accelerators. If the submitted job don’t require accelerator resources, 
then the scheduling behavior and efficiency shall be the same as previously.

*Return GPU index from TaskContext*
 On TaskContext creation, we shall allocate free GPU index(s) to the context, 
so we can avoid collisions.

*YARN Support*
 User can request GPU resources in the Spark application via spark-submit, the 
application with GPU resources can be launched using YARN+Docker, so user can 
easily define the DL environment in the Dockerfile.

Spark need to upgrade YARN to 3.1.2+ to enable GPU support, it support the 
following features:
 * Auto discovery of GPU resources.
 * GPU isolation at process level.
 * Placement constraints.
 * Heterogeneous device types via node labels.

*Kubernetes Support*
 User can specify GPU requirements for the Spark application on Kubernetes by 
the following possible choices:
 spark-submit w/ the same GPU configs used by standalone/YARN.
 spark-submit w/ pod template (new feature for Spark 3.0).
 Spark-submit w/ mutating webhook confs to modify pods at runtime.

User can run Spark jobs on Kubernetes using nvidia-docker to access GPUs, 
Kubernetes also support the following features:
 * Auto discovery of GPU resources.
 * GPU isolation at executor pod level.
 * Placement constraints via node selectors.
 * Heterogeneous device types via node labels.


was (Author: jiangxb1987):
*API Changes [draft pending design discussion]* 

{code:java}
class RDD[T] {

    /** Indicate resources requirement on computing the RDD. */

    def requireResources(numCores: Int, accelerators: Map[String, Int]): RDD[T] 
= ???

}

class TaskContext[T] {

    /** Indexes of accelerators allocated to this task. **/

    def accelerators(): Seq[Int] = ???

}

/** Resource requirements for each task. **/
 case class TaskResourceRequirements(
     numCores: Int,
     accelerators: Map[String, Int] = Map.empty)

{code}

*Design Sketch*

*Task Resource Requirements*
 We use a case class TaskResourceRequirements to represent resource 
requirements for each task, it contains the messages that the number of cores 
required and a map of accelerator resources requirements. Users can change the 
message from RDD API, then the class is generated from the RDD chain in 
DAGScheduler, and finally passed to TaskScheduler.

*spark.task.cpus and spark.task.gpus*
 Add a new config spark.task.gpus to specify the default number of GPUs 
required per task. This config is used similar to spark.task.cpus, if user 
doesn’t specify task resource requirements from RDD/PandasUDF API, then 
spark.task.cpus and spark.task.gpus are used as default value.

CPUS_PER_TASK(spark.task.cpus) is a global config with int value to specify the 
number of cores each task shall be assigned. Since we make task resource 
requirement a per-stage config, to keep backward compatibility of 
CPUS_PER_TASK, we shall change its default value to 1 core and empty 
accelerator resources, and make it the default resource requirements for each 
RDD unless override or user specifies.

*Expand RDD/Stage to support GPU*
 Recursive search for GPU requirements in RDD chains in the same stage, put the 
requirements into Stage/Task.

*Expand SchedulerBackend to manage resources*
 Update the RegisterExecutor message to carry accelerator resources an executor 
provides, thus SchedulerBackend can init the ExecutorData correctly. 
SchedulerBackend can allocate and recycle resources according to Task status 
updates it receives.

*Manage accelerator resources in Worker*
 Since we assume homogeneous work resources, the accelerator resources info can 
be read from a global conf file. The Worker can use a map to store available 
accelerator resources internally. Similar to 
`allocateWorkerResourceToExecutors()`, it can assign accelerator resources to 
executors. The accelerator resources map shall get updated on message 
LaunchExecutor and ExecutorStateChanged.

*Expand TaskScheduler to support GPU*
 We shall keep a separated queue to store the pending tasks that have non-empty 
accelerator resources requirements in TaskSetManager, thus when the WorkOffers 
contains accelerator resources, we can match the offers with the special task 
queue first, thus we can avoid allocate tasks that only require CPUs on a node 
with accelerators. If the submitted job don’t require accelerator resources, 
then the scheduling behavior and efficiency shall be the same as previously.

*Return GPU index from TaskContext*
 On TaskContext creation, we shall allocate free GPU index(s) to the context, 
so we can avoid collisions.

*YARN Support*
 User can request GPU resources in the Spark application via spark-submit, the 
application with GPU resources can be launched useing YARN+Docker, so user can 
easily define the DL environment in the Dockerfile.

Spark need to upgrade YARN to 3.1.2+ to enable GPU support, it support the 
following features:
 * Auto discovery of GPU resources.
 * GPU isolation at process level.
 * Placement constraints.
 * Heterogeneous device types via node labels.

*Kubernetes Support*
 User can specify GPU requirements for the Spark application on Kubernetes by 
the following possible choices:
 spark-submit w/ the same GPU configs used by standalone/YARN.
 spark-submit w/ pod template (new feature for Spark 3.0).
 Spark-submit w/ mutating webhook confs to modify pods at runtime.

User can run Spark jobs on Kubernetes using nvidia-docker to access GPUs, 
Kubernetes also support the following features:
 * Auto discovery of GPU resources.
 * GPU isolation at executor pod level.
 * Placement constraints via node selectors.
 * Heterogeneous device types via node labels.

> Design sketch: Accelerator-aware scheduling
> -------------------------------------------
>
>                 Key: SPARK-27005
>                 URL: https://issues.apache.org/jira/browse/SPARK-27005
>             Project: Spark
>          Issue Type: Story
>          Components: Spark Core
>    Affects Versions: 3.0.0
>            Reporter: Xingbo Jiang
>            Priority: Major
>
> This task is to outline a design sketch for the accelerator-aware scheduling 
> SPIP discussion.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to