This is an automated email from the ASF dual-hosted git repository.

joshrosen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 4c99c4df7f9c [SPARK-48716] Add jobGroupId to SparkListenerSQLExecutionStart
4c99c4df7f9c is described below

commit 4c99c4df7f9cda9b35bc7401c92da22ea683861b
Author: Lingkai Kong <lingkai.k...@databricks.com>
AuthorDate: Tue Jul 9 11:29:52 2024 -0700

    [SPARK-48716] Add jobGroupId to SparkListenerSQLExecutionStart
    
    ### What changes were proposed in this pull request?
    Add jobGroupId to SparkListenerSQLExecutionStart
    
    ### Why are the changes needed?
    JobGroupId can be used to correlate jobs that belong to the same job group. Exposing it on SparkListenerSQLExecutionStart makes this grouping easy to do from a listener.
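
    For illustration only (not part of this patch), a minimal sketch of a listener that uses the new field to bucket SQL executions by job group could look like the following; the class name JobGroupTrackingListener is hypothetical:

    ```scala
    import scala.collection.mutable

    import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
    import org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart

    // Hypothetical listener: collects SQL execution IDs per job group.
    class JobGroupTrackingListener extends SparkListener {
      private val executionsByGroup = mutable.Map.empty[String, mutable.Buffer[Long]]

      override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
        case e: SparkListenerSQLExecutionStart =>
          // jobGroupId is None unless a job group was set via sc.setJobGroup(...)
          e.jobGroupId.foreach { group =>
            executionsByGroup.getOrElseUpdate(group, mutable.Buffer.empty) += e.executionId
          }
        case _ => // ignore other events
      }
    }
    ```

    Such a listener would be registered with spark.sparkContext.addSparkListener(...).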
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Unit test
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No
    
    Closes #47092 from gjxdxh/gjxdxh/SPARK-48716.
    
    Authored-by: Lingkai Kong <lingkai.k...@databricks.com>
    Signed-off-by: Josh Rosen <joshro...@databricks.com>
---
 .../apache/spark/sql/execution/SQLExecution.scala  |  5 ++--
 .../sql/execution/ui/SQLAppStatusListener.scala    |  2 +-
 .../spark/sql/execution/ui/SQLListener.scala       |  3 +-
 .../spark/sql/execution/SQLExecutionSuite.scala    | 33 ++++++++++++++++++++++
 .../execution/ui/SQLAppStatusListenerSuite.scala   |  2 +-
 5 files changed, 40 insertions(+), 5 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
index f4be03c90be7..7c03bad90ebb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
@@ -23,7 +23,7 @@ import java.util.concurrent.atomic.AtomicLong
 import scala.jdk.CollectionConverters._
 import scala.util.control.NonFatal
 
-import org.apache.spark.{ErrorMessageFormat, JobArtifactSet, SparkEnv, SparkException, SparkThrowable, SparkThrowableHelper}
+import org.apache.spark.{ErrorMessageFormat, JobArtifactSet, SparkContext, SparkEnv, SparkException, SparkThrowable, SparkThrowableHelper}
 import org.apache.spark.SparkContext.{SPARK_JOB_DESCRIPTION, SPARK_JOB_INTERRUPT_ON_CANCEL}
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config.{SPARK_DRIVER_PREFIX, SPARK_EXECUTOR_PREFIX}
@@ -128,7 +128,8 @@ object SQLExecution extends Logging {
           sparkPlanInfo = SparkPlanInfo.EMPTY,
           time = System.currentTimeMillis(),
           modifiedConfigs = redactedConfigs,
-          jobTags = sc.getJobTags()
+          jobTags = sc.getJobTags(),
+          jobGroupId = Option(sc.getLocalProperty(SparkContext.SPARK_JOB_GROUP_ID))
         )
         try {
           body match {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
index bf33ba2c96f1..dcbf328c71e3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
@@ -343,7 +343,7 @@ class SQLAppStatusListener(
 
   private def onExecutionStart(event: SparkListenerSQLExecutionStart): Unit = {
     val SparkListenerSQLExecutionStart(executionId, rootExecutionId, description, details,
-      physicalPlanDescription, sparkPlanInfo, time, modifiedConfigs, _) = event
+      physicalPlanDescription, sparkPlanInfo, time, modifiedConfigs, _, _) = event
 
     val planGraph = SparkPlanGraph(sparkPlanInfo)
     val sqlPlanMetrics = planGraph.allNodes.flatMap { node =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
index 3a22dd23548f..416b9547b046 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
@@ -54,7 +54,8 @@ case class SparkListenerSQLExecutionStart(
     sparkPlanInfo: SparkPlanInfo,
     time: Long,
     modifiedConfigs: Map[String, String] = Map.empty,
-    jobTags: Set[String] = Set.empty)
+    jobTags: Set[String] = Set.empty,
+    jobGroupId: Option[String] = None)
   extends SparkListenerEvent
 
 @DeveloperApi
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLExecutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLExecutionSuite.scala
index b8a109919f8f..94d33731b6de 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLExecutionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLExecutionSuite.scala
@@ -227,6 +227,7 @@ class SQLExecutionSuite extends SparkFunSuite with SQLConfHelper {
 
       spark.range(1).collect()
 
+      spark.sparkContext.listenerBus.waitUntilEmpty()
       assert(jobTags.contains(jobTag))
       assert(sqlJobTags.contains(jobTag))
     } finally {
@@ -234,6 +235,38 @@ class SQLExecutionSuite extends SparkFunSuite with SQLConfHelper {
       spark.stop()
     }
   }
+
+  test("jobGroupId property") {
+    val spark = SparkSession.builder().master("local[*]").appName("test").getOrCreate()
+    val JobGroupId = "test-JobGroupId"
+    try {
+      spark.sparkContext.setJobGroup(JobGroupId, "job Group id")
+
+      var jobGroupIdOpt: Option[String] = None
+      var sqlJobGroupIdOpt: Option[String] = None
+      spark.sparkContext.addSparkListener(new SparkListener {
+        override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
+          jobGroupIdOpt = Some(jobStart.properties.getProperty(SparkContext.SPARK_JOB_GROUP_ID))
+        }
+
+        override def onOtherEvent(event: SparkListenerEvent): Unit = {
+          event match {
+            case e: SparkListenerSQLExecutionStart =>
+              sqlJobGroupIdOpt = e.jobGroupId
+          }
+        }
+      })
+
+      spark.range(1).collect()
+
+      spark.sparkContext.listenerBus.waitUntilEmpty()
+      assert(jobGroupIdOpt.contains(JobGroupId))
+      assert(sqlJobGroupIdOpt.contains(JobGroupId))
+    } finally {
+      spark.sparkContext.clearJobGroup()
+      spark.stop()
+    }
+  }
 }
 
 object SQLExecutionSuite {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
index 17e77cf8d8fb..e63ff019a2b6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
@@ -344,7 +344,7 @@ abstract class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTes
       val listener = new SparkListener {
         override def onOtherEvent(event: SparkListenerEvent): Unit = {
           event match {
-            case SparkListenerSQLExecutionStart(_, _, _, _, planDescription, _, _, _, _) =>
+            case SparkListenerSQLExecutionStart(_, _, _, _, planDescription, _, _, _, _, _) =>
               assert(expected.forall(planDescription.contains))
               checkDone = true
             case _ => // ignore other events

