This is an automated email from the ASF dual-hosted git repository.
yao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 06635e25f17 [SPARK-46006][YARN] YarnAllocator miss clean
targetNumExecutorsPerResourceProfileId after YarnSchedulerBackend call stop
06635e25f17 is described below
commit 06635e25f170e61f6cfe53232d001993ec7d376d
Author: Angerszhuuuu <[email protected]>
AuthorDate: Wed Nov 22 16:50:21 2023 +0800
[SPARK-46006][YARN] YarnAllocator miss clean
targetNumExecutorsPerResourceProfileId after YarnSchedulerBackend call stop
### What changes were proposed in this pull request?
We hit a case where a user calls sc.stop() after all custom code has
run, but the call gets stuck somewhere.
This leads to the following situation:
1. The user calls sc.stop().
2. sc.stop() gets stuck in some step, but SchedulerBackend.stop has
already been called.
3. Because the YARN ApplicationMaster has not finished, it keeps calling
YarnAllocator.allocateResources().
4. Because the driver endpoint has stopped, newly allocated executors
fail to register.
5. This continues until the maximum number of executor failures is
reached.
6. Root cause: before CoarseGrainedSchedulerBackend.stop() is called,
YarnSchedulerBackend.requestTotalExecutor() is called to clear the
request info.

When YarnAllocator handles the empty resource request,
resourceTotalExecutorsWithPreferedLocalities is empty, so it never
clears targetNumExecutorsPerResourceProfileId.
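
The behaviour described above can be illustrated with a minimal sketch.
It is not the real YarnAllocator: TargetTracker, its clearOnEmptyRequest
flag, and EmptyRequestDemo are hypothetical names made up for this
example, and the bookkeeping is reduced to a single map from resource
profile id to target executor count.

```scala
import scala.collection.mutable

// Hypothetical, simplified stand-in for the allocator's bookkeeping.
// clearOnEmptyRequest = false models the old behaviour, true the patched one.
final class TargetTracker(clearOnEmptyRequest: Boolean) {
  // resource profile id -> target number of executors
  private val targets = mutable.HashMap.empty[Int, Int]

  /** Applies a driver request; returns true if the request changed anything. */
  def request(requested: Map[Int, Int]): Boolean = {
    if (clearOnEmptyRequest && requested.isEmpty) {
      // Patched path: an empty request drops every remembered target.
      targets.clear()
      true
    } else {
      // Old path: only profiles present in the request are touched,
      // so an empty request leaves stale targets behind.
      val changed = requested.map { case (rpId, numExecs) =>
        if (targets.get(rpId).contains(numExecs)) {
          false
        } else {
          targets(rpId) = numExecs
          true
        }
      }
      changed.exists(identity)
    }
  }

  /** Total number of executors the tracker still wants. */
  def pendingTotal: Int = targets.values.sum
}

object EmptyRequestDemo {
  /** Requests 4 executors, then sends the empty request issued on stop. */
  def remainingAfterStop(clearOnEmptyRequest: Boolean): Int = {
    val tracker = new TargetTracker(clearOnEmptyRequest)
    tracker.request(Map(0 -> 4))
    tracker.request(Map.empty)
    tracker.pendingTotal
  }

  def main(args: Array[String]): Unit = {
    println(s"old behaviour: ${remainingAfterStop(clearOnEmptyRequest = false)}")
    println(s"patched behaviour: ${remainingAfterStop(clearOnEmptyRequest = true)}")
  }
}
```

In this sketch the old path still reports 4 wanted executors after the
empty request, which is why allocation would keep going, while the
patched path reports 0.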

### Why are the changes needed?
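Bug fix: after SchedulerBackend.stop is called, YarnAllocator keeps
requesting executors that can no longer register, until the maximum
number of executor failures is reached.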
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
No
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #43906 from AngersZhuuuu/SPARK-46006.
Authored-by: Angerszhuuuu <[email protected]>
Signed-off-by: Kent Yao <[email protected]>
---
.../apache/spark/deploy/yarn/YarnAllocator.scala | 28 +++++++++++++---------
1 file changed, 17 insertions(+), 11 deletions(-)
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index d9a5bd6240a..f8afbc81c12 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -384,19 +384,25 @@ private[yarn] class YarnAllocator(
     this.numLocalityAwareTasksPerResourceProfileId = numLocalityAwareTasksPerResourceProfileId
     this.hostToLocalTaskCountPerResourceProfileId = hostToLocalTaskCountPerResourceProfileId
 
-    val res = resourceProfileToTotalExecs.map { case (rp, numExecs) =>
-      createYarnResourceForResourceProfile(rp)
-      if (numExecs != getOrUpdateTargetNumExecutorsForRPId(rp.id)) {
-        logInfo(s"Driver requested a total number of $numExecs executor(s) " +
-          s"for resource profile id: ${rp.id}.")
-        targetNumExecutorsPerResourceProfileId(rp.id) = numExecs
-        allocatorNodeHealthTracker.setSchedulerExcludedNodes(excludedNodes)
-        true
-      } else {
-        false
+    if (resourceProfileToTotalExecs.isEmpty) {
+      targetNumExecutorsPerResourceProfileId.clear()
+      allocatorNodeHealthTracker.setSchedulerExcludedNodes(excludedNodes)
+      true
+    } else {
+      val res = resourceProfileToTotalExecs.map { case (rp, numExecs) =>
+        createYarnResourceForResourceProfile(rp)
+        if (numExecs != getOrUpdateTargetNumExecutorsForRPId(rp.id)) {
+          logInfo(s"Driver requested a total number of $numExecs executor(s) " +
+            s"for resource profile id: ${rp.id}.")
+          targetNumExecutorsPerResourceProfileId(rp.id) = numExecs
+          allocatorNodeHealthTracker.setSchedulerExcludedNodes(excludedNodes)
+          true
+        } else {
+          false
+        }
       }
+      res.exists(_ == true)
     }
-    res.exists(_ == true)
   }
 
   /**