This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 8daa2553c92 [SPARK-44195][R] Add JobTag APIs to SparkR SparkContext
8daa2553c92 is described below

commit 8daa2553c92bf7de4a214c7bb4127141e06cc1d2
Author: Juliusz Sompolski <[email protected]>
AuthorDate: Mon Jul 3 09:53:26 2023 +0900

    [SPARK-44195][R] Add JobTag APIs to SparkR SparkContext
    
    ### What changes were proposed in this pull request?
    
    Add APIs from https://github.com/apache/spark/pull/41440 to SparkR:
    * addJobTag(tag)
    * removeJobTag(tag)
    * getJobTags()
    * clearJobTags()
    * cancelJobsWithTag()
    * setInterruptOnCancel(tag)
    
    Additionally:
    * fix a bug in removeJobTag when the last tag is removed (should be left 
with empty tags, not an empty string tag)
    * fix comments to cancelJobsWithTag
    * add a few defensive reinforcements against an empty string tag as a 
result of missing property / removing last tag.
    
    ### Why are the changes needed?
    
    SparkR parity.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, introduce the APIs introduced in Scala in 
https://github.com/apache/spark/pull/41440 to SparkR
    
    ### How was this patch tested?
    
    Added test.
    
    Closes #41742 from juliuszsompolski/SPARK-44195.
    
    Authored-by: Juliusz Sompolski <[email protected]>
    Signed-off-by: Hyukjin Kwon <[email protected]>
---
 R/pkg/NAMESPACE                                    | 10 ++-
 R/pkg/R/sparkR.R                                   | 98 ++++++++++++++++++++++
 R/pkg/pkgdown/_pkgdown_template.yml                |  6 ++
 R/pkg/tests/fulltests/test_context.R               | 16 ++++
 .../main/scala/org/apache/spark/SparkContext.scala |  9 +-
 .../apache/spark/api/java/JavaSparkContext.scala   |  2 +-
 .../main/scala/org/apache/spark/api/r/RUtils.scala |  4 +
 .../org/apache/spark/scheduler/DAGScheduler.scala  |  6 +-
 .../apache/spark/status/AppStatusListener.scala    |  1 +
 .../org/apache/spark/JobCancellationSuite.scala    |  4 +-
 10 files changed, 147 insertions(+), 9 deletions(-)

diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE
index bb05e99a9d8..78068f20c57 100644
--- a/R/pkg/NAMESPACE
+++ b/R/pkg/NAMESPACE
@@ -77,13 +77,19 @@ exportMethods("glm",
               "spark.lm",
               "spark.fmRegressor")
 
-# Job group lifecycle management methods
+# Job group and job tag lifecycle management methods
 export("setJobGroup",
        "clearJobGroup",
        "cancelJobGroup",
        "setJobDescription",
+       "setInterruptOnCancel",
        "setLocalProperty",
-       "getLocalProperty")
+       "getLocalProperty",
+       "addJobTag",
+       "removeJobTag",
+       "getJobTags",
+       "clearJobTags",
+       "cancelJobsWithTag")
 
 # Export Utility methods
 export("setLogLevel")
diff --git a/R/pkg/R/sparkR.R b/R/pkg/R/sparkR.R
index e2ab5747177..1cbaad6d178 100644
--- a/R/pkg/R/sparkR.R
+++ b/R/pkg/R/sparkR.R
@@ -542,6 +542,104 @@ cancelJobGroup <- function(groupId) {
   invisible(callJMethod(sc, "cancelJobGroup", groupId))
 }
 
