This is an automated email from the ASF dual-hosted git repository.
viirya pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
new e52cfb4d fix: ReusedExchangeExec can be child operator of
CometBroadcastExchangeExec (#713)
e52cfb4d is described below
commit e52cfb4da7e80ef4fea233f06f91815145ff63e8
Author: Liang-Chi Hsieh <[email protected]>
AuthorDate: Wed Jul 24 12:04:32 2024 -0700
fix: ReusedExchangeExec can be child operator of CometBroadcastExchangeExec
(#713)
---
.../apache/spark/sql/comet/CometBroadcastExchangeExec.scala | 7 ++++++-
.../test/scala/org/apache/comet/exec/CometJoinSuite.scala | 12 ++++++++++++
2 files changed, 18 insertions(+), 1 deletion(-)
diff --git
a/spark/src/main/scala/org/apache/spark/sql/comet/CometBroadcastExchangeExec.scala
b/spark/src/main/scala/org/apache/spark/sql/comet/CometBroadcastExchangeExec.scala
index 64d14a00..0f698d8a 100644
---
a/spark/src/main/scala/org/apache/spark/sql/comet/CometBroadcastExchangeExec.scala
+++
b/spark/src/main/scala/org/apache/spark/sql/comet/CometBroadcastExchangeExec.scala
@@ -36,7 +36,7 @@ import org.apache.spark.sql.catalyst.plans.logical.Statistics
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.execution.{ColumnarToRowExec, SparkPlan,
SQLExecution}
import org.apache.spark.sql.execution.adaptive.{AQEShuffleReadExec,
ShuffleQueryStageExec}
-import org.apache.spark.sql.execution.exchange.BroadcastExchangeLike
+import org.apache.spark.sql.execution.exchange.{BroadcastExchangeLike, ReusedExchangeExec}
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
import org.apache.spark.sql.vectorized.ColumnarBatch
@@ -129,6 +129,11 @@ case class CometBroadcastExchangeExec(
case AQEShuffleReadExec(s: ShuffleQueryStageExec, _)
if s.plan.isInstanceOf[CometPlan] =>
CometExec.getByteArrayRdd(s.plan.asInstanceOf[CometPlan]).collect()
+ case ReusedExchangeExec(_, plan) if plan.isInstanceOf[CometPlan] =>
+ CometExec.getByteArrayRdd(plan.asInstanceOf[CometPlan]).collect()
+      case AQEShuffleReadExec(ShuffleQueryStageExec(_, ReusedExchangeExec(_, plan), _), _)
+ if plan.isInstanceOf[CometPlan] =>
+ CometExec.getByteArrayRdd(plan.asInstanceOf[CometPlan]).collect()
case AQEShuffleReadExec(s: ShuffleQueryStageExec, _) =>
throw new CometRuntimeException(
"Child of CometBroadcastExchangeExec should be CometExec, " +
diff --git a/spark/src/test/scala/org/apache/comet/exec/CometJoinSuite.scala
b/spark/src/test/scala/org/apache/comet/exec/CometJoinSuite.scala
index 8bae2eca..b2e225b1 100644
--- a/spark/src/test/scala/org/apache/comet/exec/CometJoinSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/exec/CometJoinSuite.scala
@@ -30,6 +30,7 @@ import org.apache.comet.CometConf
import org.apache.comet.CometSparkSessionExtensions.isSpark34Plus
class CometJoinSuite extends CometTestBase {
+ import testImplicits._
  override protected def test(testName: String, testTags: Tag*)(testFun: => Any)(implicit
pos: Position): Unit = {
@@ -40,6 +41,17 @@ class CometJoinSuite extends CometTestBase {
}
}
+ test("join - self join") {
+ val df1 = testData.select(testData("key")).as("df1")
+ val df2 = testData.select(testData("key")).as("df2")
+
+ checkAnswer(
+ df1.join(df2, $"df1.key" === $"df2.key"),
+      sql("SELECT a.key, b.key FROM testData a JOIN testData b ON a.key = b.key")
+ .collect()
+ .toSeq)
+ }
+
test("SortMergeJoin with unsupported key type should fall back to Spark") {
withSQLConf(
SQLConf.SESSION_LOCAL_TIMEZONE.key -> "Asia/Kathmandu",
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]