This is an automated email from the ASF dual-hosted git repository.

ulyssesyou pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
     new dae7a576ec7 [SPARK-43952][SQL][FOLLOWUP] Correct AQE cancel broadcast 
job tag
dae7a576ec7 is described below

commit dae7a576ec7a103f2486fa4121731de0b1347a52
Author: ulysses-you <[email protected]>
AuthorDate: Fri Jul 21 10:41:24 2023 +0800

    [SPARK-43952][SQL][FOLLOWUP] Correct AQE cancel broadcast job tag
    
    ### What changes were proposed in this pull request?
    
    This pr changes `cancelJobGroup` to `cancelJobsWithTag` in AQE, so that 
broadcast exchange can be cancelled correctly.
    
    Since we do not set job id when executing broadcast job and use job tag to 
cancel it, this pr adds `jobTag` to `BroadcastExchangeLike`.
    
    ### Why are the changes needed?
    
    Fix a regression: broadcast exchange jobs were not cancelled correctly by AQE.
    
    ### Does this PR introduce _any_ user-facing change?
    
    no, not released yet
    
    ### How was this patch tested?
    
    Tested manually:
    
    ```sql
    select * from t1
    join (select c1, java_method('java.lang.Thread', 'sleep', 5000l) from t2)t2 
on t1.c1 = t2.c1
    join (select c1, raise_error('force_fail') from t3)t3 on t1.c1 = t3.c1
    ```
    
    before:
    <img width="1194" alt="image" 
src="https://github.com/apache/spark/assets/12025282/55d218da-7289-404a-b201-1ea9f4902026">
    
    after:
    <img width="1202" alt="image" 
src="https://github.com/apache/spark/assets/12025282/9b293d1f-01d6-43e2-9c1a-20540f58c3e5">
    
    Closes #41979 from ulysses-you/jobtag-followup.
    
    Authored-by: ulysses-you <[email protected]>
    Signed-off-by: Xiduo You <[email protected]>
    (cherry picked from commit 99f9df564ef3f3223b4789d111426d5be5854c4a)
    Signed-off-by: Xiduo You <[email protected]>
---
 .../spark/sql/execution/adaptive/QueryStageExec.scala      |  2 +-
 .../sql/execution/exchange/BroadcastExchangeExec.scala     | 14 +++++++-------
 .../org/apache/spark/sql/SparkSessionExtensionSuite.scala  |  2 +-
 3 files changed, 9 insertions(+), 9 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala
index d48b4fe1751..c6234a40726 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala
@@ -250,7 +250,7 @@ case class BroadcastQueryStageExec(
 
   override def cancel(): Unit = {
     if (!broadcast.relationFuture.isDone) {
-      sparkContext.cancelJobGroup(broadcast.runId.toString)
+      sparkContext.cancelJobsWithTag(broadcast.jobTag)
       broadcast.relationFuture.cancel(true)
     }
   }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
index 15141b09b6c..866a62a3a07 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
@@ -45,9 +45,14 @@ import org.apache.spark.util.{SparkFatalException, 
ThreadUtils}
 trait BroadcastExchangeLike extends Exchange {
 
   /**
-   * The broadcast job group ID
+   * The broadcast run ID in job tag
    */
-  def runId: UUID = UUID.randomUUID
+  val runId: UUID = UUID.randomUUID
+
+  /**
+   * The broadcast job tag
+   */
+  def jobTag: String = s"broadcast exchange (runId ${runId.toString})"
 
   /**
    * The asynchronous job that prepares the broadcast relation.
@@ -80,8 +85,6 @@ case class BroadcastExchangeExec(
     child: SparkPlan) extends BroadcastExchangeLike {
   import BroadcastExchangeExec._
 
-  override val runId: UUID = UUID.randomUUID
-
   override lazy val metrics = Map(
     "dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size"),
     "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output 
rows"),
@@ -124,9 +127,6 @@ case class BroadcastExchangeExec(
     case _ => 512000000
   }
 
-  @transient
-  private lazy val jobTag = s"broadcast exchange (runId ${runId.toString})"
-
   @transient
   override lazy val relationFuture: Future[broadcast.Broadcast[Any]] = {
     SQLExecution.withThreadLocalCaptured[broadcast.Broadcast[Any]](
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala
index 87237f467fc..043a3b1a7e5 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala
@@ -1003,7 +1003,7 @@ case class MyShuffleExchangeExec(delegate: 
ShuffleExchangeExec) extends ShuffleE
  * whether AQE is enabled.
  */
 case class MyBroadcastExchangeExec(delegate: BroadcastExchangeExec) extends 
BroadcastExchangeLike {
-  override def runId: UUID = delegate.runId
+  override val runId: UUID = delegate.runId
   override def relationFuture: java.util.concurrent.Future[Broadcast[Any]] =
     delegate.relationFuture
   override def completionFuture: Future[Broadcast[Any]] = 
delegate.submitBroadcastJob


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to