+#' Set the behavior of job cancellation from jobs started in this thread.
+#'
+#' @param interruptOnCancel If true, then job cancellation will result in 
`Thread.interrupt()`
+#' being called on the job's executor threads. This is useful to help ensure 
that the tasks
+#' are actually stopped in a timely manner, but is off by default due to 
HDFS-1208, where HDFS
+#' may respond to Thread.interrupt() by marking nodes as dead.
+#' @rdname setInterruptOnCancel
+#' @name setInterruptOnCancel
+#' @examples
+#'\dontrun{
+#' sparkR.session()
+#' setInterruptOnCancel(TRUE)
+#'}
+#' @note setInterruptOnCancel since 3.5.0
+setInterruptOnCancel <- function(interruptOnCancel) {
+  sc <- getSparkContext()
+  invisible(callJMethod(sc, "setInterruptOnCancel", interruptOnCancel))
+}
+
+#' Add a tag to be assigned to all the jobs started by this thread.
+#'
+#' @param tag The tag to be added. Cannot contain ',' (comma) character.
+#' @rdname addJobTag
+#' @name addJobTag
+#' @examples
+#'\dontrun{
+#' sparkR.session()
+#' addJobTag("myJobTag")
+#'}
+#' @note addJobTag since 3.5.0
+addJobTag <- function(tag) {
+  sc <- getSparkContext()
+  invisible(callJMethod(sc, "addJobTag", tag))
+}
+
+#' Remove a tag previously added to be assigned to all the jobs started by 
this thread.
+#' Noop if such a tag was not added earlier.
+#'
+#' @param tag The tag to be removed. Cannot contain ',' (comma) character.
+#' @rdname removeJobTag
+#' @name removeJobTag
+#' @examples
+#'\dontrun{
+#' sparkR.session()
+#' removeJobTag("myJobTag")
+#'}
+#' @note removeJobTag since 3.5.0
+removeJobTag <- function(tag) {
+  sc <- getSparkContext()
+  invisible(callJMethod(sc, "removeJobTag", tag))
+}
+
+#' Get the tags that are currently set to be assigned to all the jobs started 
by this thread.
+#'
+#' @rdname getJobTags
+#' @name getJobTags
+#' @examples
+#'\dontrun{
+#' sparkR.session()
+#' tags <- getJobTags()
+#'}
+#' @note getJobTags since 3.5.0
+getJobTags <- function() {
+  sc <- getSparkContext()
+  callJStatic("org.apache.spark.api.r.RUtils", "getJobTags", sc)
+}
+
+#' Clear the current thread's job tags.
+#'
+#' @rdname clearJobTags
+#' @name clearJobTags
+#' @examples
+#'\dontrun{
+#' sparkR.session()
+#' clearJobTags()
+#'}
+#' @note clearJobTags since 3.5.0
+clearJobTags <- function() {
+  sc <- getSparkContext()
+  invisible(callJMethod(sc, "clearJobTags"))
+}
+
+#' Cancel active jobs that have the specified tag.
+#'
+#' @param tag The tag to be cancelled. Cannot contain ',' (comma) character.
+#' @rdname cancelJobsWithTag
+#' @name cancelJobsWithTag
+#' @examples
+#'\dontrun{
+#' sparkR.session()
+#' cancelJobsWithTag("myTag")
+#'}
+#' @note cancelJobsWithTag since 3.5.0
+cancelJobsWithTag <- function(tag) {
+  sc <- getSparkContext()
+  invisible(callJMethod(sc, "cancelJobsWithTag", tag))
+}
+
 #' Set a human readable description of the current job.
 #'
 #' Set a description that is shown as a job description in UI.
diff --git a/R/pkg/pkgdown/_pkgdown_template.yml 
b/R/pkg/pkgdown/_pkgdown_template.yml
index e6b485d4898..9ee49a527a9 100644
--- a/R/pkg/pkgdown/_pkgdown_template.yml
+++ b/R/pkg/pkgdown/_pkgdown_template.yml
@@ -289,13 +289,19 @@ reference:
 - title: "Spark Session and Context"
 - contents:
   - cancelJobGroup
+  - cancelJobsWithTag
   - clearCache
   - clearJobGroup
   - getLocalProperty
   - install.spark
   - setCheckpointDir
   - setJobDescription
+  - setInterruptOnCancel
   - setJobGroup
+  - addJobTag
+  - removeJobTag
+  - getJobTags
+  - clearJobTags
   - setLocalProperty
   - setLogLevel
   - spark.addFile
