This is an automated email from the ASF dual-hosted git repository.
mridulm80 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 5b80639e643 [SPARK-45250][CORE] Support stage level task resource profile for yarn cluster when dynamic allocation disabled
5b80639e643 is described below
commit 5b80639e643b6dd09dd64c3f43ec039b2ef2f9fd
Author: Bobby Wang <[email protected]>
AuthorDate: Mon Oct 2 23:00:56 2023 -0500
[SPARK-45250][CORE] Support stage level task resource profile for yarn cluster when dynamic allocation disabled
### What changes were proposed in this pull request?
This PR is a follow-up of https://github.com/apache/spark/pull/37268, which supports the stage-level task resource profile for standalone clusters when dynamic allocation is disabled. This PR enables the stage-level task resource profile for YARN clusters.
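For illustration, a minimal sketch of the pattern this enables (a sketch only; the resource name, amounts, and RDD are illustrative, and GPU discovery must be configured separately):

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.resource.{ResourceProfileBuilder, TaskResourceRequests}

// Assumes submission to YARN with spark.dynamicAllocation.enabled=false and
// GPU discovery already configured; master/app name come from spark-submit.
val sc = new SparkContext(new SparkConf())

// A profile carrying only task requirements is built as a TaskResourceProfile,
// so tasks reuse the executors requested at application startup.
val treqs = new TaskResourceRequests().cpus(4).resource("gpu", 1)
val rp = new ResourceProfileBuilder().require(treqs).build()

val doubled = sc.parallelize(1 to 100, 8).withResources(rp).map(_ * 2).collect()
```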
### Why are the changes needed?
Users running Spark ML/DL workloads on YARN would expect the stage-level task resource profile feature.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
The current tests of https://github.com/apache/spark/pull/37268 also cover this PR, since both YARN and standalone clusters share the same TaskSchedulerImpl class, which implements this feature. Apart from that, this PR modifies an existing test to cover the YARN cluster, and I also performed some manual tests, which are documented in the PR comments.
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #43030 from wbo4958/yarn-task-resoure-profile.
Authored-by: Bobby Wang <[email protected]>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
---
.../apache/spark/resource/ResourceProfileManager.scala | 6 +++---
.../spark/resource/ResourceProfileManagerSuite.scala | 15 +++++++++++++--
docs/configuration.md | 2 +-
docs/running-on-yarn.md | 6 +++++-
4 files changed, 22 insertions(+), 7 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/resource/ResourceProfileManager.scala b/core/src/main/scala/org/apache/spark/resource/ResourceProfileManager.scala
index 9f98d4d9c9c..cd7124a5724 100644
--- a/core/src/main/scala/org/apache/spark/resource/ResourceProfileManager.scala
+++ b/core/src/main/scala/org/apache/spark/resource/ResourceProfileManager.scala
@@ -67,9 +67,9 @@ private[spark] class ResourceProfileManager(sparkConf: SparkConf,
    */
   private[spark] def isSupported(rp: ResourceProfile): Boolean = {
     if (rp.isInstanceOf[TaskResourceProfile] && !dynamicEnabled) {
-      if ((notRunningUnitTests || testExceptionThrown) && !isStandaloneOrLocalCluster) {
-        throw new SparkException("TaskResourceProfiles are only supported for Standalone " +
-          "cluster for now when dynamic allocation is disabled.")
+      if ((notRunningUnitTests || testExceptionThrown) && !(isStandaloneOrLocalCluster || isYarn)) {
+        throw new SparkException("TaskResourceProfiles are only supported for Standalone and " +
+          "Yarn cluster for now when dynamic allocation is disabled.")
       }
     } else {
       val isNotDefaultProfile = rp.id != ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID
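As a rough illustration of the new guard from the application side (illustrative only; the master URL and resource name are assumptions, and the message is the one added above):

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.resource.{ResourceProfileBuilder, TaskResourceRequests}

// "local" is not in the supported list, so the guard above fires as soon as
// the profile is registered via withResources.
val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("trp-guard"))

val rp = new ResourceProfileBuilder()
  .require(new TaskResourceRequests().resource("gpu", 1))
  .build()

// Throws SparkException("TaskResourceProfiles are only supported for
// Standalone and Yarn cluster for now when dynamic allocation is disabled.")
sc.parallelize(1 to 10).withResources(rp).count()
```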
diff --git a/core/src/test/scala/org/apache/spark/resource/ResourceProfileManagerSuite.scala b/core/src/test/scala/org/apache/spark/resource/ResourceProfileManagerSuite.scala
index e97d5c7883a..77dc7bcb4c5 100644
--- a/core/src/test/scala/org/apache/spark/resource/ResourceProfileManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/resource/ResourceProfileManagerSuite.scala
@@ -126,18 +126,29 @@ class ResourceProfileManagerSuite extends SparkFunSuite {
     val defaultProf = rpmanager.defaultResourceProfile
     assert(rpmanager.isSupported(defaultProf))
 
-    // task resource profile.
+    // Standalone: supports task resource profile.
     val gpuTaskReq = new TaskResourceRequests().resource("gpu", 1)
     val taskProf = new TaskResourceProfile(gpuTaskReq.requests)
     assert(rpmanager.isSupported(taskProf))
 
+    // Local: doesn't support task resource profile.
     conf.setMaster("local")
     rpmanager = new ResourceProfileManager(conf, listenerBus)
     val error = intercept[SparkException] {
       rpmanager.isSupported(taskProf)
     }.getMessage
     assert(error === "TaskResourceProfiles are only supported for Standalone " +
-      "cluster for now when dynamic allocation is disabled.")
+      "and Yarn cluster for now when dynamic allocation is disabled.")
+
+    // Local cluster: supports task resource profile.
+    conf.setMaster("local-cluster[1, 1, 1024]")
+    rpmanager = new ResourceProfileManager(conf, listenerBus)
+    assert(rpmanager.isSupported(taskProf))
+
+    // Yarn: supports task resource profile.
+    conf.setMaster("yarn")
+    rpmanager = new ResourceProfileManager(conf, listenerBus)
+    assert(rpmanager.isSupported(taskProf))
   }
 
   test("isSupported task resource profiles with dynamic allocation enabled") {
diff --git a/docs/configuration.md b/docs/configuration.md
index 39e02f36af0..e46ead8f2fc 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -3668,7 +3668,7 @@ See your cluster manager specific page for requirements and details on each of -
 
 # Stage Level Scheduling Overview
 
 The stage level scheduling feature allows users to specify task and executor resource requirements at the stage level. This allows for different stages to run with executors that have different resources. A prime example of this is one ETL stage runs with executors with just CPUs, the next stage is an ML stage that needs GPUs. Stage level scheduling allows for user to request different executors that have GPUs when the ML stage runs rather then having to acquire executors with GPUs at th [...]
-This is only available for the RDD API in Scala, Java, and Python. It is available on YARN, Kubernetes and Standalone when dynamic allocation is enabled. When dynamic allocation is disabled, it allows users to specify different task resource requirements at stage level, and this is supported on Standalone cluster right now. See the [YARN](running-on-yarn.html#stage-level-scheduling-overview) page or [Kubernetes](running-on-kubernetes.html#stage-level-scheduling-overview) page or [Standa [...]
+This is only available for the RDD API in Scala, Java, and Python. It is available on YARN, Kubernetes and Standalone when dynamic allocation is enabled. When dynamic allocation is disabled, it allows users to specify different task resource requirements at stage level, and this is supported on YARN and Standalone cluster right now. See the [YARN](running-on-yarn.html#stage-level-scheduling-overview) page or [Kubernetes](running-on-kubernetes.html#stage-level-scheduling-overview) page o [...]
 See the `RDD.withResources` and `ResourceProfileBuilder` API's for using this feature. When dynamic allocation is disabled, tasks with different task resource requirements will share executors with `DEFAULT_RESOURCE_PROFILE`. While when dynamic allocation is enabled, the current implementation acquires new executors for each `ResourceProfile` created and currently has to be an exact match. Spark does not try to fit tasks into an executor that require a different ResourceProfile than the [...]
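To contrast the two paths described in the docs above, a minimal sketch of a full ResourceProfile for the dynamic-allocation-enabled case (all names and amounts are illustrative):

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.resource.{ExecutorResourceRequests, ResourceProfileBuilder, TaskResourceRequests}

// Assumes a YARN app with spark.dynamicAllocation.enabled=true and GPU
// discovery configured; cores, memory, and amounts are illustrative.
val sc = new SparkContext(new SparkConf())

val ereqs = new ExecutorResourceRequests().cores(4).memory("6g").resource("gpu", 1)
val treqs = new TaskResourceRequests().resource("gpu", 1)
val rp = new ResourceProfileBuilder().require(ereqs).require(treqs).build()

// Stages computing this RDD run on newly acquired executors matching the
// profile exactly, not on DEFAULT_RESOURCE_PROFILE executors.
sc.parallelize(1 to 1000, 100).withResources(rp).count()
```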
diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md
index 709cffda9b0..c9a0af56a5a 100644
--- a/docs/running-on-yarn.md
+++ b/docs/running-on-yarn.md
@@ -759,7 +759,11 @@ YARN does not tell Spark the addresses of the resources allocated to each container.
 # Stage Level Scheduling Overview
 
-Stage level scheduling is supported on YARN when dynamic allocation is enabled. One thing to note that is YARN specific is that each ResourceProfile requires a different container priority on YARN. The mapping is simply the ResourceProfile id becomes the priority, on YARN lower numbers are higher priority. This means that profiles created earlier will have a higher priority in YARN. Normally this won't matter as Spark finishes one stage before starting another one, the only case this mig [...]
+Stage level scheduling is supported on YARN:
+- When dynamic allocation is disabled: It allows users to specify different task resource requirements at the stage level and will use the same executors requested at startup.
+- When dynamic allocation is enabled: It allows users to specify task and executor resource requirements at the stage level and will request the extra executors.
+
+One thing to note that is YARN specific is that each ResourceProfile requires a different container priority on YARN. The mapping is simply the ResourceProfile id becomes the priority, on YARN lower numbers are higher priority. This means that profiles created earlier will have a higher priority in YARN. Normally this won't matter as Spark finishes one stage before starting another one, the only case this might have an affect is in a job server type scenario, so its something to keep in mind.
 
 Note there is a difference in the way custom resources are handled between the base default profile and custom ResourceProfiles. To allow for the user to request YARN containers with extra resources without Spark scheduling on them, the user can specify resources via the <code>spark.yarn.executor.resource.</code> config. Those configs are only used in the base default profile though and do not get propagated into any other custom ResourceProfiles. This is because there would be no way to [...]
 
 # Important notes
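As a sketch of the base-default-profile-only configuration mentioned in the docs above (the resource name is hypothetical):

```scala
import org.apache.spark.SparkConf

// Extra YARN container resources for the base default profile only. The
// resource name "fpga" is illustrative; spark.yarn.executor.resource.*
// settings are not propagated into custom ResourceProfiles.
val conf = new SparkConf()
  .set("spark.yarn.executor.resource.fpga.amount", "1")
```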
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]