This is an automated email from the ASF dual-hosted git repository.
ulyssesyou pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.5 by this push:
new dae7a576ec7 [SPARK-43952][SQL][FOLLOWUP] Correct AQE cancel broadcast
job tag
dae7a576ec7 is described below
commit dae7a576ec7a103f2486fa4121731de0b1347a52
Author: ulysses-you <[email protected]>
AuthorDate: Fri Jul 21 10:41:24 2023 +0800
[SPARK-43952][SQL][FOLLOWUP] Correct AQE cancel broadcast job tag
### What changes were proposed in this pull request?
This pr changes `cancelJobGroup` to `cancelJobsWithTag` in AQE, so that
broadcast exchange can be cancelled correctly.
Since we do not set job id when executing broadcast job and use job tag to
cancel it, this pr adds `jobTag` to `BroadcastExchangeLike`.
### Why are the changes needed?
fix regression
### Does this PR introduce _any_ user-facing change?
no, not released yet
### How was this patch tested?
test manually
```sql
select * from t1
join (select c1, java_method('java.lang.Thread', 'sleep', 5000l) from t2)t2
on t1.c1 = t2.c1
join (select c1, raise_error('force_fail') from t3)t3 on t1.c1 = t3.c1
```
before:
<img width="1194" alt="image"
src="https://github.com/apache/spark/assets/12025282/55d218da-7289-404a-b201-1ea9f4902026">
after:
<img width="1202" alt="image"
src="https://github.com/apache/spark/assets/12025282/9b293d1f-01d6-43e2-9c1a-20540f58c3e5">
Closes #41979 from ulysses-you/jobtag-followup.
Authored-by: ulysses-you <[email protected]>
Signed-off-by: Xiduo You <[email protected]>
(cherry picked from commit 99f9df564ef3f3223b4789d111426d5be5854c4a)
Signed-off-by: Xiduo You <[email protected]>
---
.../spark/sql/execution/adaptive/QueryStageExec.scala | 2 +-
.../sql/execution/exchange/BroadcastExchangeExec.scala | 14 +++++++-------
.../org/apache/spark/sql/SparkSessionExtensionSuite.scala | 2 +-
3 files changed, 9 insertions(+), 9 deletions(-)
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala
index d48b4fe1751..c6234a40726 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala
@@ -250,7 +250,7 @@ case class BroadcastQueryStageExec(
override def cancel(): Unit = {
if (!broadcast.relationFuture.isDone) {
- sparkContext.cancelJobGroup(broadcast.runId.toString)
+ sparkContext.cancelJobsWithTag(broadcast.jobTag)
broadcast.relationFuture.cancel(true)
}
}
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
index 15141b09b6c..866a62a3a07 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
@@ -45,9 +45,14 @@ import org.apache.spark.util.{SparkFatalException,
ThreadUtils}
trait BroadcastExchangeLike extends Exchange {
/**
- * The broadcast job group ID
+ * The broadcast run ID in job tag
*/
- def runId: UUID = UUID.randomUUID
+ val runId: UUID = UUID.randomUUID
+
+ /**
+ * The broadcast job tag
+ */
+ def jobTag: String = s"broadcast exchange (runId ${runId.toString})"
/**
* The asynchronous job that prepares the broadcast relation.
@@ -80,8 +85,6 @@ case class BroadcastExchangeExec(
child: SparkPlan) extends BroadcastExchangeLike {
import BroadcastExchangeExec._
- override val runId: UUID = UUID.randomUUID
-
override lazy val metrics = Map(
"dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size"),
"numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output
rows"),
@@ -124,9 +127,6 @@ case class BroadcastExchangeExec(
case _ => 512000000
}
- @transient
- private lazy val jobTag = s"broadcast exchange (runId ${runId.toString})"
-
@transient
override lazy val relationFuture: Future[broadcast.Broadcast[Any]] = {
SQLExecution.withThreadLocalCaptured[broadcast.Broadcast[Any]](
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala
index 87237f467fc..043a3b1a7e5 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala
@@ -1003,7 +1003,7 @@ case class MyShuffleExchangeExec(delegate:
ShuffleExchangeExec) extends ShuffleE
* whether AQE is enabled.
*/
case class MyBroadcastExchangeExec(delegate: BroadcastExchangeExec) extends
BroadcastExchangeLike {
- override def runId: UUID = delegate.runId
+ override val runId: UUID = delegate.runId
override def relationFuture: java.util.concurrent.Future[Broadcast[Any]] =
delegate.relationFuture
override def completionFuture: Future[Broadcast[Any]] =
delegate.submitBroadcastJob
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]