diff --git a/R/pkg/tests/fulltests/test_context.R 
b/R/pkg/tests/fulltests/test_context.R
index 1add5a9fdde..5236bac7a54 100644
--- a/R/pkg/tests/fulltests/test_context.R
+++ b/R/pkg/tests/fulltests/test_context.R
@@ -95,6 +95,22 @@ test_that("job group functions can be called", {
   setJobGroup("groupId", "job description", TRUE)
   cancelJobGroup("groupId")
   clearJobGroup()
+  setInterruptOnCancel(TRUE)
+
+  sparkR.session.stop()
+  expect_true(TRUE)
+})
+
+test_that("job tag functions can be called", {
+  sc <- sparkR.sparkContext(master = sparkRTestMaster)
+  addJobTag("B")
+  clearJobTags()
+  expect_true(identical(getJobTags(), list()))
+  addJobTag("A")
+  expect_true(identical(getJobTags(), list("A")))
+  removeJobTag("A")
+  expect_true(identical(getJobTags(), list()))
+  cancelJobsWithTag("A")
 
   sparkR.session.stop()
   expect_true(TRUE)
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala 
b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 78c7ecb2782..58f8310da70 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -878,7 +878,11 @@ class SparkContext(config: SparkConf) extends Logging {
     SparkContext.throwIfInvalidTag(tag)
     val existingTags = getJobTags()
     val newTags = (existingTags - 
tag).mkString(SparkContext.SPARK_JOB_TAGS_SEP)
-    setLocalProperty(SparkContext.SPARK_JOB_TAGS, newTags)
+    if (newTags.isEmpty) {
+      clearJobTags()
+    } else {
+      setLocalProperty(SparkContext.SPARK_JOB_TAGS, newTags)
+    }
   }
 
   /**
@@ -890,6 +894,7 @@ class SparkContext(config: SparkConf) extends Logging {
     Option(getLocalProperty(SparkContext.SPARK_JOB_TAGS))
       .map(_.split(SparkContext.SPARK_JOB_TAGS_SEP).toSet)
       .getOrElse(Set())
+      .filter(!_.isEmpty) // empty string tag should not happen, but be 
defensive
   }
 
   /**
@@ -2544,7 +2549,7 @@ class SparkContext(config: SparkConf) extends Logging {
   /**
    * Cancel active jobs that have the specified tag. See 
`org.apache.spark.SparkContext.addJobTag`.
    *
-   * @param tag The tag to be added. Cannot contain ',' (comma) character.
+   * @param tag The tag to be cancelled. Cannot contain ',' (comma) character.
    *
    * @since 3.5.0
    */
diff --git 
a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala 
b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
index 31703cdad5b..8c99e5622ba 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
@@ -767,7 +767,7 @@ class JavaSparkContext(val sc: SparkContext) extends 
Closeable {
   /**
    * Cancel active jobs that have the specified tag. See 
`org.apache.spark.SparkContext.addJobTag`.
    *
-   * @param tag The tag to be added. Cannot contain ',' (comma) character.
+   * @param tag The tag to be cancelled. Cannot contain ',' (comma) character.
    *
    * @since 3.5.0
    */
diff --git a/core/src/main/scala/org/apache/spark/api/r/RUtils.scala 
b/core/src/main/scala/org/apache/spark/api/r/RUtils.scala
index 784a57e7b98..a4f6b41bcf5 100644
--- a/core/src/main/scala/org/apache/spark/api/r/RUtils.scala
+++ b/core/src/main/scala/org/apache/spark/api/r/RUtils.scala
@@ -110,4 +110,8 @@ private[spark] object RUtils {
   def isEncryptionEnabled(sc: JavaSparkContext): Boolean = {
     sc.conf.get(org.apache.spark.internal.config.IO_ENCRYPTION_ENABLED)
   }
+
+  def getJobTags(sc: JavaSparkContext): Array[String] = {
+    sc.getJobTags().toArray().map(_.asInstanceOf[String])
+  }
 }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 00f505fa5a9..fc83439454d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1199,7 +1199,7 @@ private[spark] class DAGScheduler(
     val jobIds = activeJobs.filter { activeJob =>
       Option(activeJob.properties).exists { properties =>
         
Option(properties.getProperty(SparkContext.SPARK_JOB_TAGS)).getOrElse("")
-          .split(SparkContext.SPARK_JOB_TAGS_SEP).toSet.contains(tag)
+          
.split(SparkContext.SPARK_JOB_TAGS_SEP).filter(!_.isEmpty).toSet.contains(tag)
       }
     }.map(_.jobId)
     jobIds.foreach(handleJobCancellation(_,
@@ -3003,8 +3003,8 @@ private[scheduler] class 
DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler
     case JobGroupCancelled(groupId) =>
       dagScheduler.handleJobGroupCancelled(groupId)
 
-    case JobTagCancelled(groupId) =>
-      dagScheduler.handleJobTagCancelled(groupId)
+    case JobTagCancelled(tag) =>
+      dagScheduler.handleJobTagCancelled(tag)
 
     case AllJobsCancelled =>
       dagScheduler.doCancelAllJobs()
diff --git 
a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala 
b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
index c1f52e86dd0..15815e5539f 100644
--- a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
+++ b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
@@ -443,6 +443,7 @@ private[spark] class AppStatusListener(
       .map(_.split(SparkContext.SPARK_JOB_TAGS_SEP).toSet)
       .getOrElse(Set())
       .toSeq
+      .filter(!_.isEmpty)
       .sorted
     val sqlExecutionId = Option(event.properties)
       .flatMap(p => Option(p.getProperty(SQL_EXECUTION_ID_KEY)).map(_.toLong))
diff --git a/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala 
b/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
index f2ad33b0be7..fb2c44bca7e 100644
--- a/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
@@ -215,7 +215,9 @@ class JobCancellationSuite extends SparkFunSuite with 
Matchers with BeforeAndAft
         }
       }(executionContext)
       val jobC = Future {
-        assert(sc.getJobTags() == Set())
+        sc.addJobTag("foo")
+        sc.removeJobTag("foo")
+        assert(sc.getJobTags() == Set()) // check that removing the last tag works
         sc.addJobTag("two")
         assert(sc.getJobTags() == Set("two"))
         try {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to