This is an automated email from the ASF dual-hosted git repository.
viirya pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
new 4da74d8 fix: incorrect result with aggregate expression with filter (#284)
4da74d8 is described below
commit 4da74d86478191ff8ddb7ecf0ce38887461feda6
Author: Liang-Chi Hsieh <[email protected]>
AuthorDate: Wed Apr 17 20:11:28 2024 -0700
fix: incorrect result with aggregate expression with filter (#284)
---
.../main/scala/org/apache/comet/serde/QueryPlanSerde.scala | 5 +++++
.../scala/org/apache/comet/exec/CometAggregateSuite.scala | 13 +++++++++++++
2 files changed, 18 insertions(+)
diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
index b62c222..a0c17fc 100644
--- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
@@ -1880,6 +1880,11 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde {
return None
}
+ // Aggregate expressions with filter are not supported yet.
+ if (aggregateExpressions.exists(_.filter.isDefined)) {
+ return None
+ }
+
val groupingExprs = groupingExpressions.map(exprToProto(_,
child.output))
// In some of the cases, the aggregateExpressions could be empty.
diff --git a/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala
index 09c7151..f6415cb 100644
--- a/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala
@@ -40,6 +40,19 @@ import org.apache.comet.CometSparkSessionExtensions.isSpark34Plus
class CometAggregateSuite extends CometTestBase with AdaptiveSparkPlanHelper {
import testImplicits._
+ // Regression test for #284 ("incorrect result with aggregate expression
+ // with filter"): query results must match vanilla Spark's.
+ // The SQL below contains no explicit FILTER clause.
+ // NOTE(review): presumably Spark's RewriteDistinctAggregates rule rewrites
+ // multiple DISTINCT aggregates with differing argument sets into aggregate
+ // expressions carrying filters, which is what this exercises. Confirm
+ // against the optimized plan.
+ test("count with aggregation filter") {
+ // Enable Comet plus its exec shuffle and columnar shuffle paths.
+ withSQLConf(
+ CometConf.COMET_ENABLED.key -> "true",
+ CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
+ CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true") {
+ val df1 = sql("SELECT count(DISTINCT 2), count(DISTINCT 2,3)")
+ checkSparkAnswer(df1)
+
+ // Same aggregates with the second DISTINCT's arguments in swapped order.
+ val df2 = sql("SELECT count(DISTINCT 2), count(DISTINCT 3,2)")
+ checkSparkAnswer(df2)
+ }
+ }
+
+
test("lead/lag should return the default value if the offset row does not
exist") {
withSQLConf(
CometConf.COMET_ENABLED.key -> "true",