This is an automated email from the ASF dual-hosted git repository.

viirya pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new 4da74d8  fix: incorrect result with aggregate expression with filter (#284)
4da74d8 is described below

commit 4da74d86478191ff8ddb7ecf0ce38887461feda6
Author: Liang-Chi Hsieh <[email protected]>
AuthorDate: Wed Apr 17 20:11:28 2024 -0700

    fix: incorrect result with aggregate expression with filter (#284)
---
 .../main/scala/org/apache/comet/serde/QueryPlanSerde.scala  |  5 +++++
 .../scala/org/apache/comet/exec/CometAggregateSuite.scala   | 13 +++++++++++++
 2 files changed, 18 insertions(+)

diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
index b62c222..a0c17fc 100644
--- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
@@ -1880,6 +1880,11 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde {
           return None
         }
 
+        // Aggregate expressions with filter are not supported yet.
+        if (aggregateExpressions.exists(_.filter.isDefined)) {
+          return None
+        }
+
         val groupingExprs = groupingExpressions.map(exprToProto(_, child.output))
 
         // In some of the cases, the aggregateExpressions could be empty.
diff --git a/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala
index 09c7151..f6415cb 100644
--- a/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala
@@ -40,6 +40,19 @@ import org.apache.comet.CometSparkSessionExtensions.isSpark34Plus
 class CometAggregateSuite extends CometTestBase with AdaptiveSparkPlanHelper {
   import testImplicits._
 
+  test("count with aggregation filter") {
+    withSQLConf(
+      CometConf.COMET_ENABLED.key -> "true",
+      CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
+      CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true") {
+      val df1 = sql("SELECT count(DISTINCT 2), count(DISTINCT 2,3)")
+      checkSparkAnswer(df1)
+
+      val df2 = sql("SELECT count(DISTINCT 2), count(DISTINCT 3,2)")
+      checkSparkAnswer(df2)
+    }
+  }
+
   test("lead/lag should return the default value if the offset row does not exist") {
     withSQLConf(
       CometConf.COMET_ENABLED.key -> "true",

Reply via email to