This is an automated email from the ASF dual-hosted git repository.

wuyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 0602020eb3b3 [SPARK-49252][CORE] Make `TaskSetExcludeList` and `HealthTracker` independent
0602020eb3b3 is described below

commit 0602020eb3b346a8c50ad32eeda4e6dabb70c584
Author: tianhanhu <[email protected]>
AuthorDate: Fri Aug 30 11:05:00 2024 +0800

    [SPARK-49252][CORE] Make `TaskSetExcludeList` and `HealthTracker` independent
    
    ### What changes were proposed in this pull request?
    Allow `TaskSetExcludeList` and `HealthTracker` to be enabled independently.
    
    When the application-level `HealthTracker` is created but task set level exclusion is not enabled, `TaskSetExcludeList` is created in dry run mode: it still records task failure data and reports it to `HealthTracker`, but it does not take part in scheduling decisions.
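    
    A minimal sketch of the dry run behavior described above, in plain Scala with no Spark dependency. The class name `DryRunExcludeListSketch`, its methods, and the failure threshold are hypothetical stand-ins, not the actual `TaskSetExcludelist` API: failures are always recorded, but exclusion queries return false when `isDryRun` is set.
    
    ```scala
    import scala.collection.mutable
    
    // Hypothetical stand-in for the task set level exclude list; not the Spark class.
    final class DryRunExcludeListSketch(maxFailuresPerExecutor: Int, val isDryRun: Boolean) {
      // executorId -> number of recorded task failures
      private val failures = mutable.Map.empty[String, Int].withDefaultValue(0)
    
      // Failures are recorded in both modes, so an application-level tracker
      // could still consume them later.
      def recordFailure(executorId: String): Unit =
        failures(executorId) += 1
    
      def failureCount(executorId: String): Int = failures(executorId)
    
      // In dry run mode this always returns false, so nothing is excluded
      // at the task set level.
      def isExecutorExcluded(executorId: String): Boolean =
        !isDryRun && failures(executorId) >= maxFailuresPerExecutor
    }
    ```
    
    With a threshold of 2 and two recorded failures on the same executor, a regular instance reports the executor as excluded, while a dry run instance keeps the same failure count but never excludes it.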
    
    ### Why are the changes needed?
    Currently, when `spark.excludeOnFailure.enabled` is set to true, task set level exclusion (`TaskSetExcludeList`) and application-level exclusion (`HealthTracker`) are both enabled.
    In some cases, we only want to enable exclusion at one of these levels.
    
    ### Does this PR introduce _any_ user-facing change?
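    Yes. This PR introduces two new user-facing configs, `spark.excludeOnFailure.application.enabled` and `spark.excludeOnFailure.taskAndStage.enabled`, which allow exclusion to be enabled at the application and task set levels individually. When set, each overrides `spark.excludeOnFailure.enabled` for its level.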
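    
    The precedence between the new configs and `spark.excludeOnFailure.enabled` can be sketched as follows. This is an illustration only: a plain `Map` stands in for `SparkConf`, the object and method names are hypothetical, and the legacy timeout fallback that the application-level check also performs is left out.
    
    ```scala
    object ExclusionConfigSketch {
      // Config keys as named in this commit; the Map below is a stand-in for SparkConf.
      val Global = "spark.excludeOnFailure.enabled"
      val Application = "spark.excludeOnFailure.application.enabled"
      val TaskAndStage = "spark.excludeOnFailure.taskAndStage.enabled"
    
      // Reads an optional boolean flag; an absent key means "not set".
      private def flag(conf: Map[String, String], key: String): Option[Boolean] =
        conf.get(key).map(_.toBoolean)
    
      // The level-specific setting wins; otherwise fall back to the global
      // switch, and finally to "off".
      def taskAndStageEnabled(conf: Map[String, String]): Boolean =
        flag(conf, TaskAndStage).orElse(flag(conf, Global)).getOrElse(false)
    
      // Simplified: the legacy timeout check is intentionally omitted here.
      def applicationEnabled(conf: Map[String, String]): Boolean =
        flag(conf, Application).orElse(flag(conf, Global)).getOrElse(false)
    }
    ```
    
    For example, setting the global switch to true and `spark.excludeOnFailure.taskAndStage.enabled` to false leaves application-level exclusion on while turning task set level exclusion off.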
    
    ### How was this patch tested?
    New unit tests.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No
    
    Closes #47793 from tianhanhu/SPARK-49252_separate_exclusion.
    
    Authored-by: tianhanhu <[email protected]>
    Signed-off-by: Yi Wu <[email protected]>
---
 .../org/apache/spark/internal/config/package.scala | 12 ++++++++
 .../org/apache/spark/scheduler/HealthTracker.scala | 14 +++++----
 .../spark/scheduler/TaskSetExcludeList.scala       | 29 +++++++++++++++----
 .../apache/spark/scheduler/TaskSetManager.scala    | 19 +++++++++----
 .../spark/scheduler/HealthTrackerSuite.scala       | 17 +++++++++++
 .../spark/scheduler/TaskSetManagerSuite.scala      | 33 ++++++++++++++++++++++
 docs/configuration.md                              | 27 ++++++++++++++++++
 7 files changed, 135 insertions(+), 16 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index e9e411cc56b5..8224bcac2830 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -918,6 +918,18 @@ package object config {
       .booleanConf
       .createOptional
 
+  private[spark] val EXCLUDE_ON_FAILURE_ENABLED_APPLICATION =
+    ConfigBuilder("spark.excludeOnFailure.application.enabled")
+      .version("4.0.0")
+      .booleanConf
+      .createOptional
+
+  private[spark] val EXCLUDE_ON_FAILURE_ENABLED_TASK_AND_STAGE =
+    ConfigBuilder("spark.excludeOnFailure.taskAndStage.enabled")
+      .version("4.0.0")
+      .booleanConf
+      .createOptional
+
   private[spark] val MAX_TASK_ATTEMPTS_PER_EXECUTOR =
     ConfigBuilder("spark.excludeOnFailure.task.maxTaskAttemptsPerExecutor")
       .version("3.1.0")
diff --git a/core/src/main/scala/org/apache/spark/scheduler/HealthTracker.scala b/core/src/main/scala/org/apache/spark/scheduler/HealthTracker.scala
index 160607215390..82ec0ef91f4f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/HealthTracker.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/HealthTracker.scala
@@ -425,14 +425,16 @@ private[spark] object HealthTracker extends Logging {
   private val DEFAULT_TIMEOUT = "1h"
 
   /**
-   * Returns true if the excludeOnFailure is enabled, based on checking the configuration
-   * in the following order:
-   * 1. Is it specifically enabled or disabled?
-   * 2. Is it enabled via the legacy timeout conf?
-   * 3. Default is off
+   * Returns true if the excludeOnFailure is enabled on the application level,
+   * based on checking the configuration in the following order:
+   * 1. Is application level exclusion specifically enabled or disabled?
+   * 2. Is overall exclusion feature enabled or disabled?
+   * 3. Is it enabled via the legacy timeout conf?
+   * 4. Default is off
    */
   def isExcludeOnFailureEnabled(conf: SparkConf): Boolean = {
-    conf.get(config.EXCLUDE_ON_FAILURE_ENABLED) match {
+    conf.get(config.EXCLUDE_ON_FAILURE_ENABLED_APPLICATION)
+      .orElse(conf.get(config.EXCLUDE_ON_FAILURE_ENABLED)) match {
       case Some(enabled) =>
         enabled
       case None =>
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetExcludeList.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetExcludeList.scala
index c9aa74e0852b..363730529310 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetExcludeList.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetExcludeList.scala
@@ -31,6 +31,9 @@ import org.apache.spark.util.Clock
  * which is handled by [[HealthTracker]].  Note that HealthTracker does not know anything
  * about task failures until a taskset completes successfully.
  *
+ * If isDryRun is true, then this class will only function to store information for application
+ * level exclusion, and will not actually exclude any tasks in task/stage level.
+ *
  * THREADING:  This class is a helper to [[TaskSetManager]]; as with the methods in
  * [[TaskSetManager]] this class is designed only to be called from code with a lock on the
  * TaskScheduler (e.g. its event handlers). It should not be called from other threads.
@@ -40,7 +43,8 @@ private[scheduler] class TaskSetExcludelist(
     val conf: SparkConf,
     val stageId: Int,
     val stageAttemptId: Int,
-    val clock: Clock) extends Logging {
+    val clock: Clock,
+    val isDryRun: Boolean = false) extends Logging {
 
   private val MAX_TASK_ATTEMPTS_PER_EXECUTOR = conf.get(config.MAX_TASK_ATTEMPTS_PER_EXECUTOR)
   private val MAX_TASK_ATTEMPTS_PER_NODE = conf.get(config.MAX_TASK_ATTEMPTS_PER_NODE)
@@ -80,13 +84,13 @@ private[scheduler] class TaskSetExcludelist(
    * of the scheduler, where those filters will have already been applied.
    */
   def isExecutorExcludedForTask(executorId: String, index: Int): Boolean = {
-    execToFailures.get(executorId).exists { execFailures =>
+    !isDryRun && execToFailures.get(executorId).exists { execFailures =>
       execFailures.getNumTaskFailures(index) >= MAX_TASK_ATTEMPTS_PER_EXECUTOR
     }
   }
 
   def isNodeExcludedForTask(node: String, index: Int): Boolean = {
-    nodeToExcludedTaskIndexes.get(node).exists(_.contains(index))
+    !isDryRun && nodeToExcludedTaskIndexes.get(node).exists(_.contains(index))
   }
 
   /**
@@ -96,11 +100,11 @@ private[scheduler] class TaskSetExcludelist(
    * scheduler, where those filters will already have been applied.
    */
   def isExecutorExcludedForTaskSet(executorId: String): Boolean = {
-    excludedExecs.contains(executorId)
+    !isDryRun && excludedExecs.contains(executorId)
   }
 
   def isNodeExcludedForTaskSet(node: String): Boolean = {
-    excludedNodes.contains(node)
+    !isDryRun && excludedNodes.contains(node)
   }
 
   private[scheduler] def updateExcludedForFailedTask(
@@ -163,3 +167,18 @@ private[scheduler] class TaskSetExcludelist(
     }
   }
 }
+
+private[scheduler] object TaskSetExcludelist {
+
+  /**
+   * Returns true if the excludeOnFailure is enabled on the task/stage level,
+   * based on checking the configuration in the following order:
+   * 1. Is taskset level exclusion specifically enabled or disabled?
+   * 2. Is overall exclusion feature enabled or disabled?
+   * 3. Default is off
+   */
+  def isExcludeOnFailureEnabled(conf: SparkConf): Boolean = {
+    conf.get(config.EXCLUDE_ON_FAILURE_ENABLED_TASK_AND_STAGE)
+      .orElse(conf.get(config.EXCLUDE_ON_FAILURE_ENABLED)).getOrElse(false)
+  }
+}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 7dba4a6dc8fc..a3d074ddd56c 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -143,8 +143,18 @@ private[spark] class TaskSetManager(
   private var calculatedTasks = 0
 
   private[scheduler] val taskSetExcludelistHelperOpt: Option[TaskSetExcludelist] = {
-    healthTracker.map { _ =>
-      new TaskSetExcludelist(sched.sc.listenerBus, conf, stageId, taskSet.stageAttemptId, clock)
+    if (TaskSetExcludelist.isExcludeOnFailureEnabled(conf)) {
+      Some(new TaskSetExcludelist(sched.sc.listenerBus, conf, stageId,
+        taskSet.stageAttemptId, clock))
+    } else if (healthTracker.isDefined) {
+      // If we enabled exclusion at application level but not at taskset level exclusion, we create
+      // TaskSetExcludelist in dry run mode.
+      // In this mode, TaskSetExcludeList would not exclude any executors but only store
+      // task failure information.
+      Some(new TaskSetExcludelist(sched.sc.listenerBus, conf, stageId,
+        taskSet.stageAttemptId, clock, isDryRun = true))
+    } else {
+      None
     }
   }
 
@@ -698,7 +708,6 @@ private[spark] class TaskSetManager(
   private[scheduler] def getCompletelyExcludedTaskIfAny(
       hostToExecutors: HashMap[String, HashSet[String]]): Option[Int] = {
     taskSetExcludelistHelperOpt.flatMap { taskSetExcludelist =>
-      val appHealthTracker = healthTracker.get
       // Only look for unschedulable tasks when at least one executor has registered. Otherwise,
       // task sets will be (unnecessarily) aborted in cases when no executors have registered yet.
       if (hostToExecutors.nonEmpty) {
@@ -725,7 +734,7 @@ private[spark] class TaskSetManager(
           hostToExecutors.forall { case (host, execsOnHost) =>
             // Check if the task can run on the node
             val nodeExcluded =
-              appHealthTracker.isNodeExcluded(host) ||
+              healthTracker.exists(_.isNodeExcluded(host)) ||
                 taskSetExcludelist.isNodeExcludedForTaskSet(host) ||
                 taskSetExcludelist.isNodeExcludedForTask(host, indexInTaskSet)
             if (nodeExcluded) {
@@ -733,7 +742,7 @@ private[spark] class TaskSetManager(
             } else {
               // Check if the task can run on any of the executors
               execsOnHost.forall { exec =>
-                appHealthTracker.isExecutorExcluded(exec) ||
+                healthTracker.exists(_.isExecutorExcluded(exec)) ||
                   taskSetExcludelist.isExecutorExcludedForTaskSet(exec) ||
                   taskSetExcludelist.isExecutorExcludedForTask(exec, indexInTaskSet)
               }
diff --git a/core/src/test/scala/org/apache/spark/scheduler/HealthTrackerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/HealthTrackerSuite.scala
index e7a57c22ef66..478e578130fc 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/HealthTrackerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/HealthTrackerSuite.scala
@@ -441,6 +441,23 @@ class HealthTrackerSuite extends SparkFunSuite with MockitoSugar with LocalSpark
     assert(1000 === HealthTracker.getExcludeOnFailureTimeout(conf))
   }
 
+  test("SPARK-49252: check exclusion enabling config on the application level") {
+    val conf = new SparkConf().setMaster("local")
+    assert(!HealthTracker.isExcludeOnFailureEnabled(conf))
+    conf.set(config.EXCLUDE_ON_FAILURE_ENABLED, true)
+    assert(HealthTracker.isExcludeOnFailureEnabled(conf))
+    // Turn off taskset level exclusion, application level healthtracker should still be enabled.
+    conf.set(config.EXCLUDE_ON_FAILURE_ENABLED_TASK_AND_STAGE, false)
+    assert(HealthTracker.isExcludeOnFailureEnabled(conf))
+    // Turn off the application level exclusion specifically, this overrides the global setting.
+    conf.set(config.EXCLUDE_ON_FAILURE_ENABLED_APPLICATION, false)
+    conf.set(config.EXCLUDE_ON_FAILURE_ENABLED_TASK_AND_STAGE, false)
+    assert(!HealthTracker.isExcludeOnFailureEnabled(conf))
+    // Turn on application level exclusion, health tracker should be enabled.
+    conf.set(config.EXCLUDE_ON_FAILURE_ENABLED_APPLICATION, true)
+    assert(HealthTracker.isExcludeOnFailureEnabled(conf))
+  }
+
   test("check exclude configuration invariants") {
     val conf = new SparkConf().setMaster("yarn").set(config.SUBMIT_DEPLOY_MODE, "cluster")
     Seq(
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
index ab2c00e36846..7607d4d9fe6d 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -2725,6 +2725,39 @@ class TaskSetManagerSuite
     assert(executorMonitor.isExecutorIdle("exec2"))
   }
 
+  test("SPARK-49252: TaskSetExcludeList can be created without HealthTracker") {
+    // When the excludeOnFailure.enabled is set to true, the TaskSetManager should create a
+    // TaskSetExcludelist even if the application level HealthTracker is not defined.
+    val conf = new SparkConf().set(config.EXCLUDE_ON_FAILURE_ENABLED_TASK_AND_STAGE, true)
+
+    // Create a task with two executors.
+    sc = new SparkContext("local", "test", conf)
+    sched = new FakeTaskScheduler(sc)
+    val taskSet = FakeTask.createTaskSet(1)
+
+    val taskSetManager = new TaskSetManager(sched, taskSet, 1,
+      // No application level HealthTracker.
+      healthTracker = None)
+    assert(taskSetManager.taskSetExcludelistHelperOpt.isDefined)
+  }
+
+  test("SPARK-49252: TaskSetExcludeList will be running in dry run mode when" +
+    "exludeOnFailure at taskset level is disabled but health tracker is enabled") {
+    // Disable the excludeOnFailure.enabled at taskset level.
+    val conf = new SparkConf().set(config.EXCLUDE_ON_FAILURE_ENABLED_TASK_AND_STAGE, false)
+
+    // Create a task with two executors.
+    sc = new SparkContext("local", "test", conf)
+    sched = new FakeTaskScheduler(sc)
+    val taskSet = FakeTask.createTaskSet(1)
+
+    val taskSetManager = new TaskSetManager(sched, taskSet, 1,
+      // Enable the application level HealthTracker.
+      healthTracker = Some(new HealthTracker(sc, None)))
+    assert(taskSetManager.taskSetExcludelistHelperOpt.isDefined)
+    assert(taskSetManager.taskSetExcludelistHelperOpt.get.isDryRun)
+  }
+
 }
 
 class FakeLongTasks(stageId: Int, partitionId: Int) extends FakeTask(stageId, partitionId) {
diff --git a/docs/configuration.md b/docs/configuration.md
index ff2f21d282a5..2da099a6c5ed 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -2839,9 +2839,36 @@ Apart from these, the following properties are also available, and may be useful
     If set to "true", prevent Spark from scheduling tasks on executors that have been excluded
     due to too many task failures. The algorithm used to exclude executors and nodes can be further
     controlled by the other "spark.excludeOnFailure" configuration options.
+    This config will be overriden by "spark.excludeOnFailure.application.enabled" and
+    "spark.excludeOnFailure.taskAndStage.enabled" to specify exclusion enablement on individual
+    levels.
   </td>
   <td>2.1.0</td>
 </tr>
+<tr>
+  <td><code>spark.excludeOnFailure.application.enabled</code></td>
+  <td>
+    false
+  </td>
+  <td>
+    If set to "true", enables excluding executors for the entire application due to too many task
+    failures and prevent Spark from scheduling tasks on them.
+    This config overrides "spark.excludeOnFailure.enabled". 
+  </td>
+  <td>4.0.0</td>
+</tr>
+<tr>
+  <td><code>spark.excludeOnFailure.taskAndStage.enabled</code></td>
+  <td>
+    false
+  </td>
+  <td>
+    If set to "true", enables excluding executors on a task set level due to too many task
+    failures and prevent Spark from scheduling tasks on them.
+    This config overrides "spark.excludeOnFailure.enabled". 
+  </td>
+  <td>4.0.0</td>
+</tr>
 <tr>
   <td><code>spark.excludeOnFailure.timeout</code></td>
   <td>1h</td>

