This is an automated email from the ASF dual-hosted git repository.

viirya pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new e52cfb4d fix: ReusedExchangeExec can be child operator of CometBroadcastExchangeExec (#713)
e52cfb4d is described below

commit e52cfb4da7e80ef4fea233f06f91815145ff63e8
Author: Liang-Chi Hsieh <[email protected]>
AuthorDate: Wed Jul 24 12:04:32 2024 -0700

    fix: ReusedExchangeExec can be child operator of CometBroadcastExchangeExec (#713)
---
 .../apache/spark/sql/comet/CometBroadcastExchangeExec.scala  |  7 ++++++-
 .../test/scala/org/apache/comet/exec/CometJoinSuite.scala    | 12 ++++++++++++
 2 files changed, 18 insertions(+), 1 deletion(-)

diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometBroadcastExchangeExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometBroadcastExchangeExec.scala
index 64d14a00..0f698d8a 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/CometBroadcastExchangeExec.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometBroadcastExchangeExec.scala
@@ -36,7 +36,7 @@ import org.apache.spark.sql.catalyst.plans.logical.Statistics
 import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.execution.{ColumnarToRowExec, SparkPlan, SQLExecution}
 import org.apache.spark.sql.execution.adaptive.{AQEShuffleReadExec, ShuffleQueryStageExec}
-import org.apache.spark.sql.execution.exchange.BroadcastExchangeLike
+import org.apache.spark.sql.execution.exchange.{BroadcastExchangeLike, ReusedExchangeExec}
 import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
 import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
 import org.apache.spark.sql.vectorized.ColumnarBatch
@@ -129,6 +129,11 @@ case class CometBroadcastExchangeExec(
           case AQEShuffleReadExec(s: ShuffleQueryStageExec, _)
               if s.plan.isInstanceOf[CometPlan] =>
             CometExec.getByteArrayRdd(s.plan.asInstanceOf[CometPlan]).collect()
+          case ReusedExchangeExec(_, plan) if plan.isInstanceOf[CometPlan] =>
+            CometExec.getByteArrayRdd(plan.asInstanceOf[CometPlan]).collect()
+          case AQEShuffleReadExec(ShuffleQueryStageExec(_, ReusedExchangeExec(_, plan), _), _)
+              if plan.isInstanceOf[CometPlan] =>
+            CometExec.getByteArrayRdd(plan.asInstanceOf[CometPlan]).collect()
           case AQEShuffleReadExec(s: ShuffleQueryStageExec, _) =>
             throw new CometRuntimeException(
               "Child of CometBroadcastExchangeExec should be CometExec, " +
diff --git a/spark/src/test/scala/org/apache/comet/exec/CometJoinSuite.scala 
b/spark/src/test/scala/org/apache/comet/exec/CometJoinSuite.scala
index 8bae2eca..b2e225b1 100644
--- a/spark/src/test/scala/org/apache/comet/exec/CometJoinSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/exec/CometJoinSuite.scala
@@ -30,6 +30,7 @@ import org.apache.comet.CometConf
 import org.apache.comet.CometSparkSessionExtensions.isSpark34Plus
 
 class CometJoinSuite extends CometTestBase {
+  import testImplicits._
 
   override protected def test(testName: String, testTags: Tag*)(testFun: => Any)(implicit
       pos: Position): Unit = {
@@ -40,6 +41,17 @@ class CometJoinSuite extends CometTestBase {
     }
   }
 
+  test("join - self join") {
+    val df1 = testData.select(testData("key")).as("df1")
+    val df2 = testData.select(testData("key")).as("df2")
+
+    checkAnswer(
+      df1.join(df2, $"df1.key" === $"df2.key"),
+      sql("SELECT a.key, b.key FROM testData a JOIN testData b ON a.key = b.key")
+        .collect()
+        .toSeq)
+  }
+
   test("SortMergeJoin with unsupported key type should fall back to Spark") {
     withSQLConf(
       SQLConf.SESSION_LOCAL_TIMEZONE.key -> "Asia/Kathmandu",


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to