This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 9c21238ce0e [SPARK-46021][CORE] Support cancel future jobs belonging 
to a job group
9c21238ce0e is described below

commit 9c21238ce0e6bec676213f4bad4bc1bf4ac16932
Author: Xinyi Yu <[email protected]>
AuthorDate: Thu Nov 23 20:29:45 2023 +0800

    [SPARK-46021][CORE] Support cancel future jobs belonging to a job group
    
    ### What changes were proposed in this pull request?
    This PR supports a new API in SparkContext 
`cancelJobGroupAndFutureJobs(jobGroup)`. It not only cancels the active jobs, 
but also ensures that future jobs submitted under this job group will be 
cancelled and not run.
    
    Internally, it uses a limited-size (default size: 1000, controlled by 
config `spark.scheduler.numCancelledJobGroupsToTrack`) FIFO set to record all 
the job groups cancelled with this new API.
    
    This PR also adds a new error class `SPARK_JOB_CANCELLED` without changing 
the error message at all, based on the assumption that it is a fundamental 
error whose message some downstream workloads may rely on parsing.
    
    ### Why are the changes needed?
    Improvements.
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    New tests.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No.
    
    Closes #43926 from anchovYu/SPARK-46021.
    
    Authored-by: Xinyi Yu <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
---
 common/utils/src/main/resources/error/README.md    |  2 +-
 .../src/main/resources/error/error-classes.json    |  6 ++
 .../main/scala/org/apache/spark/SparkContext.scala | 11 +++
 .../org/apache/spark/errors/SparkCoreErrors.scala  | 12 ++++
 .../org/apache/spark/internal/config/package.scala | 11 +++
 .../org/apache/spark/scheduler/DAGScheduler.scala  | 69 +++++++++++++++----
 .../apache/spark/scheduler/DAGSchedulerEvent.scala |  5 +-
 .../org/apache/spark/JobCancellationSuite.scala    | 78 ++++++++++++++++++++++
 .../apache/spark/scheduler/DAGSchedulerSuite.scala | 14 +++-
 docs/sql-error-conditions.md                       |  6 ++
 10 files changed, 197 insertions(+), 17 deletions(-)

diff --git a/common/utils/src/main/resources/error/README.md 
b/common/utils/src/main/resources/error/README.md
index 7b1b9038aeb..c9fdd84e744 100644
--- a/common/utils/src/main/resources/error/README.md
+++ b/common/utils/src/main/resources/error/README.md
@@ -1347,7 +1347,7 @@ The following SQLSTATEs are collated from:
 |XX001    |XX   |Internal Error                                    |001     
|data_corrupted                                              |PostgreSQL     |N 
      |PostgreSQL Redshift                                                      
   |
 |XX002    |XX   |Internal Error                                    |002     
|index_corrupted                                             |PostgreSQL     |N 
      |PostgreSQL Redshift                                                      
   |
 |XXKD0    |XX   |Internal Error                                    |KD0     
|Analysis - Bad plan                                         |Databricks     |N 
      |Databricks                                                               
   |
-|XXKDA    |XX   |Internal Error                                    |KAS     
|Scheduler (Aether Scheduler)                                |Databricks     |N 
      |Databricks                                                               
   |
+|XXKDA    |XX   |Internal Error                                    |KAS     
|Scheduler                                                   |Databricks     |N 
      |Databricks                                                               
   |
 |XXKDS    |XX   |Internal Error                                    |KDS     
|Delta Storage                                               |Databricks     |N 
      |Databricks                                                               
   |
 |XXKUC    |XX   |Internal Error                                    |KUC     
|Catalog Service (Unity Catalog)                             |Databricks     |N 
      |Databricks                                                               
   |
 |XXKST    |XX   |Internal Error                                    |KST     
|Streaming                                                   |Databricks     |N 
      |Databricks                                                               
   |
diff --git a/common/utils/src/main/resources/error/error-classes.json 
b/common/utils/src/main/resources/error/error-classes.json
index afcd841a2ce..8b1a3a33f7d 100644
--- a/common/utils/src/main/resources/error/error-classes.json
+++ b/common/utils/src/main/resources/error/error-classes.json
@@ -2961,6 +2961,12 @@
     ],
     "sqlState" : "42601"
   },
