This is an automated email from the ASF dual-hosted git repository.

viirya pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new 7ba69d8  fix: Comet native operator after ReusedExchange (#187)
7ba69d8 is described below

commit 7ba69d84fe8bbb50dc1050a94d860a48202bbf44
Author: Liang-Chi Hsieh <[email protected]>
AuthorDate: Mon Mar 11 09:47:30 2024 -0700

    fix: Comet native operator after ReusedExchange (#187)
---
 .../org/apache/spark/sql/comet/operators.scala     |  3 ++-
 .../comet/exec/CometColumnarShuffleSuite.scala     | 28 ++++++++++++++++++++++
 2 files changed, 30 insertions(+), 1 deletion(-)

diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
index e75f9a4..5551ffd 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
@@ -35,6 +35,7 @@ import org.apache.spark.sql.catalyst.plans.physical.Partitioning
 import org.apache.spark.sql.comet.execution.shuffle.{ArrowReaderIterator, CometShuffleExchangeExec}
 import org.apache.spark.sql.execution.{ColumnarToRowExec, ExecSubqueryExpression, ExplainUtils, LeafExecNode, ScalarSubquery, SparkPlan, UnaryExecNode}
 import org.apache.spark.sql.execution.adaptive.{AQEShuffleReadExec, ShuffleQueryStageExec}
+import org.apache.spark.sql.execution.exchange.ReusedExchangeExec
 import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.vectorized.ColumnarBatch
@@ -266,7 +267,7 @@ abstract class CometNativeExec extends CometExec {
     plan match {
       case _: CometScanExec | _: CometBatchScanExec | _: ShuffleQueryStageExec |
           _: AQEShuffleReadExec | _: CometShuffleExchangeExec | _: CometUnionExec |
-          _: CometTakeOrderedAndProjectExec | _: CometCoalesceExec =>
+          _: CometTakeOrderedAndProjectExec | _: CometCoalesceExec | _: ReusedExchangeExec =>
         func(plan)
       case _: CometPlan =>
         // Other Comet operators, continue to traverse the tree.
diff --git a/spark/src/test/scala/org/apache/comet/exec/CometColumnarShuffleSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometColumnarShuffleSuite.scala
index a9b29e6..1a92f71 100644
--- a/spark/src/test/scala/org/apache/comet/exec/CometColumnarShuffleSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/exec/CometColumnarShuffleSuite.scala
@@ -27,6 +27,8 @@ import org.apache.spark.{Partitioner, SparkConf}
 import org.apache.spark.sql.{CometTestBase, DataFrame, Row}
 import org.apache.spark.sql.comet.execution.shuffle.{CometShuffleDependency, CometShuffleExchangeExec, CometShuffleManager}
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
+import org.apache.spark.sql.execution.exchange.ReusedExchangeExec
+import org.apache.spark.sql.execution.joins.SortMergeJoinExec
 import org.apache.spark.sql.functions.col
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
@@ -937,6 +939,32 @@ class DisableAQECometShuffleSuite extends CometColumnarShuffleSuite {
   override protected val asyncShuffleEnable: Boolean = false
 
   protected val adaptiveExecutionEnabled: Boolean = false
+
+  import testImplicits._
+
+  test("Comet native operator after ReusedExchange") {
+    withSQLConf(
+      SQLConf.ADAPTIVE_AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
+      SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
+      CometConf.COMET_EXEC_ENABLED.key -> "true",
+      CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
+      CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true") {
+      withParquetTable((0 until 10).map(i => (i, i % 5)), "tbl_a") {
+        withParquetTable((0 until 10).map(i => (i % 10, i + 2)), "tbl_b") {
+          val df = sql("SELECT * FROM tbl_a")
+          val left = df
+            .select($"_1" + 1 as ("a"))
+            .filter($"a" > 4)
+          val right = left.select($"a" as ("b"))
+          val join = left.join(right, $"a" === $"b")
+          checkSparkAnswerAndOperator(
+            join,
+            classOf[ReusedExchangeExec],
+            classOf[SortMergeJoinExec])
+        }
+      }
+    }
+  }
 }
 
 class DisableAQECometAsyncShuffleSuite extends CometColumnarShuffleSuite {

Reply via email to