This is an automated email from the ASF dual-hosted git repository.
viirya pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
new 7ba69d8 fix: Comet native operator after ReusedExchange (#187)
7ba69d8 is described below
commit 7ba69d84fe8bbb50dc1050a94d860a48202bbf44
Author: Liang-Chi Hsieh <[email protected]>
AuthorDate: Mon Mar 11 09:47:30 2024 -0700
fix: Comet native operator after ReusedExchange (#187)
---
.../org/apache/spark/sql/comet/operators.scala | 3 ++-
.../comet/exec/CometColumnarShuffleSuite.scala | 28 ++++++++++++++++++++++
2 files changed, 30 insertions(+), 1 deletion(-)
diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
index e75f9a4..5551ffd 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
@@ -35,6 +35,7 @@ import org.apache.spark.sql.catalyst.plans.physical.Partitioning
import org.apache.spark.sql.comet.execution.shuffle.{ArrowReaderIterator, CometShuffleExchangeExec}
import org.apache.spark.sql.execution.{ColumnarToRowExec, ExecSubqueryExpression, ExplainUtils, LeafExecNode, ScalarSubquery, SparkPlan, UnaryExecNode}
import org.apache.spark.sql.execution.adaptive.{AQEShuffleReadExec, ShuffleQueryStageExec}
+import org.apache.spark.sql.execution.exchange.ReusedExchangeExec
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.vectorized.ColumnarBatch
@@ -266,7 +267,7 @@ abstract class CometNativeExec extends CometExec {
plan match {
case _: CometScanExec | _: CometBatchScanExec | _: ShuffleQueryStageExec |
_: AQEShuffleReadExec | _: CometShuffleExchangeExec | _: CometUnionExec |
- _: CometTakeOrderedAndProjectExec | _: CometCoalesceExec =>
+ _: CometTakeOrderedAndProjectExec | _: CometCoalesceExec | _: ReusedExchangeExec =>
func(plan)
case _: CometPlan =>
// Other Comet operators, continue to traverse the tree.
diff --git a/spark/src/test/scala/org/apache/comet/exec/CometColumnarShuffleSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometColumnarShuffleSuite.scala
index a9b29e6..1a92f71 100644
--- a/spark/src/test/scala/org/apache/comet/exec/CometColumnarShuffleSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/exec/CometColumnarShuffleSuite.scala
@@ -27,6 +27,8 @@ import org.apache.spark.{Partitioner, SparkConf}
import org.apache.spark.sql.{CometTestBase, DataFrame, Row}
import org.apache.spark.sql.comet.execution.shuffle.{CometShuffleDependency, CometShuffleExchangeExec, CometShuffleManager}
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
+import org.apache.spark.sql.execution.exchange.ReusedExchangeExec
+import org.apache.spark.sql.execution.joins.SortMergeJoinExec
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
@@ -937,6 +939,32 @@ class DisableAQECometShuffleSuite extends CometColumnarShuffleSuite {
override protected val asyncShuffleEnable: Boolean = false
protected val adaptiveExecutionEnabled: Boolean = false
+
+ import testImplicits._
+
+ test("Comet native operator after ReusedExchange") {
+ withSQLConf(
+ SQLConf.ADAPTIVE_AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
+ SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
+ CometConf.COMET_EXEC_ENABLED.key -> "true",
+ CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
+ CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true") {
+ withParquetTable((0 until 10).map(i => (i, i % 5)), "tbl_a") {
+ withParquetTable((0 until 10).map(i => (i % 10, i + 2)), "tbl_b") {
+ val df = sql("SELECT * FROM tbl_a")
+ val left = df
+ .select($"_1" + 1 as ("a"))
+ .filter($"a" > 4)
+ val right = left.select($"a" as ("b"))
+ val join = left.join(right, $"a" === $"b")
+ checkSparkAnswerAndOperator(
+ join,
+ classOf[ReusedExchangeExec],
+ classOf[SortMergeJoinExec])
+ }
+ }
+ }
+ }
}
class DisableAQECometAsyncShuffleSuite extends CometColumnarShuffleSuite {