+  "SPARK_JOB_CANCELLED" : {
+    "message" : [
+      "Job <jobId> cancelled <reason>"
+    ],
+    "sqlState" : "XXKDA"
+  },
   "SPECIFY_BUCKETING_IS_NOT_ALLOWED" : {
     "message" : [
       "A CREATE TABLE without explicit column list cannot specify bucketing 
information.",
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala 
b/core/src/main/scala/org/apache/spark/SparkContext.scala
index ed00baa01d6..b43bb08d25a 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -2608,6 +2608,17 @@ class SparkContext(config: SparkConf) extends Logging {
     dagScheduler.cancelJobGroup(groupId)
   }
 
+  /**
+   * Cancel active jobs for the specified group, as well as the future jobs in 
this job group.
+   * Note: the maximum number of job groups that can be tracked is set by
+   * 'spark.scheduler.numCancelledJobGroupsToTrack'. Once the limit is reached 
and a new job group
+   * is to be added, the oldest job group tracked will be discarded.
+   */
+  def cancelJobGroupAndFutureJobs(groupId: String): Unit = {
+    assertNotStopped()
+    dagScheduler.cancelJobGroup(groupId, cancelFutureJobs = true)
+  }
+
   /**
    * Cancel active jobs that have the specified tag. See 
`org.apache.spark.SparkContext.addJobTag`.
    *
diff --git a/core/src/main/scala/org/apache/spark/errors/SparkCoreErrors.scala 
b/core/src/main/scala/org/apache/spark/errors/SparkCoreErrors.scala
index fabb78e82cf..4013b5a5035 100644
--- a/core/src/main/scala/org/apache/spark/errors/SparkCoreErrors.scala
+++ b/core/src/main/scala/org/apache/spark/errors/SparkCoreErrors.scala
@@ -221,6 +221,18 @@ private[spark] object SparkCoreErrors {
     new NoSuchElementException(id)
   }
 
+  def sparkJobCancelled(jobId: Int, reason: String, e: Exception): 
SparkException = {
+    new SparkException(
+      errorClass = "SPARK_JOB_CANCELLED",
+      messageParameters = Map("jobId" -> jobId.toString, "reason" -> reason),
+      cause = e
+    )
+  }
+
+  def sparkJobCancelledAsPartOfJobGroupError(jobId: Int, jobGroupId: String): 
SparkException = {
+    sparkJobCancelled(jobId, s"part of cancelled job group $jobGroupId", null)
+  }
+
   def barrierStageWithRDDChainPatternError(): Throwable = {
     new BarrierJobUnsupportedRDDChainException
   }
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala 
b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index b2bf30863a9..e7e69dcb01e 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -1728,6 +1728,17 @@ package object config {
       .checkValue(v => v > 0, "The max failures should be a positive value.")
       .createWithDefault(40)
 
+  private[spark] val NUM_CANCELLED_JOB_GROUPS_TO_TRACK =
+    ConfigBuilder("spark.scheduler.numCancelledJobGroupsToTrack")
+      .doc("The maximum number of tracked job groups that are cancelled with " 
+
+        "`cancelJobGroupAndFutureJobs`. If this maximum number is hit, the 
oldest job group " +
+        "will no longer be tracked that future jobs belonging to this job 
group will not " +
+        "be cancelled.")
+      .version("4.0.0")
+      .intConf
+      .checkValue(v => v > 0, "The size of the set should be a positive 
value.")
+      .createWithDefault(1000)
+
   private[spark] val UNSAFE_EXCEPTION_ON_MEMORY_LEAK =
     ConfigBuilder("spark.unsafe.exceptionOnMemoryLeak")
       .internal()
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 045f67ffd80..241ef35cad7 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -169,6 +169,12 @@ private[spark] class DAGScheduler(
 
   private[scheduler] val activeJobs = new HashSet[ActiveJob]
 
+  // Job groups that are cancelled with `cancelFutureJobs` as true, with at 
most
+  // `NUM_CANCELLED_JOB_GROUPS_TO_TRACK` stored. On a new job submission, if 
its job group is in
+  // this set, the job will be immediately cancelled.
+  private[scheduler] val cancelledJobGroups =
+    new 
LimitedSizeFIFOSet[String](sc.getConf.get(config.NUM_CANCELLED_JOB_GROUPS_TO_TRACK))
+
   /**
    * Contains the locations that each RDD's partitions are cached on.  This 
map's keys are RDD ids
    * and its values are arrays indexed by partition numbers. Each array value 
is the set of
@@ -1081,10 +1087,11 @@ private[spark] class DAGScheduler(
 
   /**
    * Cancel all jobs in the given job group ID.
+   * @param cancelFutureJobs if true, future submitted jobs in this job group 
will be cancelled
    */
-  def cancelJobGroup(groupId: String): Unit = {
-    logInfo("Asked to cancel job group " + groupId)
-    eventProcessLoop.post(JobGroupCancelled(groupId))
+  def cancelJobGroup(groupId: String, cancelFutureJobs: Boolean = false): Unit 
= {
+    logInfo(s"Asked to cancel job group $groupId with 
cancelFutureJobs=$cancelFutureJobs")
+    eventProcessLoop.post(JobGroupCancelled(groupId, cancelFutureJobs))
   }
 
   /**
@@ -1180,7 +1187,16 @@ private[spark] class DAGScheduler(
     jobsThatUseStage.find(jobIdToActiveJob.contains)
   }
 
-  private[scheduler] def handleJobGroupCancelled(groupId: String): Unit = {
+  private[scheduler] def handleJobGroupCancelled(
+      groupId: String,
+      cancelFutureJobs: Boolean): Unit = {
+    // If cancelFutureJobs is true, store the cancelled job group id into 
internal states.
+    // When a job belonging to this job group is submitted, skip running it.
+    if (cancelFutureJobs) {
+      logInfo(s"Add job group $groupId into cancelled job groups")
+      cancelledJobGroups.add(groupId)
+    }
+
     // Cancel all jobs belonging to this job group.
     // First finds all active jobs with this group id, and then kill stages 
for them.
     val activeInGroup = activeJobs.filter { activeJob =>
@@ -1264,7 +1280,8 @@ private[spark] class DAGScheduler(
     listenerBus.post(SparkListenerTaskGettingResult(taskInfo))
   }
 
-  private[scheduler] def handleJobSubmitted(jobId: Int,
+  private[scheduler] def handleJobSubmitted(
+      jobId: Int,
       finalRDD: RDD[_],
       func: (TaskContext, Iterator[_]) => _,
       partitions: Array[Int],
@@ -1272,6 +1289,15 @@ private[spark] class DAGScheduler(
       listener: JobListener,
       artifacts: JobArtifactSet,
       properties: Properties): Unit = {
+    // If this job belongs to a cancelled job group, skip running it
+    val jobGroupIdOpt = 
Option(properties).map(_.getProperty(SparkContext.SPARK_JOB_GROUP_ID))
+    if (jobGroupIdOpt.exists(cancelledJobGroups.contains(_))) {
+      listener.jobFailed(
+        SparkCoreErrors.sparkJobCancelledAsPartOfJobGroupError(jobId, 
jobGroupIdOpt.get))
+      logInfo(s"Skip running a job that belongs to the cancelled job group 
${jobGroupIdOpt.get}.")
+      return
+    }
+
     var finalStage: ResultStage = null
     try {
       // New stage creation may throw an exception if, for example, jobs are 
run on a
@@ -2727,7 +2753,9 @@ private[spark] class DAGScheduler(
       logDebug("Trying to cancel unregistered job " + jobId)
     } else {
       failJobAndIndependentStages(
-        jobIdToActiveJob(jobId), "Job %d cancelled %s".format(jobId, 
reason.getOrElse("")))
+        job = jobIdToActiveJob(jobId),
+        error = SparkCoreErrors.sparkJobCancelled(jobId, reason.getOrElse(""), 
null)
+      )
     }
   }
 
@@ -2787,7 +2815,10 @@ private[spark] class DAGScheduler(
     failedStage.latestInfo.completionTime = Some(clock.getTimeMillis())
     updateStageInfoForPushBasedShuffle(failedStage)
     for (job <- dependentJobs) {
-      failJobAndIndependentStages(job, s"Job aborted due to stage failure: 
$reason", exception)
+      failJobAndIndependentStages(
+        job,
+        new SparkException(s"Job aborted due to stage failure: $reason", cause 
= exception.orNull)
+      )
     }
     if (dependentJobs.isEmpty) {
       logInfo("Ignoring failure of " + failedStage + " because all jobs 
depending on it are done")
@@ -2845,13 +2876,11 @@ private[spark] class DAGScheduler(
   /** Fails a job and all stages that are only used by that job, and cleans up 
relevant state. */
   private def failJobAndIndependentStages(
       job: ActiveJob,
-      failureReason: String,
-      exception: Option[Throwable] = None): Unit = {
-    if (cancelRunningIndependentStages(job, failureReason)) {
+      error: SparkException): Unit = {
+    if (cancelRunningIndependentStages(job, error.getMessage)) {
       // SPARK-15783 important to cleanup state first, just for tests where we 
have some asserts
       // against the state.  Otherwise we have a *little* bit of flakiness in 
the tests.
       cleanupStateForJobAndIndependentStages(job)
-      val error = new SparkException(failureReason, exception.orNull)
       job.listener.jobFailed(error)
       listenerBus.post(SparkListenerJobEnd(job.jobId, clock.getTimeMillis(), 
JobFailed(error)))
     }
@@ -3010,8 +3039,8 @@ private[scheduler] class 
DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler
     case JobCancelled(jobId, reason) =>
       dagScheduler.handleJobCancellation(jobId, reason)
 
-    case JobGroupCancelled(groupId) =>
-      dagScheduler.handleJobGroupCancelled(groupId)
+    case JobGroupCancelled(groupId, cancelFutureJobs) =>
+      dagScheduler.handleJobGroupCancelled(groupId, cancelFutureJobs)
 
     case JobTagCancelled(tag) =>
       dagScheduler.handleJobTagCancelled(tag)
@@ -3092,3 +3121,17 @@ private[spark] object DAGScheduler {
   // as more failure events come in
   val RESUBMIT_TIMEOUT = 200
 }
+
+/**
+ * A NOT thread-safe set that only keeps the last `capacity` elements added to 
it.
+ */
+private[scheduler] class LimitedSizeFIFOSet[T](val capacity: Int) {
+  private val set = scala.collection.mutable.LinkedHashSet[T]()
+  def add(t: T): Unit = {
+    set += t
+    if (set.size > capacity) {
+      set -= set.head
+    }
+  }
+  def contains(t: T): Boolean = set.contains(t)
+}
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala 
b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
index f8cd2742906..fb04695deab 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
@@ -63,7 +63,10 @@ private[scheduler] case class JobCancelled(
     reason: Option[String])
   extends DAGSchedulerEvent
 
-private[scheduler] case class JobGroupCancelled(groupId: String) extends 
DAGSchedulerEvent
+private[scheduler] case class JobGroupCancelled(
+    groupId: String,
+    cancelFutureJobs: Boolean = false)
+  extends DAGSchedulerEvent
 
 private[scheduler] case class JobTagCancelled(tagName: String) extends 
DAGSchedulerEvent
 
diff --git a/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala 
b/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
index 23225b2957a..997fda93bc9 100644
--- a/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
@@ -153,6 +153,84 @@ class JobCancellationSuite extends SparkFunSuite with 
Matchers with BeforeAndAft
     assert(jobB.get() === 100)
   }
 
+  test("if cancel job group and future jobs, skip running jobs in the same job 
group") {
+    sc = new SparkContext("local[2]", "test")
+
+    val sem = new Semaphore(0)
+    sc.addSparkListener(new SparkListener {
+      override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
+        sem.release()
+      }
+    })
+
+    // run a job, cancel the job group and its future jobs
+    val jobGroupName = "job-group"
+    val job = Future {
+      sc.setJobGroup(jobGroupName, "")
+      sc.parallelize(1 to 1000).map { i => Thread.sleep (100); i}.count()
+    }
+    // block until job starts
+    sem.acquire(1)
+    // cancel the job group and future jobs
+    sc.cancelJobGroupAndFutureJobs(jobGroupName)
+    ThreadUtils.awaitReady(job, Duration.Inf).failed.foreach { case e: 
SparkException =>
+      checkError(
+        exception = e,
+        errorClass = "SPARK_JOB_CANCELLED",
+        sqlState = "XXKDA",
+        parameters = scala.collection.immutable.Map(
+          "jobId" -> "0",
+          "reason" -> s"part of cancelled job group $jobGroupName")
+      )
+    }
+
+    // job in the same job group will not run
+    checkError(
+      exception = intercept[SparkException] {
+        sc.setJobGroup(jobGroupName, "")
+        sc.parallelize(1 to 100).count()
+      },
+      errorClass = "SPARK_JOB_CANCELLED",
+      sqlState = "XXKDA",
+      parameters = scala.collection.immutable.Map(
+        "jobId" -> "1",
+        "reason" -> s"part of cancelled job group $jobGroupName")
+    )
+
+    // job in a different job group should run
+    sc.setJobGroup("another-job-group", "")
+    assert(sc.parallelize(1 to 100).count() == 100)
+  }
+
+  test("only keeps limited number of cancelled job groups") {
+    val conf = new SparkConf()
+      .set(NUM_CANCELLED_JOB_GROUPS_TO_TRACK, 5)
+    sc = new SparkContext("local[2]", "test", conf)
+    val setSize = sc.getConf.get(NUM_CANCELLED_JOB_GROUPS_TO_TRACK)
+    // call cancelJobGroup with cancelFutureJobs = true on (setSize + 1) job 
groups, the first one
+    // should have been evicted from the cancelledJobGroups set
+    (0 to setSize).foreach { idx =>
+      val sem = new Semaphore(0)
+      sc.addSparkListener(new SparkListener {
+        override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
+          sem.release()
+        }
+      })
+      val job = Future {
+        sc.setJobGroup(s"job-group-$idx", "")
+        sc.parallelize(1 to 1000).map { i => Thread.sleep (100); i}.count()
+      }
+      sem.acquire(1)
+      sc.cancelJobGroupAndFutureJobs(s"job-group-$idx")
+      ThreadUtils.awaitReady(job, Duration.Inf).failed.foreach { case e: 
SparkException =>
+        assert(e.getErrorClass == "SPARK_JOB_CANCELLED")
+      }
+    }
+    // submit a job with the 0 job group that was evicted from 
cancelledJobGroups set, it should run
+    sc.setJobGroup("job-group-0", "")
+    assert(sc.parallelize(1 to 100).count() == 100)
+  }
+
   test("job tags") {
     sc = new SparkContext("local[2]", "test")
 
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala 
b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 0f596d7d5b7..0f7146bc7c1 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -724,7 +724,12 @@ class DAGSchedulerSuite extends SparkFunSuite with 
TempLocalSparkContext with Ti
     assert(numResults === 0)
     cancel(jobId)
     assert(failureReason.isDefined)
-    assert(failureReason.get.getMessage() === "Job 0 cancelled ")
+    checkError(
+      exception = failureReason.get.asInstanceOf[SparkException],
+      errorClass = "SPARK_JOB_CANCELLED",
+      sqlState = "XXKDA",
+      parameters = scala.collection.immutable.Map("jobId" -> "0", "reason" -> 
"")
+    )
   }
 
   test("run trivial job") {
@@ -841,7 +846,12 @@ class DAGSchedulerSuite extends SparkFunSuite with 
TempLocalSparkContext with Ti
     val rdd = new MyRDD(sc, 1, Nil)
     val jobId = submit(rdd, Array(0))
     cancel(jobId)
-    assert(failure.getMessage === s"Job $jobId cancelled ")
+    checkError(
+      exception = failure.asInstanceOf[SparkException],
+      errorClass = "SPARK_JOB_CANCELLED",
+      sqlState = "XXKDA",
+      parameters = scala.collection.immutable.Map("jobId" -> jobId.toString, 
"reason" -> "")
+    )
     assert(sparkListener.failedStages === Seq(0))
     assertDataStructuresEmpty()
   }
diff --git a/docs/sql-error-conditions.md b/docs/sql-error-conditions.md
index cba6a24b869..3e706debbf8 100644
--- a/docs/sql-error-conditions.md
+++ b/docs/sql-error-conditions.md
@@ -1850,6 +1850,12 @@ The seed expression `<seedExpr>` of the expression 
`<exprWithSeed>` must be fold
 
 sortBy must be used together with bucketBy.
 
+### SPARK_JOB_CANCELLED
+
+[SQLSTATE: XXKDA](sql-error-conditions-sqlstates.html#class-XX-internal-error)
+
+Job `<jobId>` cancelled `<reason>`
+
 ### SPECIFY_BUCKETING_IS_NOT_ALLOWED
 
 [SQLSTATE: 
42601](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to