This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push: new 542f45b08 feat: pass ignore_nulls flag to first and last (#1866) 542f45b08 is described below commit 542f45b0893a3787e39fb049dada38fb02ffacd9 Author: Raz Luvaton <16746759+rluva...@users.noreply.github.com> AuthorDate: Mon Jun 23 21:41:23 2025 +0300 feat: pass ignore_nulls flag to first and last (#1866) --- .../main/scala/org/apache/comet/serde/aggregates.scala | 12 ++---------- .../org/apache/comet/exec/CometAggregateSuite.scala | 17 +++++++++++++++++ 2 files changed, 19 insertions(+), 10 deletions(-) diff --git a/spark/src/main/scala/org/apache/comet/serde/aggregates.scala b/spark/src/main/scala/org/apache/comet/serde/aggregates.scala index 5f41364c4..500249b55 100644 --- a/spark/src/main/scala/org/apache/comet/serde/aggregates.scala +++ b/spark/src/main/scala/org/apache/comet/serde/aggregates.scala @@ -232,11 +232,6 @@ object CometFirst extends CometAggregateExpressionSerde { binding: Boolean, conf: SQLConf): Option[ExprOuterClass.AggExpr] = { val first = expr.asInstanceOf[First] - if (first.ignoreNulls) { - // DataFusion doesn't support ignoreNulls true - withInfo(aggExpr, "First not supported when ignoreNulls=true") - return None - } val child = first.children.head val childExpr = exprToProto(child, inputs, binding) val dataType = serializeDataType(first.dataType) @@ -245,6 +240,7 @@ object CometFirst extends CometAggregateExpressionSerde { val builder = ExprOuterClass.First.newBuilder() builder.setChild(childExpr.get) builder.setDatatype(dataType.get) + builder.setIgnoreNulls(first.ignoreNulls) Some( ExprOuterClass.AggExpr @@ -269,11 +265,6 @@ object CometLast extends CometAggregateExpressionSerde { binding: Boolean, conf: SQLConf): Option[ExprOuterClass.AggExpr] = { val last = expr.asInstanceOf[Last] - if (last.ignoreNulls) { - // DataFusion doesn't support ignoreNulls true - withInfo(aggExpr, "Last not supported when ignoreNulls=true") - return None - } val child = last.children.head 
val childExpr = exprToProto(child, inputs, binding) val dataType = serializeDataType(last.dataType) @@ -282,6 +273,7 @@ object CometLast extends CometAggregateExpressionSerde { val builder = ExprOuterClass.Last.newBuilder() builder.setChild(childExpr.get) builder.setDatatype(dataType.get) + builder.setIgnoreNulls(last.ignoreNulls) Some( ExprOuterClass.AggExpr diff --git a/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala index 644ab930c..d518196b3 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala @@ -816,6 +816,23 @@ class CometAggregateSuite extends CometTestBase with AdaptiveSparkPlanHelper { } } + test("first/last with ignore null") { + val data = Range(0, 8192).flatMap(n => Seq((n, 1), (n, 2))).toDF("a", "b") + withTempDir { dir => + val filename = s"${dir.getAbsolutePath}/first_last_ignore_null.parquet" + data.write.parquet(filename) + withSQLConf(CometConf.COMET_BATCH_SIZE.key -> "100") { + spark.read.parquet(filename).createOrReplaceTempView("t1") + for (expr <- Seq("first", "last")) { + // deterministic query that should return one non-null value per group + val df = spark.sql( + s"SELECT a, $expr(IF(b==1,null,b)) IGNORE NULLS FROM t1 GROUP BY a ORDER BY a") + checkSparkAnswerAndOperator(df) + } + } + } + } + test("all types, with nulls") { val numValues = 2048 --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@datafusion.apache.org For additional commands, e-mail: commits-h...@datafusion.apache.org