This is an automated email from the ASF dual-hosted git repository.

viirya pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new 44239410 fix: Unsupported window expression should fall back to Spark 
(#710)
44239410 is described below

commit 44239410a38b9ff0432bee5a26ae55e1fdf354db
Author: Liang-Chi Hsieh <[email protected]>
AuthorDate: Wed Jul 24 09:56:13 2024 -0700

    fix: Unsupported window expression should fall back to Spark (#710)
---
 spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala | 6 +++++-
 spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala  | 9 +++++++++
 2 files changed, 14 insertions(+), 1 deletion(-)

diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala 
b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
index 41a69f7a..d91ae5e4 100644
--- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
@@ -2535,8 +2535,12 @@ object QueryPlanSerde extends Logging with 
ShimQueryPlanSerde with CometExprShim
           }
         }.toArray
 
-        val windowExprProto = winExprs.map(windowExprToProto(_, output))
+        if (winExprs.length != windowExpression.length) {
+          withInfo(op, "Unsupported window expression(s)")
+          return None
+        }
 
+        val windowExprProto = winExprs.map(windowExprToProto(_, output))
         val partitionExprs = partitionSpec.map(exprToProto(_, child.output))
 
         val sortOrders = orderSpec.map(exprToProto(_, child.output))
diff --git a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala 
b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
index e657af9b..b8c0d566 100644
--- a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
@@ -63,6 +63,15 @@ class CometExecSuite extends CometTestBase {
     }
   }
 
+  test("Unsupported window expression should fall back to Spark") {
+    checkAnswer(
+      spark.sql("select sum(a) over () from values 1.0, 2.0, 3.0 T(a)"),
+      Row(6.0) :: Row(6.0) :: Row(6.0) :: Nil)
+    checkAnswer(
+      spark.sql("select avg(a) over () from values 1.0, 2.0, 3.0 T(a)"),
+      Row(2.0) :: Row(2.0) :: Row(2.0) :: Nil)
+  }
+
   test("fix CometNativeExec.doCanonicalize for ReusedExchangeExec") {
     assume(isSpark34Plus, "ChunkedByteBuffer is not serializable before Spark 
3.4+")
     withSQLConf(


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to