This is an automated email from the ASF dual-hosted git repository.

csy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/auron.git


The following commit(s) were added to refs/heads/master by this push:
     new 73654709 [AURON #1625] Fix disable convert BroadcastExchange to native does not take effect (#1627)
73654709 is described below

commit 736547092fbe8899b31118559f5437837c7c0565
Author: Thomas <[email protected]>
AuthorDate: Mon Dec 1 16:08:41 2025 +0800

    [AURON #1625] Fix disable convert BroadcastExchange to native does not take effect (#1627)
---
 .../AuronCheckConvertBroadcastExchangeSuite.scala  | 189 ++++++++-------------
 1 file changed, 72 insertions(+), 117 deletions(-)

diff --git a/spark-extension-shims-spark/src/test/scala/org.apache.auron/AuronCheckConvertBroadcastExchangeSuite.scala b/spark-extension-shims-spark/src/test/scala/org.apache.auron/AuronCheckConvertBroadcastExchangeSuite.scala
index 289bd45c..b69567ff 100644
--- a/spark-extension-shims-spark/src/test/scala/org.apache.auron/AuronCheckConvertBroadcastExchangeSuite.scala
+++ b/spark-extension-shims-spark/src/test/scala/org.apache.auron/AuronCheckConvertBroadcastExchangeSuite.scala
@@ -16,145 +16,100 @@
  */
 package org.apache.auron
 
-import org.apache.spark.sql.{AuronQueryTest, Row, SparkSession}
-import org.apache.spark.sql.auron.AuronConverters
+import org.apache.spark.sql.{AuronQueryTest, Row}
 import org.apache.spark.sql.execution.auron.plan.NativeBroadcastExchangeExec
 import org.apache.spark.sql.execution.exchange.BroadcastExchangeExec
-import org.apache.spark.sql.test.SharedSparkSession
 
-class AuronCheckConvertBroadcastExchangeSuite
-    extends AuronQueryTest
-    with SharedSparkSession
-    with AuronSQLTestHelper {
+class AuronCheckConvertBroadcastExchangeSuite extends AuronQueryTest with 
BaseAuronSQLSuite {
   import testImplicits._
 
   test(
-    "test bhj broadcastExchange to native where 
spark.auron.enable.broadcastexchange is true") {
-    val spark = SparkSession
-      .builder()
-      .master("local[2]")
-      .appName("checkConvertToBroadcast")
-      .config("spark.sql.shuffle.partitions", "4")
-      .config("spark.sql.autoBroadcastJoinThreshold", -1)
-      .config("spark.sql.adaptive.enabled", "true")
-      .config("spark.sql.extensions", 
"org.apache.spark.sql.auron.AuronSparkSessionExtension")
-      .config(
-        "spark.shuffle.manager",
-        "org.apache.spark.sql.execution.auron.shuffle.AuronShuffleManager")
-      .config("spark.memory.offHeap.enabled", "false")
-      .config("spark.auron.enable", "true")
-      .getOrCreate()
+    "test bhj broadcastExchange to native where 
spark.auron.enable.broadcastExchange is true") {
+    withSQLConf("spark.auron.enable.broadcastExchange" -> "true") {
+      Seq((1, 2, "test test"))
+        .toDF("c1", "c2", "part")
+        .createOrReplaceTempView("broad_cast_table1")
+      Seq((1, 2, "test test"))
+        .toDF("c1", "c2", "part")
+        .createOrReplaceTempView("broad_cast_table2")
+      val df =
+        spark.sql(
+          "select /*+ broadcast(a)*/ a.c1, a.c2 from broad_cast_table1 a inner 
join broad_cast_table2 b on a.c1 = b.c1")
 
-    Seq((1, 2, "test test")).toDF("c1", "c2", 
"part").createOrReplaceTempView("broad_cast_table1")
-    Seq((1, 2, "test test")).toDF("c1", "c2", 
"part").createOrReplaceTempView("broad_cast_table2")
-    val executePlan =
-      spark.sql(
-        "select /*+ broadcast(a)*/ a.c1, a.c2 from broad_cast_table1 a inner 
join broad_cast_table2 b on a.c1 = b.c1")
-
-    val broadcastExchangeExec = 
collect(executePlan.queryExecution.executedPlan) {
-      case broadcastExchangeExec: BroadcastExchangeExec => 
broadcastExchangeExec
+      checkAnswer(df, Seq(Row(1, 2)))
+      assert(collectFirst(df.queryExecution.executedPlan) {
+        case broadcastExchangeExec: NativeBroadcastExchangeExec =>
+          broadcastExchangeExec
+      }.isDefined)
     }
-    assert(broadcastExchangeExec.nonEmpty, "BroadcastExchangeExec not found in 
plan")
-    val afterConvertPlan = 
AuronConverters.convertSparkPlan(broadcastExchangeExec.head)
-    assert(afterConvertPlan.isInstanceOf[NativeBroadcastExchangeExec])
-    checkAnswer(executePlan, Seq(Row(1, 2)))
   }
 
   test(
-    "test bnlj broadcastExchange to native where 
spark.auron.enable.broadcastexchange is true") {
-    val spark = SparkSession
-      .builder()
-      .master("local[2]")
-      .appName("checkConvertToBroadcast")
-      .config("spark.sql.shuffle.partitions", "4")
-      .config("spark.sql.autoBroadcastJoinThreshold", -1)
-      .config("spark.sql.adaptive.enabled", "true")
-      .config("spark.sql.extensions", 
"org.apache.spark.sql.auron.AuronSparkSessionExtension")
-      .config(
-        "spark.shuffle.manager",
-        "org.apache.spark.sql.execution.auron.shuffle.AuronShuffleManager")
-      .config("spark.memory.offHeap.enabled", "false")
-      .config("spark.auron.enable", "true")
-      .getOrCreate()
-
-    Seq((1, 2, "test test")).toDF("c1", "c2", 
"part").createOrReplaceTempView("broad_cast_table1")
-    Seq((1, 2, "test test")).toDF("c1", "c2", 
"part").createOrReplaceTempView("broad_cast_table2")
-    val executePlan =
-      spark.sql(
-        "select /*+ broadcast(a)*/ a.c1, a.c2 from broad_cast_table1 a inner 
join broad_cast_table2 b ")
+    "test bnlj broadcastExchange to native where 
spark.auron.enable.broadcastExchange is true") {
+    withSQLConf("spark.auron.enable.broadcastExchange" -> "true") {
+      Seq((1, 2, "test test"))
+        .toDF("c1", "c2", "part")
+        .createOrReplaceTempView("broad_cast_table1")
+      Seq((1, 2, "test test"))
+        .toDF("c1", "c2", "part")
+        .createOrReplaceTempView("broad_cast_table2")
+      val df =
+        spark.sql(
+          "select /*+ broadcast(a)*/ a.c1, a.c2 from broad_cast_table1 a inner 
join broad_cast_table2 b ")
 
-    val broadcastExchangeExec = 
collect(executePlan.queryExecution.executedPlan) {
-      case broadcastExchangeExec: BroadcastExchangeExec => 
broadcastExchangeExec
+      checkAnswer(df, Seq(Row(1, 2)))
+      assert(collectFirst(df.queryExecution.executedPlan) {
+        case broadcastExchangeExec: NativeBroadcastExchangeExec =>
+          broadcastExchangeExec
+      }.isDefined)
     }
-    assert(broadcastExchangeExec.nonEmpty, "BroadcastExchangeExec not found in 
plan")
-    val afterConvertPlan = 
AuronConverters.convertSparkPlan(broadcastExchangeExec.head)
-    assert(afterConvertPlan.isInstanceOf[NativeBroadcastExchangeExec])
-    checkAnswer(executePlan, Seq(Row(1, 2)))
   }
 
   test(
-    "test do not convert broadcastExchange to native when set 
spark.auron.enable.broadcastexchange is false") {
-    val spark = SparkSession
-      .builder()
-      .master("local[2]")
-      .appName("checkConvertToBroadcast")
-      .config("spark.sql.shuffle.partitions", "4")
-      .config("spark.sql.autoBroadcastJoinThreshold", -1)
-      .config("spark.sql.adaptive.enabled", "true")
-      .config("spark.sql.extensions", 
"org.apache.spark.sql.auron.AuronSparkSessionExtension")
-      .config(
-        "spark.shuffle.manager",
-        "org.apache.spark.sql.execution.auron.shuffle.AuronShuffleManager")
-      .config("spark.memory.offHeap.enabled", "false")
-      .config("spark.auron.enable.broadcastExchange", "false")
-      .config("spark.auron.enable", "true")
-      .getOrCreate()
+    "test do not convert broadcastExchange to native when set 
spark.auron.enable.broadcastExchange is false") {
+    withSQLConf("spark.auron.enable.broadcastExchange" -> "false") {
+      Seq((1, 2, "test test"))
+        .toDF("c1", "c2", "part")
+        .createOrReplaceTempView("broad_cast_table1")
+      Seq((1, 2, "test test"))
+        .toDF("c1", "c2", "part")
+        .createOrReplaceTempView("broad_cast_table2")
+      val df =
+        spark.sql(
+          "select /*+ broadcast(a)*/ a.c1, a.c2 from broad_cast_table1 a inner 
join broad_cast_table2 b on a.c1 = b.c1")
 
-    Seq((1, 2, "test test")).toDF("c1", "c2", 
"part").createOrReplaceTempView("broad_cast_table1")
-    Seq((1, 2, "test test")).toDF("c1", "c2", 
"part").createOrReplaceTempView("broad_cast_table2")
-    val executePlan =
-      spark.sql(
-        "select /*+ broadcast(a)*/ a.c1, a.c2 from broad_cast_table1 a inner 
join broad_cast_table2 b on a.c1 = b.c1")
-
-    val broadcastExchangeExec = 
collect(executePlan.queryExecution.executedPlan) {
-      case broadcastExchangeExec: BroadcastExchangeExec => 
broadcastExchangeExec
+      checkAnswer(df, Seq(Row(1, 2)))
+      val plan = df.queryExecution.executedPlan
+      assert(collectFirst(plan) { case broadcastExchangeExec: 
NativeBroadcastExchangeExec =>
+        broadcastExchangeExec
+      }.isEmpty)
+      assert(collectFirst(plan) { case broadcastExchangeExec: 
BroadcastExchangeExec =>
+        broadcastExchangeExec
+      }.isDefined)
     }
-    assert(broadcastExchangeExec.nonEmpty, "BroadcastExchangeExec not found in 
plan")
-    val afterConvertPlan = 
AuronConverters.convertSparkPlan(broadcastExchangeExec.head)
-    assert(afterConvertPlan.isInstanceOf[BroadcastExchangeExec])
-    checkAnswer(executePlan, Seq(Row(1, 2)))
   }
 
   test(
-    "test bnlj broadcastExchange to native where 
spark.auron.enable.broadcastexchange is false") {
-    val spark = SparkSession
-      .builder()
-      .master("local[2]")
-      .appName("checkConvertToBroadcast")
-      .config("spark.sql.shuffle.partitions", "4")
-      .config("spark.sql.autoBroadcastJoinThreshold", -1)
-      .config("spark.sql.adaptive.enabled", "true")
-      .config("spark.sql.extensions", 
"org.apache.spark.sql.auron.AuronSparkSessionExtension")
-      .config(
-        "spark.shuffle.manager",
-        "org.apache.spark.sql.execution.auron.shuffle.AuronShuffleManager")
-      .config("spark.memory.offHeap.enabled", "false")
-      .config("spark.auron.enable.broadcastExchange", "false")
-      .config("spark.auron.enable", "true")
-      .getOrCreate()
-
-    Seq((1, 2, "test test")).toDF("c1", "c2", 
"part").createOrReplaceTempView("broad_cast_table1")
-    Seq((1, 2, "test test")).toDF("c1", "c2", 
"part").createOrReplaceTempView("broad_cast_table2")
-    val executePlan =
-      spark.sql(
-        "select /*+ broadcast(a)*/ a.c1, a.c2 from broad_cast_table1 a inner 
join broad_cast_table2 b ")
+    "test bnlj broadcastExchange to native where 
spark.auron.enable.broadcastExchange is false") {
+    withSQLConf("spark.auron.enable.broadcastExchange" -> "false") {
+      Seq((1, 2, "test test"))
+        .toDF("c1", "c2", "part")
+        .createOrReplaceTempView("broad_cast_table1")
+      Seq((1, 2, "test test"))
+        .toDF("c1", "c2", "part")
+        .createOrReplaceTempView("broad_cast_table2")
+      val df =
+        spark.sql(
+          "select /*+ broadcast(a)*/ a.c1, a.c2 from broad_cast_table1 a inner 
join broad_cast_table2 b ")
 
-    val broadcastExchangeExec = 
collect(executePlan.queryExecution.executedPlan) {
-      case broadcastExchangeExec: BroadcastExchangeExec => 
broadcastExchangeExec
+      checkAnswer(df, Seq(Row(1, 2)))
+      val plan = df.queryExecution.executedPlan
+      assert(collectFirst(plan) { case broadcastExchangeExec: 
NativeBroadcastExchangeExec =>
+        broadcastExchangeExec
+      }.isEmpty)
+      assert(collectFirst(plan) { case broadcastExchangeExec: 
BroadcastExchangeExec =>
+        broadcastExchangeExec
+      }.isDefined)
     }
-    assert(broadcastExchangeExec.nonEmpty, "BroadcastExchangeExec not found in 
plan")
-    val afterConvertPlan = 
AuronConverters.convertSparkPlan(broadcastExchangeExec.head)
-    assert(afterConvertPlan.isInstanceOf[BroadcastExchangeExec])
-    checkAnswer(executePlan, Seq(Row(1, 2)))
   }
 }

Reply via email to