This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 9c21238ce0e [SPARK-46021][CORE] Support cancel future jobs belonging
to a job group
9c21238ce0e is described below
commit 9c21238ce0e6bec676213f4bad4bc1bf4ac16932
Author: Xinyi Yu <[email protected]>
AuthorDate: Thu Nov 23 20:29:45 2023 +0800
[SPARK-46021][CORE] Support cancel future jobs belonging to a job group
### What changes were proposed in this pull request?
This PR adds a new API to SparkContext,
`cancelJobGroupAndFutureJobs(jobGroup)`. It not only cancels the active jobs;
future submitted jobs that belong to this job group will also be cancelled and
not run.
Internally, it uses a limited-size (current size: 1000, controlled by
config `NUM_CANCELLED_JOB_GROUPS_TO_TRACK`) FIFO set to record all the job
groups cancelled with this new API.
This PR also adds a new error class `SPARK_JOB_CANCELLED` without changing
the error message at all, based on the assumption that it's a fundamental error
that some downstream workload could rely on parsing the error message.
### Why are the changes needed?
Improvements.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
New tests.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #43926 from anchovYu/SPARK-46021.
Authored-by: Xinyi Yu <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
common/utils/src/main/resources/error/README.md | 2 +-
.../src/main/resources/error/error-classes.json | 6 ++
.../main/scala/org/apache/spark/SparkContext.scala | 11 +++
.../org/apache/spark/errors/SparkCoreErrors.scala | 12 ++++
.../org/apache/spark/internal/config/package.scala | 11 +++
.../org/apache/spark/scheduler/DAGScheduler.scala | 69 +++++++++++++++----
.../apache/spark/scheduler/DAGSchedulerEvent.scala | 5 +-
.../org/apache/spark/JobCancellationSuite.scala | 78 ++++++++++++++++++++++
.../apache/spark/scheduler/DAGSchedulerSuite.scala | 14 +++-
docs/sql-error-conditions.md | 6 ++
10 files changed, 197 insertions(+), 17 deletions(-)
diff --git a/common/utils/src/main/resources/error/README.md
b/common/utils/src/main/resources/error/README.md
index 7b1b9038aeb..c9fdd84e744 100644
--- a/common/utils/src/main/resources/error/README.md
+++ b/common/utils/src/main/resources/error/README.md
@@ -1347,7 +1347,7 @@ The following SQLSTATEs are collated from:
|XX001 |XX |Internal Error |001
|data_corrupted |PostgreSQL |N
|PostgreSQL Redshift
|
|XX002 |XX |Internal Error |002
|index_corrupted |PostgreSQL |N
|PostgreSQL Redshift
|
|XXKD0 |XX |Internal Error |KD0
|Analysis - Bad plan |Databricks |N
|Databricks
|
-|XXKDA |XX |Internal Error |KAS
|Scheduler (Aether Scheduler) |Databricks |N
|Databricks
|
+|XXKDA |XX |Internal Error |KAS
|Scheduler |Databricks |N
|Databricks
|
|XXKDS |XX |Internal Error |KDS
|Delta Storage |Databricks |N
|Databricks
|
|XXKUC |XX |Internal Error |KUC
|Catalog Service (Unity Catalog) |Databricks |N
|Databricks
|
|XXKST |XX |Internal Error |KST
|Streaming |Databricks |N
|Databricks
|
diff --git a/common/utils/src/main/resources/error/error-classes.json
b/common/utils/src/main/resources/error/error-classes.json
index afcd841a2ce..8b1a3a33f7d 100644
--- a/common/utils/src/main/resources/error/error-classes.json
+++ b/common/utils/src/main/resources/error/error-classes.json
@@ -2961,6 +2961,12 @@
],
"sqlState" : "42601"
},
+ "SPARK_JOB_CANCELLED" : {
+ "message" : [
+ "Job <jobId> cancelled <reason>"
+ ],
+ "sqlState" : "XXKDA"
+ },
"SPECIFY_BUCKETING_IS_NOT_ALLOWED" : {
"message" : [
"A CREATE TABLE without explicit column list cannot specify bucketing
information.",
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala
b/core/src/main/scala/org/apache/spark/SparkContext.scala
index ed00baa01d6..b43bb08d25a 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -2608,6 +2608,17 @@ class SparkContext(config: SparkConf) extends Logging {
dagScheduler.cancelJobGroup(groupId)
}
+ /**
+ * Cancel active jobs for the specified group, as well as the future jobs in
this job group.
+ * Note: the maximum number of job groups that can be tracked is set by
+ * 'spark.scheduler.numCancelledJobGroupsToTrack'. Once the limit is reached
and a new job group
+ * is to be added, the oldest job group tracked will be discarded.
+ */
+ def cancelJobGroupAndFutureJobs(groupId: String): Unit = {
+ assertNotStopped()
+ dagScheduler.cancelJobGroup(groupId, cancelFutureJobs = true)
+ }
+
/**
* Cancel active jobs that have the specified tag. See
`org.apache.spark.SparkContext.addJobTag`.
*
diff --git a/core/src/main/scala/org/apache/spark/errors/SparkCoreErrors.scala
b/core/src/main/scala/org/apache/spark/errors/SparkCoreErrors.scala
index fabb78e82cf..4013b5a5035 100644
--- a/core/src/main/scala/org/apache/spark/errors/SparkCoreErrors.scala
+++ b/core/src/main/scala/org/apache/spark/errors/SparkCoreErrors.scala
@@ -221,6 +221,18 @@ private[spark] object SparkCoreErrors {
new NoSuchElementException(id)
}
+ def sparkJobCancelled(jobId: Int, reason: String, e: Exception):
SparkException = {
+ new SparkException(
+ errorClass = "SPARK_JOB_CANCELLED",
+ messageParameters = Map("jobId" -> jobId.toString, "reason" -> reason),
+ cause = e
+ )
+ }
+
+ def sparkJobCancelledAsPartOfJobGroupError(jobId: Int, jobGroupId: String):
SparkException = {
+ sparkJobCancelled(jobId, s"part of cancelled job group $jobGroupId", null)
+ }
+
def barrierStageWithRDDChainPatternError(): Throwable = {
new BarrierJobUnsupportedRDDChainException
}
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala
b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index b2bf30863a9..e7e69dcb01e 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -1728,6 +1728,17 @@ package object config {
.checkValue(v => v > 0, "The max failures should be a positive value.")
.createWithDefault(40)
+ private[spark] val NUM_CANCELLED_JOB_GROUPS_TO_TRACK =
+ ConfigBuilder("spark.scheduler.numCancelledJobGroupsToTrack")
+ .doc("The maximum number of tracked job groups that are cancelled with "
+
+ "`cancelJobGroupAndFutureJobs`. If this maximum number is hit, the
oldest job group " +
+ "will no longer be tracked that future jobs belonging to this job
group will not " +
+ "be cancelled.")
+ .version("4.0.0")
+ .intConf
+ .checkValue(v => v > 0, "The size of the set should be a positive
value.")
+ .createWithDefault(1000)
+
private[spark] val UNSAFE_EXCEPTION_ON_MEMORY_LEAK =
ConfigBuilder("spark.unsafe.exceptionOnMemoryLeak")
.internal()
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 045f67ffd80..241ef35cad7 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -169,6 +169,12 @@ private[spark] class DAGScheduler(
private[scheduler] val activeJobs = new HashSet[ActiveJob]
+ // Job groups that are cancelled with `cancelFutureJobs` as true, with at
most
+ // `NUM_CANCELLED_JOB_GROUPS_TO_TRACK` stored. On a new job submission, if
its job group is in
+ // this set, the job will be immediately cancelled.
+ private[scheduler] val cancelledJobGroups =
+ new
LimitedSizeFIFOSet[String](sc.getConf.get(config.NUM_CANCELLED_JOB_GROUPS_TO_TRACK))
+
/**
* Contains the locations that each RDD's partitions are cached on. This
map's keys are RDD ids
* and its values are arrays indexed by partition numbers. Each array value
is the set of
@@ -1081,10 +1087,11 @@ private[spark] class DAGScheduler(
/**
* Cancel all jobs in the given job group ID.
+ * @param cancelFutureJobs if true, future submitted jobs in this job group
will be cancelled
*/
- def cancelJobGroup(groupId: String): Unit = {
- logInfo("Asked to cancel job group " + groupId)
- eventProcessLoop.post(JobGroupCancelled(groupId))
+ def cancelJobGroup(groupId: String, cancelFutureJobs: Boolean = false): Unit
= {
+ logInfo(s"Asked to cancel job group $groupId with
cancelFutureJobs=$cancelFutureJobs")
+ eventProcessLoop.post(JobGroupCancelled(groupId, cancelFutureJobs))
}
/**
@@ -1180,7 +1187,16 @@ private[spark] class DAGScheduler(
jobsThatUseStage.find(jobIdToActiveJob.contains)
}
- private[scheduler] def handleJobGroupCancelled(groupId: String): Unit = {
+ private[scheduler] def handleJobGroupCancelled(
+ groupId: String,
+ cancelFutureJobs: Boolean): Unit = {
+ // If cancelFutureJobs is true, store the cancelled job group id into
internal states.
+ // When a job belonging to this job group is submitted, skip running it.
+ if (cancelFutureJobs) {
+ logInfo(s"Add job group $groupId into cancelled job groups")
+ cancelledJobGroups.add(groupId)
+ }
+
// Cancel all jobs belonging to this job group.
// First finds all active jobs with this group id, and then kill stages
for them.
val activeInGroup = activeJobs.filter { activeJob =>
@@ -1264,7 +1280,8 @@ private[spark] class DAGScheduler(
listenerBus.post(SparkListenerTaskGettingResult(taskInfo))
}
- private[scheduler] def handleJobSubmitted(jobId: Int,
+ private[scheduler] def handleJobSubmitted(
+ jobId: Int,
finalRDD: RDD[_],
func: (TaskContext, Iterator[_]) => _,
partitions: Array[Int],
@@ -1272,6 +1289,15 @@ private[spark] class DAGScheduler(
listener: JobListener,
artifacts: JobArtifactSet,
properties: Properties): Unit = {
+ // If this job belongs to a cancelled job group, skip running it
+ val jobGroupIdOpt =
Option(properties).map(_.getProperty(SparkContext.SPARK_JOB_GROUP_ID))
+ if (jobGroupIdOpt.exists(cancelledJobGroups.contains(_))) {
+ listener.jobFailed(
+ SparkCoreErrors.sparkJobCancelledAsPartOfJobGroupError(jobId,
jobGroupIdOpt.get))
+ logInfo(s"Skip running a job that belongs to the cancelled job group
${jobGroupIdOpt.get}.")
+ return
+ }
+
var finalStage: ResultStage = null
try {
// New stage creation may throw an exception if, for example, jobs are
run on a
@@ -2727,7 +2753,9 @@ private[spark] class DAGScheduler(
logDebug("Trying to cancel unregistered job " + jobId)
} else {
failJobAndIndependentStages(
- jobIdToActiveJob(jobId), "Job %d cancelled %s".format(jobId,
reason.getOrElse("")))
+ job = jobIdToActiveJob(jobId),
+ error = SparkCoreErrors.sparkJobCancelled(jobId, reason.getOrElse(""),
null)
+ )
}
}
@@ -2787,7 +2815,10 @@ private[spark] class DAGScheduler(
failedStage.latestInfo.completionTime = Some(clock.getTimeMillis())
updateStageInfoForPushBasedShuffle(failedStage)
for (job <- dependentJobs) {
- failJobAndIndependentStages(job, s"Job aborted due to stage failure:
$reason", exception)
+ failJobAndIndependentStages(
+ job,
+ new SparkException(s"Job aborted due to stage failure: $reason", cause
= exception.orNull)
+ )
}
if (dependentJobs.isEmpty) {
logInfo("Ignoring failure of " + failedStage + " because all jobs
depending on it are done")
@@ -2845,13 +2876,11 @@ private[spark] class DAGScheduler(
/** Fails a job and all stages that are only used by that job, and cleans up
relevant state. */
private def failJobAndIndependentStages(
job: ActiveJob,
- failureReason: String,
- exception: Option[Throwable] = None): Unit = {
- if (cancelRunningIndependentStages(job, failureReason)) {
+ error: SparkException): Unit = {
+ if (cancelRunningIndependentStages(job, error.getMessage)) {
// SPARK-15783 important to cleanup state first, just for tests where we
have some asserts
// against the state. Otherwise we have a *little* bit of flakiness in
the tests.
cleanupStateForJobAndIndependentStages(job)
- val error = new SparkException(failureReason, exception.orNull)
job.listener.jobFailed(error)
listenerBus.post(SparkListenerJobEnd(job.jobId, clock.getTimeMillis(),
JobFailed(error)))
}
@@ -3010,8 +3039,8 @@ private[scheduler] class
DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler
case JobCancelled(jobId, reason) =>
dagScheduler.handleJobCancellation(jobId, reason)
- case JobGroupCancelled(groupId) =>
- dagScheduler.handleJobGroupCancelled(groupId)
+ case JobGroupCancelled(groupId, cancelFutureJobs) =>
+ dagScheduler.handleJobGroupCancelled(groupId, cancelFutureJobs)
case JobTagCancelled(tag) =>
dagScheduler.handleJobTagCancelled(tag)
@@ -3092,3 +3121,17 @@ private[spark] object DAGScheduler {
// as more failure events come in
val RESUBMIT_TIMEOUT = 200
}
+
+/**
+ * A NOT thread-safe set that only keeps the last `capacity` elements added to
it.
+ */
+private[scheduler] class LimitedSizeFIFOSet[T](val capacity: Int) {
+ private val set = scala.collection.mutable.LinkedHashSet[T]()
+ def add(t: T): Unit = {
+ set += t
+ if (set.size > capacity) {
+ set -= set.head
+ }
+ }
+ def contains(t: T): Boolean = set.contains(t)
+}
diff --git
a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
index f8cd2742906..fb04695deab 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
@@ -63,7 +63,10 @@ private[scheduler] case class JobCancelled(
reason: Option[String])
extends DAGSchedulerEvent
-private[scheduler] case class JobGroupCancelled(groupId: String) extends
DAGSchedulerEvent
+private[scheduler] case class JobGroupCancelled(
+ groupId: String,
+ cancelFutureJobs: Boolean = false)
+ extends DAGSchedulerEvent
private[scheduler] case class JobTagCancelled(tagName: String) extends
DAGSchedulerEvent
diff --git a/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
b/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
index 23225b2957a..997fda93bc9 100644
--- a/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
@@ -153,6 +153,84 @@ class JobCancellationSuite extends SparkFunSuite with
Matchers with BeforeAndAft
assert(jobB.get() === 100)
}
+ test("if cancel job group and future jobs, skip running jobs in the same job
group") {
+ sc = new SparkContext("local[2]", "test")
+
+ val sem = new Semaphore(0)
+ sc.addSparkListener(new SparkListener {
+ override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
+ sem.release()
+ }
+ })
+
+ // run a job, cancel the job group and its future jobs
+ val jobGroupName = "job-group"
+ val job = Future {
+ sc.setJobGroup(jobGroupName, "")
+ sc.parallelize(1 to 1000).map { i => Thread.sleep (100); i}.count()
+ }
+ // block until job starts
+ sem.acquire(1)
+ // cancel the job group and future jobs
+ sc.cancelJobGroupAndFutureJobs(jobGroupName)
+ ThreadUtils.awaitReady(job, Duration.Inf).failed.foreach { case e:
SparkException =>
+ checkError(
+ exception = e,
+ errorClass = "SPARK_JOB_CANCELLED",
+ sqlState = "XXKDA",
+ parameters = scala.collection.immutable.Map(
+ "jobId" -> "0",
+ "reason" -> s"part of cancelled job group $jobGroupName")
+ )
+ }
+
+ // job in the same job group will not run
+ checkError(
+ exception = intercept[SparkException] {
+ sc.setJobGroup(jobGroupName, "")
+ sc.parallelize(1 to 100).count()
+ },
+ errorClass = "SPARK_JOB_CANCELLED",
+ sqlState = "XXKDA",
+ parameters = scala.collection.immutable.Map(
+ "jobId" -> "1",
+ "reason" -> s"part of cancelled job group $jobGroupName")
+ )
+
+ // job in a different job group should run
+ sc.setJobGroup("another-job-group", "")
+ assert(sc.parallelize(1 to 100).count() == 100)
+ }
+
+ test("only keeps limited number of cancelled job groups") {
+ val conf = new SparkConf()
+ .set(NUM_CANCELLED_JOB_GROUPS_TO_TRACK, 5)
+ sc = new SparkContext("local[2]", "test", conf)
+ val setSize = sc.getConf.get(NUM_CANCELLED_JOB_GROUPS_TO_TRACK)
+ // call cancelJobGroup with cancelFutureJobs = true on (setSize + 1) job
groups, the first one
+ // should have been evicted from the cancelledJobGroups set
+ (0 to setSize).foreach { idx =>
+ val sem = new Semaphore(0)
+ sc.addSparkListener(new SparkListener {
+ override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
+ sem.release()
+ }
+ })
+ val job = Future {
+ sc.setJobGroup(s"job-group-$idx", "")
+ sc.parallelize(1 to 1000).map { i => Thread.sleep (100); i}.count()
+ }
+ sem.acquire(1)
+ sc.cancelJobGroupAndFutureJobs(s"job-group-$idx")
+ ThreadUtils.awaitReady(job, Duration.Inf).failed.foreach { case e:
SparkException =>
+ assert(e.getErrorClass == "SPARK_JOB_CANCELLED")
+ }
+ }
+ // submit a job with the 0 job group that was evicted from
cancelledJobGroups set, it should run
+ sc.setJobGroup("job-group-0", "")
+ assert(sc.parallelize(1 to 100).count() == 100)
+ }
+
test("job tags") {
sc = new SparkContext("local[2]", "test")
diff --git
a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 0f596d7d5b7..0f7146bc7c1 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -724,7 +724,12 @@ class DAGSchedulerSuite extends SparkFunSuite with
TempLocalSparkContext with Ti
assert(numResults === 0)
cancel(jobId)
assert(failureReason.isDefined)
- assert(failureReason.get.getMessage() === "Job 0 cancelled ")
+ checkError(
+ exception = failureReason.get.asInstanceOf[SparkException],
+ errorClass = "SPARK_JOB_CANCELLED",
+ sqlState = "XXKDA",
+ parameters = scala.collection.immutable.Map("jobId" -> "0", "reason" ->
"")
+ )
}
test("run trivial job") {
@@ -841,7 +846,12 @@ class DAGSchedulerSuite extends SparkFunSuite with
TempLocalSparkContext with Ti
val rdd = new MyRDD(sc, 1, Nil)
val jobId = submit(rdd, Array(0))
cancel(jobId)
- assert(failure.getMessage === s"Job $jobId cancelled ")
+ checkError(
+ exception = failure.asInstanceOf[SparkException],
+ errorClass = "SPARK_JOB_CANCELLED",
+ sqlState = "XXKDA",
+ parameters = scala.collection.immutable.Map("jobId" -> jobId.toString,
"reason" -> "")
+ )
assert(sparkListener.failedStages === Seq(0))
assertDataStructuresEmpty()
}
diff --git a/docs/sql-error-conditions.md b/docs/sql-error-conditions.md
index cba6a24b869..3e706debbf8 100644
--- a/docs/sql-error-conditions.md
+++ b/docs/sql-error-conditions.md
@@ -1850,6 +1850,12 @@ The seed expression `<seedExpr>` of the expression
`<exprWithSeed>` must be fold
sortBy must be used together with bucketBy.
+### SPARK_JOB_CANCELLED
+
+[SQLSTATE: XXKDA](sql-error-conditions-sqlstates.html#class-XX-internal-error)
+
+Job `<jobId>` cancelled `<reason>`
+
### SPECIFY_BUCKETING_IS_NOT_ALLOWED
[SQLSTATE:
42601](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]