This is an automated email from the ASF dual-hosted git repository.
csy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/auron.git
The following commit(s) were added to refs/heads/master by this push:
new 73654709 [AURON #1625] Fix disable convert BroadcastExchange to native
does not take effect (#1627)
73654709 is described below
commit 736547092fbe8899b31118559f5437837c7c0565
Author: Thomas <[email protected]>
AuthorDate: Mon Dec 1 16:08:41 2025 +0800
[AURON #1625] Fix disable convert BroadcastExchange to native does not take
effect (#1627)
---
.../AuronCheckConvertBroadcastExchangeSuite.scala | 189 ++++++++-------------
1 file changed, 72 insertions(+), 117 deletions(-)
diff --git a/spark-extension-shims-spark/src/test/scala/org.apache.auron/AuronCheckConvertBroadcastExchangeSuite.scala b/spark-extension-shims-spark/src/test/scala/org.apache.auron/AuronCheckConvertBroadcastExchangeSuite.scala
index 289bd45c..b69567ff 100644
--- a/spark-extension-shims-spark/src/test/scala/org.apache.auron/AuronCheckConvertBroadcastExchangeSuite.scala
+++ b/spark-extension-shims-spark/src/test/scala/org.apache.auron/AuronCheckConvertBroadcastExchangeSuite.scala
@@ -16,145 +16,100 @@
*/
package org.apache.auron
-import org.apache.spark.sql.{AuronQueryTest, Row, SparkSession}
-import org.apache.spark.sql.auron.AuronConverters
+import org.apache.spark.sql.{AuronQueryTest, Row}
import org.apache.spark.sql.execution.auron.plan.NativeBroadcastExchangeExec
import org.apache.spark.sql.execution.exchange.BroadcastExchangeExec
-import org.apache.spark.sql.test.SharedSparkSession
-class AuronCheckConvertBroadcastExchangeSuite
- extends AuronQueryTest
- with SharedSparkSession
- with AuronSQLTestHelper {
+class AuronCheckConvertBroadcastExchangeSuite extends AuronQueryTest with BaseAuronSQLSuite {
import testImplicits._
test(
- "test bhj broadcastExchange to native where
spark.auron.enable.broadcastexchange is true") {
- val spark = SparkSession
- .builder()
- .master("local[2]")
- .appName("checkConvertToBroadcast")
- .config("spark.sql.shuffle.partitions", "4")
- .config("spark.sql.autoBroadcastJoinThreshold", -1)
- .config("spark.sql.adaptive.enabled", "true")
- .config("spark.sql.extensions",
"org.apache.spark.sql.auron.AuronSparkSessionExtension")
- .config(
- "spark.shuffle.manager",
- "org.apache.spark.sql.execution.auron.shuffle.AuronShuffleManager")
- .config("spark.memory.offHeap.enabled", "false")
- .config("spark.auron.enable", "true")
- .getOrCreate()
+ "test bhj broadcastExchange to native where
spark.auron.enable.broadcastExchange is true") {
+ withSQLConf("spark.auron.enable.broadcastExchange" -> "true") {
+ Seq((1, 2, "test test"))
+ .toDF("c1", "c2", "part")
+ .createOrReplaceTempView("broad_cast_table1")
+ Seq((1, 2, "test test"))
+ .toDF("c1", "c2", "part")
+ .createOrReplaceTempView("broad_cast_table2")
+ val df =
+ spark.sql(
+ "select /*+ broadcast(a)*/ a.c1, a.c2 from broad_cast_table1 a inner
join broad_cast_table2 b on a.c1 = b.c1")
- Seq((1, 2, "test test")).toDF("c1", "c2",
"part").createOrReplaceTempView("broad_cast_table1")
- Seq((1, 2, "test test")).toDF("c1", "c2",
"part").createOrReplaceTempView("broad_cast_table2")
- val executePlan =
- spark.sql(
- "select /*+ broadcast(a)*/ a.c1, a.c2 from broad_cast_table1 a inner
join broad_cast_table2 b on a.c1 = b.c1")
-
- val broadcastExchangeExec =
collect(executePlan.queryExecution.executedPlan) {
- case broadcastExchangeExec: BroadcastExchangeExec =>
broadcastExchangeExec
+ checkAnswer(df, Seq(Row(1, 2)))
+ assert(collectFirst(df.queryExecution.executedPlan) {
+ case broadcastExchangeExec: NativeBroadcastExchangeExec =>
+ broadcastExchangeExec
+ }.isDefined)
}
- assert(broadcastExchangeExec.nonEmpty, "BroadcastExchangeExec not found in
plan")
- val afterConvertPlan =
AuronConverters.convertSparkPlan(broadcastExchangeExec.head)
- assert(afterConvertPlan.isInstanceOf[NativeBroadcastExchangeExec])
- checkAnswer(executePlan, Seq(Row(1, 2)))
}
test(
- "test bnlj broadcastExchange to native where
spark.auron.enable.broadcastexchange is true") {
- val spark = SparkSession
- .builder()
- .master("local[2]")
- .appName("checkConvertToBroadcast")
- .config("spark.sql.shuffle.partitions", "4")
- .config("spark.sql.autoBroadcastJoinThreshold", -1)
- .config("spark.sql.adaptive.enabled", "true")
- .config("spark.sql.extensions",
"org.apache.spark.sql.auron.AuronSparkSessionExtension")
- .config(
- "spark.shuffle.manager",
- "org.apache.spark.sql.execution.auron.shuffle.AuronShuffleManager")
- .config("spark.memory.offHeap.enabled", "false")
- .config("spark.auron.enable", "true")
- .getOrCreate()
-
- Seq((1, 2, "test test")).toDF("c1", "c2",
"part").createOrReplaceTempView("broad_cast_table1")
- Seq((1, 2, "test test")).toDF("c1", "c2",
"part").createOrReplaceTempView("broad_cast_table2")
- val executePlan =
- spark.sql(
- "select /*+ broadcast(a)*/ a.c1, a.c2 from broad_cast_table1 a inner
join broad_cast_table2 b ")
+ "test bnlj broadcastExchange to native where
spark.auron.enable.broadcastExchange is true") {
+ withSQLConf("spark.auron.enable.broadcastExchange" -> "true") {
+ Seq((1, 2, "test test"))
+ .toDF("c1", "c2", "part")
+ .createOrReplaceTempView("broad_cast_table1")
+ Seq((1, 2, "test test"))
+ .toDF("c1", "c2", "part")
+ .createOrReplaceTempView("broad_cast_table2")
+ val df =
+ spark.sql(
+ "select /*+ broadcast(a)*/ a.c1, a.c2 from broad_cast_table1 a inner
join broad_cast_table2 b ")
- val broadcastExchangeExec =
collect(executePlan.queryExecution.executedPlan) {
- case broadcastExchangeExec: BroadcastExchangeExec =>
broadcastExchangeExec
+ checkAnswer(df, Seq(Row(1, 2)))
+ assert(collectFirst(df.queryExecution.executedPlan) {
+ case broadcastExchangeExec: NativeBroadcastExchangeExec =>
+ broadcastExchangeExec
+ }.isDefined)
}
- assert(broadcastExchangeExec.nonEmpty, "BroadcastExchangeExec not found in
plan")
- val afterConvertPlan =
AuronConverters.convertSparkPlan(broadcastExchangeExec.head)
- assert(afterConvertPlan.isInstanceOf[NativeBroadcastExchangeExec])
- checkAnswer(executePlan, Seq(Row(1, 2)))
}
test(
- "test do not convert broadcastExchange to native when set
spark.auron.enable.broadcastexchange is false") {
- val spark = SparkSession
- .builder()
- .master("local[2]")
- .appName("checkConvertToBroadcast")
- .config("spark.sql.shuffle.partitions", "4")
- .config("spark.sql.autoBroadcastJoinThreshold", -1)
- .config("spark.sql.adaptive.enabled", "true")
- .config("spark.sql.extensions",
"org.apache.spark.sql.auron.AuronSparkSessionExtension")
- .config(
- "spark.shuffle.manager",
- "org.apache.spark.sql.execution.auron.shuffle.AuronShuffleManager")
- .config("spark.memory.offHeap.enabled", "false")
- .config("spark.auron.enable.broadcastExchange", "false")
- .config("spark.auron.enable", "true")
- .getOrCreate()
+ "test do not convert broadcastExchange to native when set
spark.auron.enable.broadcastExchange is false") {
+ withSQLConf("spark.auron.enable.broadcastExchange" -> "false") {
+ Seq((1, 2, "test test"))
+ .toDF("c1", "c2", "part")
+ .createOrReplaceTempView("broad_cast_table1")
+ Seq((1, 2, "test test"))
+ .toDF("c1", "c2", "part")
+ .createOrReplaceTempView("broad_cast_table2")
+ val df =
+ spark.sql(
+ "select /*+ broadcast(a)*/ a.c1, a.c2 from broad_cast_table1 a inner
join broad_cast_table2 b on a.c1 = b.c1")
- Seq((1, 2, "test test")).toDF("c1", "c2",
"part").createOrReplaceTempView("broad_cast_table1")
- Seq((1, 2, "test test")).toDF("c1", "c2",
"part").createOrReplaceTempView("broad_cast_table2")
- val executePlan =
- spark.sql(
- "select /*+ broadcast(a)*/ a.c1, a.c2 from broad_cast_table1 a inner
join broad_cast_table2 b on a.c1 = b.c1")
-
- val broadcastExchangeExec =
collect(executePlan.queryExecution.executedPlan) {
- case broadcastExchangeExec: BroadcastExchangeExec =>
broadcastExchangeExec
+ checkAnswer(df, Seq(Row(1, 2)))
+ val plan = df.queryExecution.executedPlan
+ assert(collectFirst(plan) { case broadcastExchangeExec: NativeBroadcastExchangeExec =>
+ broadcastExchangeExec
+ }.isEmpty)
+ assert(collectFirst(plan) { case broadcastExchangeExec: BroadcastExchangeExec =>
+ broadcastExchangeExec
+ }.isDefined)
}
- assert(broadcastExchangeExec.nonEmpty, "BroadcastExchangeExec not found in
plan")
- val afterConvertPlan =
AuronConverters.convertSparkPlan(broadcastExchangeExec.head)
- assert(afterConvertPlan.isInstanceOf[BroadcastExchangeExec])
- checkAnswer(executePlan, Seq(Row(1, 2)))
}
test(
- "test bnlj broadcastExchange to native where
spark.auron.enable.broadcastexchange is false") {
- val spark = SparkSession
- .builder()
- .master("local[2]")
- .appName("checkConvertToBroadcast")
- .config("spark.sql.shuffle.partitions", "4")
- .config("spark.sql.autoBroadcastJoinThreshold", -1)
- .config("spark.sql.adaptive.enabled", "true")
- .config("spark.sql.extensions",
"org.apache.spark.sql.auron.AuronSparkSessionExtension")
- .config(
- "spark.shuffle.manager",
- "org.apache.spark.sql.execution.auron.shuffle.AuronShuffleManager")
- .config("spark.memory.offHeap.enabled", "false")
- .config("spark.auron.enable.broadcastExchange", "false")
- .config("spark.auron.enable", "true")
- .getOrCreate()
-
- Seq((1, 2, "test test")).toDF("c1", "c2",
"part").createOrReplaceTempView("broad_cast_table1")
- Seq((1, 2, "test test")).toDF("c1", "c2",
"part").createOrReplaceTempView("broad_cast_table2")
- val executePlan =
- spark.sql(
- "select /*+ broadcast(a)*/ a.c1, a.c2 from broad_cast_table1 a inner
join broad_cast_table2 b ")
+ "test bnlj broadcastExchange to native where
spark.auron.enable.broadcastExchange is false") {
+ withSQLConf("spark.auron.enable.broadcastExchange" -> "false") {
+ Seq((1, 2, "test test"))
+ .toDF("c1", "c2", "part")
+ .createOrReplaceTempView("broad_cast_table1")
+ Seq((1, 2, "test test"))
+ .toDF("c1", "c2", "part")
+ .createOrReplaceTempView("broad_cast_table2")
+ val df =
+ spark.sql(
+ "select /*+ broadcast(a)*/ a.c1, a.c2 from broad_cast_table1 a inner
join broad_cast_table2 b ")
- val broadcastExchangeExec =
collect(executePlan.queryExecution.executedPlan) {
- case broadcastExchangeExec: BroadcastExchangeExec =>
broadcastExchangeExec
+ checkAnswer(df, Seq(Row(1, 2)))
+ val plan = df.queryExecution.executedPlan
+ assert(collectFirst(plan) { case broadcastExchangeExec: NativeBroadcastExchangeExec =>
+ broadcastExchangeExec
+ }.isEmpty)
+ assert(collectFirst(plan) { case broadcastExchangeExec: BroadcastExchangeExec =>
+ broadcastExchangeExec
+ }.isDefined)
}
- assert(broadcastExchangeExec.nonEmpty, "BroadcastExchangeExec not found in
plan")
- val afterConvertPlan =
AuronConverters.convertSparkPlan(broadcastExchangeExec.head)
- assert(afterConvertPlan.isInstanceOf[BroadcastExchangeExec])
- checkAnswer(executePlan, Seq(Row(1, 2)))
}
}