This is an automated email from the ASF dual-hosted git repository.
wuyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 0602020eb3b3 [SPARK-49252][CORE] Make `TaskSetExcludeList` and `HealthTracker` independent
0602020eb3b3 is described below
commit 0602020eb3b346a8c50ad32eeda4e6dabb70c584
Author: tianhanhu <[email protected]>
AuthorDate: Fri Aug 30 11:05:00 2024 +0800
[SPARK-49252][CORE] Make `TaskSetExcludeList` and `HealthTracker` independent
<!--
Thanks for sending a pull request! Here are some tips for you:
1. If this is your first time, please read our contributor guidelines:
https://spark.apache.org/contributing.html
2. Ensure you have added or run the appropriate tests for your PR:
https://spark.apache.org/developer-tools.html
3. If the PR is unfinished, add '[WIP]' in your PR title, e.g.,
'[WIP][SPARK-XXXX] Your PR title ...'.
4. Be sure to keep the PR description updated to reflect all changes.
5. Please write your PR title to summarize what this PR proposes.
6. If possible, provide a concise example to reproduce the issue for a
faster review.
7. If you want to add a new configuration, please read the guideline
first for naming configurations in
'core/src/main/scala/org/apache/spark/internal/config/ConfigEntry.scala'.
8. If you want to add or modify an error type or message, please read the
guideline first in
'common/utils/src/main/resources/error/README.md'.
-->
### What changes were proposed in this pull request?
<!--
Please clarify what changes you are proposing. The purpose of this section
is to outline the changes and how this PR fixes the issue.
If possible, please consider writing useful notes for better and faster
reviews in your PR. See the examples below.
1. If you refactor some codes with changing classes, showing the class
hierarchy will help reviewers.
2. If you fix some SQL features, you can provide some references of other
DBMSes.
3. If there is design documentation, please add the link.
4. If there is a discussion in the mailing list, please add the link.
-->
Make `TaskSetExcludeList` and `HealthTracker` independently enableable.
When the application-level `HealthTracker` is created but taskset-level
exclusion is not enabled, `TaskSetExcludeList` is created in dry-run mode: it
still records and reports task failure data to `HealthTracker`, but it does
not participate in scheduling decisions.
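The dry-run behavior described above can be sketched in plain Scala. This is a standalone illustration, not the actual Spark classes: the class name `ExcludeListSketch`, the `maxFailures` threshold, and the methods are simplified stand-ins.

```scala
// Sketch of the dry-run idea: failures are always recorded (so the
// application-level tracker still receives the data), but exclusion
// queries short-circuit to false when isDryRun is true.
class ExcludeListSketch(isDryRun: Boolean, maxFailures: Int = 2) {
  private val failures =
    scala.collection.mutable.Map.empty[String, Int].withDefaultValue(0)

  // Recording happens regardless of mode.
  def recordFailure(executorId: String): Unit =
    failures(executorId) += 1

  // In dry-run mode this never influences scheduling decisions.
  def isExecutorExcluded(executorId: String): Boolean =
    !isDryRun && failures(executorId) >= maxFailures
}
```

A dry-run list that has seen two failures on an executor still reports it as schedulable, while a normal list excludes it.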
### Why are the changes needed?
<!--
Please clarify why the changes are needed. For instance,
1. If you propose a new API, clarify the use case for a new API.
2. If you fix a bug, you can clarify why it is a bug.
-->
Currently, when `spark.excludeOnFailure.enabled` is set to true, both
taskset-level exclusion (`TaskSetExcludeList`) and application-level exclusion
(`HealthTracker`) are enabled.
In some cases, we only want to enable exclusion on a single dimension.
### Does this PR introduce _any_ user-facing change?
<!--
Note that it means *any* user-facing change including all aspects such as
the documentation fix.
If yes, please clarify the previous behavior and the change this PR
proposes - provide the console output, description and/or an example to show
the behavior difference if possible.
If possible, please also clarify if this is a user-facing change compared
to the released Spark versions or within the unreleased branches such as master.
If no, write 'No'.
-->
Yes, this introduces two new user-facing configs,
`spark.excludeOnFailure.application.enabled` and
`spark.excludeOnFailure.taskAndStage.enabled`, which allow enabling exclusion
at the application and taskset levels individually.
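The resolution order for these configs follows the `orElse` chain in the diff below: the level-specific config, if set, overrides the umbrella `spark.excludeOnFailure.enabled`. A minimal standalone model, with plain `Option` values standing in for `SparkConf` lookups:

```scala
// Level-specific setting wins; otherwise fall back to the umbrella
// config; otherwise the feature is off. (HealthTracker additionally
// consults a legacy timeout conf, omitted here.)
def resolveEnabled(levelSpecific: Option[Boolean], umbrella: Option[Boolean]): Boolean =
  levelSpecific.orElse(umbrella).getOrElse(false)
```

For example, with `spark.excludeOnFailure.enabled=true` and `spark.excludeOnFailure.taskAndStage.enabled=false`, taskset-level exclusion resolves to off while application-level exclusion stays on.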
### How was this patch tested?
<!--
If tests were added, say they were added here. Please make sure to add some
test cases that check the changes thoroughly including negative and positive
cases if possible.
If it was tested in a way different from regular unit tests, please clarify
how you tested step by step, ideally copy and paste-able, so that other
reviewers can test and check, and descendants can verify in the future.
If tests were not added, please describe why they were not added and/or why
it was difficult to add.
If benchmark tests were added, please run the benchmarks in GitHub Actions
for the consistent environment, and the instructions could accord to:
https://spark.apache.org/developer-tools.html#github-workflow-benchmarks.
-->
New unit tests.
### Was this patch authored or co-authored using generative AI tooling?
<!--
If generative AI tooling has been used in the process of authoring this
patch, please include the
phrase: 'Generated-by: ' followed by the name of the tool and its version.
If no, write 'No'.
Please refer to the [ASF Generative Tooling
Guidance](https://www.apache.org/legal/generative-tooling.html) for details.
-->
No
Closes #47793 from tianhanhu/SPARK-49252_separate_exclusion.
Authored-by: tianhanhu <[email protected]>
Signed-off-by: Yi Wu <[email protected]>
---
.../org/apache/spark/internal/config/package.scala | 12 ++++++++
.../org/apache/spark/scheduler/HealthTracker.scala | 14 +++++----
.../spark/scheduler/TaskSetExcludeList.scala | 29 +++++++++++++++----
.../apache/spark/scheduler/TaskSetManager.scala | 19 +++++++++----
.../spark/scheduler/HealthTrackerSuite.scala | 17 +++++++++++
.../spark/scheduler/TaskSetManagerSuite.scala | 33 ++++++++++++++++++++++
docs/configuration.md | 27 ++++++++++++++++++
7 files changed, 135 insertions(+), 16 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index e9e411cc56b5..8224bcac2830 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -918,6 +918,18 @@ package object config {
.booleanConf
.createOptional
+ private[spark] val EXCLUDE_ON_FAILURE_ENABLED_APPLICATION =
+ ConfigBuilder("spark.excludeOnFailure.application.enabled")
+ .version("4.0.0")
+ .booleanConf
+ .createOptional
+
+ private[spark] val EXCLUDE_ON_FAILURE_ENABLED_TASK_AND_STAGE =
+ ConfigBuilder("spark.excludeOnFailure.taskAndStage.enabled")
+ .version("4.0.0")
+ .booleanConf
+ .createOptional
+
private[spark] val MAX_TASK_ATTEMPTS_PER_EXECUTOR =
ConfigBuilder("spark.excludeOnFailure.task.maxTaskAttemptsPerExecutor")
.version("3.1.0")
diff --git a/core/src/main/scala/org/apache/spark/scheduler/HealthTracker.scala b/core/src/main/scala/org/apache/spark/scheduler/HealthTracker.scala
index 160607215390..82ec0ef91f4f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/HealthTracker.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/HealthTracker.scala
@@ -425,14 +425,16 @@ private[spark] object HealthTracker extends Logging {
private val DEFAULT_TIMEOUT = "1h"
/**
- * Returns true if the excludeOnFailure is enabled, based on checking the configuration
- * in the following order:
- * 1. Is it specifically enabled or disabled?
- * 2. Is it enabled via the legacy timeout conf?
- * 3. Default is off
+ * Returns true if the excludeOnFailure is enabled on the application level,
+ * based on checking the configuration in the following order:
+ * 1. Is application level exclusion specifically enabled or disabled?
+ * 2. Is overall exclusion feature enabled or disabled?
+ * 3. Is it enabled via the legacy timeout conf?
+ * 4. Default is off
*/
def isExcludeOnFailureEnabled(conf: SparkConf): Boolean = {
- conf.get(config.EXCLUDE_ON_FAILURE_ENABLED) match {
+ conf.get(config.EXCLUDE_ON_FAILURE_ENABLED_APPLICATION)
+ .orElse(conf.get(config.EXCLUDE_ON_FAILURE_ENABLED)) match {
case Some(enabled) =>
enabled
case None =>
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetExcludeList.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetExcludeList.scala
index c9aa74e0852b..363730529310 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetExcludeList.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetExcludeList.scala
@@ -31,6 +31,9 @@ import org.apache.spark.util.Clock
* which is handled by [[HealthTracker]]. Note that HealthTracker does not know anything
* about task failures until a taskset completes successfully.
*
+ * If isDryRun is true, this class only stores information for application-level exclusion
+ * and does not actually exclude anything at the task/stage level.
+ *
* THREADING: This class is a helper to [[TaskSetManager]]; as with the methods in
* [[TaskSetManager]] this class is designed only to be called from code with a lock on the
* TaskScheduler (e.g. its event handlers). It should not be called from other threads.
@@ -40,7 +43,8 @@ private[scheduler] class TaskSetExcludelist(
val conf: SparkConf,
val stageId: Int,
val stageAttemptId: Int,
- val clock: Clock) extends Logging {
+ val clock: Clock,
+ val isDryRun: Boolean = false) extends Logging {
private val MAX_TASK_ATTEMPTS_PER_EXECUTOR = conf.get(config.MAX_TASK_ATTEMPTS_PER_EXECUTOR)
private val MAX_TASK_ATTEMPTS_PER_NODE = conf.get(config.MAX_TASK_ATTEMPTS_PER_NODE)
@@ -80,13 +84,13 @@ private[scheduler] class TaskSetExcludelist(
* of the scheduler, where those filters will have already been applied.
*/
def isExecutorExcludedForTask(executorId: String, index: Int): Boolean = {
- execToFailures.get(executorId).exists { execFailures =>
+ !isDryRun && execToFailures.get(executorId).exists { execFailures =>
execFailures.getNumTaskFailures(index) >= MAX_TASK_ATTEMPTS_PER_EXECUTOR
}
}
def isNodeExcludedForTask(node: String, index: Int): Boolean = {
- nodeToExcludedTaskIndexes.get(node).exists(_.contains(index))
+ !isDryRun && nodeToExcludedTaskIndexes.get(node).exists(_.contains(index))
}
/**
@@ -96,11 +100,11 @@ private[scheduler] class TaskSetExcludelist(
* scheduler, where those filters will already have been applied.
*/
def isExecutorExcludedForTaskSet(executorId: String): Boolean = {
- excludedExecs.contains(executorId)
+ !isDryRun && excludedExecs.contains(executorId)
}
def isNodeExcludedForTaskSet(node: String): Boolean = {
- excludedNodes.contains(node)
+ !isDryRun && excludedNodes.contains(node)
}
private[scheduler] def updateExcludedForFailedTask(
@@ -163,3 +167,18 @@ private[scheduler] class TaskSetExcludelist(
}
}
}
+
+private[scheduler] object TaskSetExcludelist {
+
+ /**
+ * Returns true if the excludeOnFailure is enabled on the task/stage level,
+ * based on checking the configuration in the following order:
+ * 1. Is taskset level exclusion specifically enabled or disabled?
+ * 2. Is overall exclusion feature enabled or disabled?
+ * 3. Default is off
+ */
+ def isExcludeOnFailureEnabled(conf: SparkConf): Boolean = {
+ conf.get(config.EXCLUDE_ON_FAILURE_ENABLED_TASK_AND_STAGE)
+ .orElse(conf.get(config.EXCLUDE_ON_FAILURE_ENABLED)).getOrElse(false)
+ }
+}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 7dba4a6dc8fc..a3d074ddd56c 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -143,8 +143,18 @@ private[spark] class TaskSetManager(
private var calculatedTasks = 0
private[scheduler] val taskSetExcludelistHelperOpt: Option[TaskSetExcludelist] = {
- healthTracker.map { _ =>
- new TaskSetExcludelist(sched.sc.listenerBus, conf, stageId, taskSet.stageAttemptId, clock)
+ if (TaskSetExcludelist.isExcludeOnFailureEnabled(conf)) {
+ Some(new TaskSetExcludelist(sched.sc.listenerBus, conf, stageId,
+ taskSet.stageAttemptId, clock))
+ } else if (healthTracker.isDefined) {
+ // If exclusion is enabled at the application level but not at the taskset level, we create
+ // the TaskSetExcludelist in dry-run mode.
+ // In this mode, the TaskSetExcludelist does not exclude any executors; it only stores
+ // task failure information.
+ Some(new TaskSetExcludelist(sched.sc.listenerBus, conf, stageId,
+ taskSet.stageAttemptId, clock, isDryRun = true))
+ } else {
+ None
}
}
@@ -698,7 +708,6 @@ private[spark] class TaskSetManager(
private[scheduler] def getCompletelyExcludedTaskIfAny(
hostToExecutors: HashMap[String, HashSet[String]]): Option[Int] = {
taskSetExcludelistHelperOpt.flatMap { taskSetExcludelist =>
- val appHealthTracker = healthTracker.get
// Only look for unschedulable tasks when at least one executor has registered. Otherwise,
// task sets will be (unnecessarily) aborted in cases when no executors have registered yet.
if (hostToExecutors.nonEmpty) {
@@ -725,7 +734,7 @@ private[spark] class TaskSetManager(
hostToExecutors.forall { case (host, execsOnHost) =>
// Check if the task can run on the node
val nodeExcluded =
- appHealthTracker.isNodeExcluded(host) ||
+ healthTracker.exists(_.isNodeExcluded(host)) ||
taskSetExcludelist.isNodeExcludedForTaskSet(host) ||
taskSetExcludelist.isNodeExcludedForTask(host, indexInTaskSet)
if (nodeExcluded) {
@@ -733,7 +742,7 @@ private[spark] class TaskSetManager(
} else {
// Check if the task can run on any of the executors
execsOnHost.forall { exec =>
- appHealthTracker.isExecutorExcluded(exec) ||
+ healthTracker.exists(_.isExecutorExcluded(exec)) ||
taskSetExcludelist.isExecutorExcludedForTaskSet(exec) ||
taskSetExcludelist.isExecutorExcludedForTask(exec, indexInTaskSet)
}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/HealthTrackerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/HealthTrackerSuite.scala
index e7a57c22ef66..478e578130fc 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/HealthTrackerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/HealthTrackerSuite.scala
@@ -441,6 +441,23 @@ class HealthTrackerSuite extends SparkFunSuite with MockitoSugar with LocalSpark
assert(1000 === HealthTracker.getExcludeOnFailureTimeout(conf))
}
+ test("SPARK-49252: check exclusion enabling config on the application level") {
+ val conf = new SparkConf().setMaster("local")
+ assert(!HealthTracker.isExcludeOnFailureEnabled(conf))
+ conf.set(config.EXCLUDE_ON_FAILURE_ENABLED, true)
+ assert(HealthTracker.isExcludeOnFailureEnabled(conf))
+ // Turn off taskset level exclusion; the application level HealthTracker should still be enabled.
+ conf.set(config.EXCLUDE_ON_FAILURE_ENABLED_TASK_AND_STAGE, false)
+ assert(HealthTracker.isExcludeOnFailureEnabled(conf))
+ // Turn off the application level exclusion specifically; this overrides the global setting.
+ conf.set(config.EXCLUDE_ON_FAILURE_ENABLED_APPLICATION, false)
+ conf.set(config.EXCLUDE_ON_FAILURE_ENABLED_TASK_AND_STAGE, false)
+ assert(!HealthTracker.isExcludeOnFailureEnabled(conf))
+ // Turn on application level exclusion, health tracker should be enabled.
+ conf.set(config.EXCLUDE_ON_FAILURE_ENABLED_APPLICATION, true)
+ assert(HealthTracker.isExcludeOnFailureEnabled(conf))
+ }
+
test("check exclude configuration invariants") {
val conf = new SparkConf().setMaster("yarn").set(config.SUBMIT_DEPLOY_MODE, "cluster")
Seq(
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
index ab2c00e36846..7607d4d9fe6d 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -2725,6 +2725,39 @@ class TaskSetManagerSuite
assert(executorMonitor.isExecutorIdle("exec2"))
}
+ test("SPARK-49252: TaskSetExcludeList can be created without HealthTracker") {
+ // When excludeOnFailure is enabled at the taskset level, the TaskSetManager should create a
+ // TaskSetExcludelist even if the application level HealthTracker is not defined.
+ val conf = new SparkConf().set(config.EXCLUDE_ON_FAILURE_ENABLED_TASK_AND_STAGE, true)
+
+ // Create a task with two executors.
+ sc = new SparkContext("local", "test", conf)
+ sched = new FakeTaskScheduler(sc)
+ val taskSet = FakeTask.createTaskSet(1)
+
+ val taskSetManager = new TaskSetManager(sched, taskSet, 1,
+ // No application level HealthTracker.
+ healthTracker = None)
+ assert(taskSetManager.taskSetExcludelistHelperOpt.isDefined)
+ }
+
+ test("SPARK-49252: TaskSetExcludeList will be running in dry run mode when " +
+ "excludeOnFailure at taskset level is disabled but health tracker is enabled") {
+ // Disable the excludeOnFailure.enabled at taskset level.
+ val conf = new SparkConf().set(config.EXCLUDE_ON_FAILURE_ENABLED_TASK_AND_STAGE, false)
+
+ // Create a task with two executors.
+ sc = new SparkContext("local", "test", conf)
+ sched = new FakeTaskScheduler(sc)
+ val taskSet = FakeTask.createTaskSet(1)
+
+ val taskSetManager = new TaskSetManager(sched, taskSet, 1,
+ // Enable the application level HealthTracker.
+ healthTracker = Some(new HealthTracker(sc, None)))
+ assert(taskSetManager.taskSetExcludelistHelperOpt.isDefined)
+ assert(taskSetManager.taskSetExcludelistHelperOpt.get.isDryRun)
+ }
+
}
class FakeLongTasks(stageId: Int, partitionId: Int) extends FakeTask(stageId,
partitionId) {
diff --git a/docs/configuration.md b/docs/configuration.md
index ff2f21d282a5..2da099a6c5ed 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -2839,9 +2839,36 @@ Apart from these, the following properties are also available, and may be useful
If set to "true", prevent Spark from scheduling tasks on executors that have been excluded
due to too many task failures. The algorithm used to exclude executors and nodes can be further
controlled by the other "spark.excludeOnFailure" configuration options.
+ This config can be overridden by "spark.excludeOnFailure.application.enabled" and
+ "spark.excludeOnFailure.taskAndStage.enabled", which control exclusion at the
+ individual levels.
</td>
<td>2.1.0</td>
</tr>
+<tr>
+ <td><code>spark.excludeOnFailure.application.enabled</code></td>
+ <td>
+ false
+ </td>
+ <td>
+ If set to "true", enables excluding executors for the entire application due to too many
+ task failures and prevents Spark from scheduling tasks on them.
+ This config overrides "spark.excludeOnFailure.enabled".
+ </td>
+ <td>4.0.0</td>
+</tr>
+<tr>
+ <td><code>spark.excludeOnFailure.taskAndStage.enabled</code></td>
+ <td>
+ false
+ </td>
+ <td>
+ If set to "true", enables excluding executors at the task set level due to too many
+ task failures and prevents Spark from scheduling tasks on them.
+ This config overrides "spark.excludeOnFailure.enabled".
+ </td>
+ <td>4.0.0</td>
+</tr>
<tr>
<td><code>spark.excludeOnFailure.timeout</code></td>
<td>1h</td>
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]