This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 8daa2553c92 [SPARK-44195][R] Add JobTag APIs to SparkR SparkContext
8daa2553c92 is described below
commit 8daa2553c92bf7de4a214c7bb4127141e06cc1d2
Author: Juliusz Sompolski <[email protected]>
AuthorDate: Mon Jul 3 09:53:26 2023 +0900
[SPARK-44195][R] Add JobTag APIs to SparkR SparkContext
### What changes were proposed in this pull request?
Add APIs from https://github.com/apache/spark/pull/41440 to SparkR:
* addJobTag(tag)
* removeJobTag(tag)
* getJobTags()
* clearJobTags()
* cancelJobsWithTag(tag)
* setInterruptOnCancel(interruptOnCancel)
Additionally:
* fix a bug in removeJobTag when the last tag is removed (should be left
with empty tags, not an empty string tag)
* fix comments to cancelJobsWithTag
* add a few defensive reinforcements against an empty string tag as a
result of missing property / removing last tag.
### Why are the changes needed?
SparkR parity.
### Does this PR introduce _any_ user-facing change?
Yes, introduce the APIs introduced in Scala in
https://github.com/apache/spark/pull/41440 to SparkR
### How was this patch tested?
Added test.
Closes #41742 from juliuszsompolski/SPARK-44195.
Authored-by: Juliusz Sompolski <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
---
R/pkg/NAMESPACE | 10 ++-
R/pkg/R/sparkR.R | 98 ++++++++++++++++++++++
R/pkg/pkgdown/_pkgdown_template.yml | 6 ++
R/pkg/tests/fulltests/test_context.R | 16 ++++
.../main/scala/org/apache/spark/SparkContext.scala | 9 +-
.../apache/spark/api/java/JavaSparkContext.scala | 2 +-
.../main/scala/org/apache/spark/api/r/RUtils.scala | 4 +
.../org/apache/spark/scheduler/DAGScheduler.scala | 6 +-
.../apache/spark/status/AppStatusListener.scala | 1 +
.../org/apache/spark/JobCancellationSuite.scala | 4 +-
10 files changed, 147 insertions(+), 9 deletions(-)
diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE
index bb05e99a9d8..78068f20c57 100644
--- a/R/pkg/NAMESPACE
+++ b/R/pkg/NAMESPACE
@@ -77,13 +77,19 @@ exportMethods("glm",
"spark.lm",
"spark.fmRegressor")
-# Job group lifecycle management methods
+# Job group and job tag lifecycle management methods
export("setJobGroup",
"clearJobGroup",
"cancelJobGroup",
"setJobDescription",
+ "setInterruptOnCancel",
"setLocalProperty",
- "getLocalProperty")
+ "getLocalProperty",
+ "addJobTag",
+ "removeJobTag",
+ "getJobTags",
+ "clearJobTags",
+ "cancelJobsWithTag")
# Export Utility methods
export("setLogLevel")
diff --git a/R/pkg/R/sparkR.R b/R/pkg/R/sparkR.R
index e2ab5747177..1cbaad6d178 100644
--- a/R/pkg/R/sparkR.R
+++ b/R/pkg/R/sparkR.R
@@ -542,6 +542,104 @@ cancelJobGroup <- function(groupId) {
invisible(callJMethod(sc, "cancelJobGroup", groupId))
}
+#' Set the behavior of job cancellation from jobs started in this thread.
+#'
+#' @param interruptOnCancel If true, then job cancellation will result in
`Thread.interrupt()`
+#' being called on the job's executor threads. This is useful to help ensure
that the tasks
+#' are actually stopped in a timely manner, but is off by default due to
HDFS-1208, where HDFS
+#' may respond to Thread.interrupt() by marking nodes as dead.
+#' @rdname setInterruptOnCancel
+#' @name setInterruptOnCancel
+#' @examples
+#'\dontrun{
+#' sparkR.session()
+#' setInterruptOnCancel(TRUE)
+#'}
+#' @note setInterruptOnCancel since 3.5.0
+setInterruptOnCancel <- function(interruptOnCancel) {
+ sc <- getSparkContext()
+ invisible(callJMethod(sc, "setInterruptOnCancel", interruptOnCancel))
+}
+
+#' Add a tag to be assigned to all the jobs started by this thread.
+#'
+#' @param tag The tag to be added. Cannot contain ',' (comma) character.
+#' @rdname addJobTag
+#' @name addJobTag
+#' @examples
+#'\dontrun{
+#' sparkR.session()
+#' addJobTag("myJobTag")
+#'}
+#' @note addJobTag since 3.5.0
+addJobTag <- function(tag) {
+ sc <- getSparkContext()
+ invisible(callJMethod(sc, "addJobTag", tag))
+}
+
+#' Remove a tag previously added to be assigned to all the jobs started by
this thread.
+#' Noop if such a tag was not added earlier.
+#'
+#' @param tag The tag to be removed. Cannot contain ',' (comma) character.
+#' @rdname removeJobTag
+#' @name removeJobTag
+#' @examples
+#'\dontrun{
+#' sparkR.session()
+#' removeJobTag("myJobTag")
+#'}
+#' @note removeJobTag since 3.5.0
+removeJobTag <- function(tag) {
+ sc <- getSparkContext()
+ invisible(callJMethod(sc, "removeJobTag", tag))
+}
+
+#' Get the tags that are currently set to be assigned to all the jobs started
by this thread.
+#'
+#' @rdname getJobTags
+#' @name getJobTags
+#' @examples
+#'\dontrun{
+#' sparkR.session()
+#' tags <- getJobTags()
+#'}
+#' @note getJobTags since 3.5.0
+getJobTags <- function() {
+ sc <- getSparkContext()
+ callJStatic("org.apache.spark.api.r.RUtils", "getJobTags", sc)
+}
+
+#' Clear the current thread's job tags.
+#'
+#' @rdname clearJobTags
+#' @name clearJobTags
+#' @examples
+#'\dontrun{
+#' sparkR.session()
+#' clearJobTags()
+#'}
+#' @note clearJobTags since 3.5.0
+clearJobTags <- function() {
+ sc <- getSparkContext()
+ invisible(callJMethod(sc, "clearJobTags"))
+}
+
+#' Cancel active jobs that have the specified tag.
+#'
+#' @param tag The tag to be cancelled. Cannot contain ',' (comma) character.
+#' @rdname cancelJobsWithTag
+#' @name cancelJobsWithTag
+#' @examples
+#'\dontrun{
+#' sparkR.session()
+#' cancelJobsWithTag("myTag")
+#'}
+#' @note cancelJobsWithTag since 3.5.0
+cancelJobsWithTag <- function(tag) {
+ sc <- getSparkContext()
+ invisible(callJMethod(sc, "cancelJobsWithTag", tag))
+}
+
#' Set a human readable description of the current job.
#'
#' Set a description that is shown as a job description in UI.
diff --git a/R/pkg/pkgdown/_pkgdown_template.yml
b/R/pkg/pkgdown/_pkgdown_template.yml
index e6b485d4898..9ee49a527a9 100644
--- a/R/pkg/pkgdown/_pkgdown_template.yml
+++ b/R/pkg/pkgdown/_pkgdown_template.yml
@@ -289,13 +289,19 @@ reference:
- title: "Spark Session and Context"
- contents:
- cancelJobGroup
+ - cancelJobsWithTag
- clearCache
- clearJobGroup
- getLocalProperty
- install.spark
- setCheckpointDir
- setJobDescription
+ - setInterruptOnCancel
- setJobGroup
+ - addJobTag
+ - removeJobTag
+ - getJobTags
+ - clearJobTags
- setLocalProperty
- setLogLevel
- spark.addFile
diff --git a/R/pkg/tests/fulltests/test_context.R
b/R/pkg/tests/fulltests/test_context.R
index 1add5a9fdde..5236bac7a54 100644
--- a/R/pkg/tests/fulltests/test_context.R
+++ b/R/pkg/tests/fulltests/test_context.R
@@ -95,6 +95,22 @@ test_that("job group functions can be called", {
setJobGroup("groupId", "job description", TRUE)
cancelJobGroup("groupId")
clearJobGroup()
+ setInterruptOnCancel(TRUE)
+
+ sparkR.session.stop()
+ expect_true(TRUE)
+})
+
+test_that("job tag functions can be called", {
+ sc <- sparkR.sparkContext(master = sparkRTestMaster)
+ addJobTag("B")
+ clearJobTags()
+ expect_true(identical(getJobTags(), list()))
+ addJobTag("A")
+ expect_true(identical(getJobTags(), list("A")))
+ removeJobTag("A")
+ expect_true(identical(getJobTags(), list()))
+ cancelJobsWithTag("A")
sparkR.session.stop()
expect_true(TRUE)
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala
b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 78c7ecb2782..58f8310da70 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -878,7 +878,11 @@ class SparkContext(config: SparkConf) extends Logging {
SparkContext.throwIfInvalidTag(tag)
val existingTags = getJobTags()
val newTags = (existingTags -
tag).mkString(SparkContext.SPARK_JOB_TAGS_SEP)
- setLocalProperty(SparkContext.SPARK_JOB_TAGS, newTags)
+ if (newTags.isEmpty) {
+ clearJobTags()
+ } else {
+ setLocalProperty(SparkContext.SPARK_JOB_TAGS, newTags)
+ }
}
/**
@@ -890,6 +894,7 @@ class SparkContext(config: SparkConf) extends Logging {
Option(getLocalProperty(SparkContext.SPARK_JOB_TAGS))
.map(_.split(SparkContext.SPARK_JOB_TAGS_SEP).toSet)
.getOrElse(Set())
+ .filter(!_.isEmpty) // empty string tag should not happen, but be
defensive
}
/**
@@ -2544,7 +2549,7 @@ class SparkContext(config: SparkConf) extends Logging {
/**
* Cancel active jobs that have the specified tag. See
`org.apache.spark.SparkContext.addJobTag`.
*
- * @param tag The tag to be added. Cannot contain ',' (comma) character.
+ * @param tag The tag to be cancelled. Cannot contain ',' (comma) character.
*
* @since 3.5.0
*/
diff --git
a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
index 31703cdad5b..8c99e5622ba 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
@@ -767,7 +767,7 @@ class JavaSparkContext(val sc: SparkContext) extends
Closeable {
/**
* Cancel active jobs that have the specified tag. See
`org.apache.spark.SparkContext.addJobTag`.
*
- * @param tag The tag to be added. Cannot contain ',' (comma) character.
+ * @param tag The tag to be cancelled. Cannot contain ',' (comma) character.
*
* @since 3.5.0
*/
diff --git a/core/src/main/scala/org/apache/spark/api/r/RUtils.scala
b/core/src/main/scala/org/apache/spark/api/r/RUtils.scala
index 784a57e7b98..a4f6b41bcf5 100644
--- a/core/src/main/scala/org/apache/spark/api/r/RUtils.scala
+++ b/core/src/main/scala/org/apache/spark/api/r/RUtils.scala
@@ -110,4 +110,8 @@ private[spark] object RUtils {
def isEncryptionEnabled(sc: JavaSparkContext): Boolean = {
sc.conf.get(org.apache.spark.internal.config.IO_ENCRYPTION_ENABLED)
}
+
+ def getJobTags(sc: JavaSparkContext): Array[String] = {
+ sc.getJobTags().toArray().map(_.asInstanceOf[String])
+ }
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 00f505fa5a9..fc83439454d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1199,7 +1199,7 @@ private[spark] class DAGScheduler(
val jobIds = activeJobs.filter { activeJob =>
Option(activeJob.properties).exists { properties =>
Option(properties.getProperty(SparkContext.SPARK_JOB_TAGS)).getOrElse("")
- .split(SparkContext.SPARK_JOB_TAGS_SEP).toSet.contains(tag)
+
.split(SparkContext.SPARK_JOB_TAGS_SEP).filter(!_.isEmpty).toSet.contains(tag)
}
}.map(_.jobId)
jobIds.foreach(handleJobCancellation(_,
@@ -3003,8 +3003,8 @@ private[scheduler] class
DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler
case JobGroupCancelled(groupId) =>
dagScheduler.handleJobGroupCancelled(groupId)
- case JobTagCancelled(groupId) =>
- dagScheduler.handleJobTagCancelled(groupId)
+ case JobTagCancelled(tag) =>
+ dagScheduler.handleJobTagCancelled(tag)
case AllJobsCancelled =>
dagScheduler.doCancelAllJobs()
diff --git
a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
index c1f52e86dd0..15815e5539f 100644
--- a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
+++ b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
@@ -443,6 +443,7 @@ private[spark] class AppStatusListener(
.map(_.split(SparkContext.SPARK_JOB_TAGS_SEP).toSet)
.getOrElse(Set())
.toSeq
+ .filter(!_.isEmpty)
.sorted
val sqlExecutionId = Option(event.properties)
.flatMap(p => Option(p.getProperty(SQL_EXECUTION_ID_KEY)).map(_.toLong))
diff --git a/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
b/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
index f2ad33b0be7..fb2c44bca7e 100644
--- a/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
@@ -215,7 +215,9 @@ class JobCancellationSuite extends SparkFunSuite with
Matchers with BeforeAndAft
}
}(executionContext)
val jobC = Future {
- assert(sc.getJobTags() == Set())
+ sc.addJobTag("foo")
+ sc.removeJobTag("foo")
+ assert(sc.getJobTags() == Set()) // check that remove works removing
the last tag
sc.addJobTag("two")
assert(sc.getJobTags() == Set("two"))
try {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]