This is an automated email from the ASF dual-hosted git repository.
viirya pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
new 44239410 fix: Unsupported window expression should fall back to Spark (#710)
44239410 is described below
commit 44239410a38b9ff0432bee5a26ae55e1fdf354db
Author: Liang-Chi Hsieh <[email protected]>
AuthorDate: Wed Jul 24 09:56:13 2024 -0700
fix: Unsupported window expression should fall back to Spark (#710)
---
spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala | 6 +++++-
spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala | 9 +++++++++
2 files changed, 14 insertions(+), 1 deletion(-)
diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
index 41a69f7a..d91ae5e4 100644
--- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
@@ -2535,8 +2535,12 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim
}
}.toArray
- val windowExprProto = winExprs.map(windowExprToProto(_, output))
+ if (winExprs.length != windowExpression.length) {
+ withInfo(op, "Unsupported window expression(s)")
+ return None
+ }
+ val windowExprProto = winExprs.map(windowExprToProto(_, output))
val partitionExprs = partitionSpec.map(exprToProto(_, child.output))
val sortOrders = orderSpec.map(exprToProto(_, child.output))
diff --git a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
index e657af9b..b8c0d566 100644
--- a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
@@ -63,6 +63,15 @@ class CometExecSuite extends CometTestBase {
}
}
+ test("Unsupported window expression should fall back to Spark") {
+ checkAnswer(
+ spark.sql("select sum(a) over () from values 1.0, 2.0, 3.0 T(a)"),
+ Row(6.0) :: Row(6.0) :: Row(6.0) :: Nil)
+ checkAnswer(
+ spark.sql("select avg(a) over () from values 1.0, 2.0, 3.0 T(a)"),
+ Row(2.0) :: Row(2.0) :: Row(2.0) :: Nil)
+ }
+
test("fix CometNativeExec.doCanonicalize for ReusedExchangeExec") {
assume(isSpark34Plus, "ChunkedByteBuffer is not serializable before Spark 3.4+")
withSQLConf(
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]