This is an automated email from the ASF dual-hosted git repository. agrove pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push: new 235b69d5f feat: supports array_distinct (#1923) 235b69d5f is described below commit 235b69d5fea26eb7bfde228112ce020e603da3b0 Author: drexler-sky <evan12...@gmail.com> AuthorDate: Thu Jun 26 06:29:37 2025 -0700 feat: supports array_distinct (#1923) --- docs/source/user-guide/expressions.md | 30 ++++++----- .../org/apache/comet/serde/QueryPlanSerde.scala | 1 + .../main/scala/org/apache/comet/serde/arrays.scala | 17 +++++- .../apache/comet/CometArrayExpressionSuite.scala | 63 ++++++++++++++++------ 4 files changed, 78 insertions(+), 33 deletions(-) diff --git a/docs/source/user-guide/expressions.md b/docs/source/user-guide/expressions.md index 7177276bd..a9b27de04 100644 --- a/docs/source/user-guide/expressions.md +++ b/docs/source/user-guide/expressions.md @@ -186,20 +186,22 @@ The following Spark expressions are currently available. Any known compatibility ## Arrays -| Expression | Notes | -| -------------- | ------------ | -| ArrayAppend | Experimental | -| ArrayExcept | Experimental | -| ArrayCompact | Experimental | -| ArrayContains | Experimental | -| ArrayInsert | Experimental | -| ArrayIntersect | Experimental | -| ArrayJoin | Experimental | -| ArrayRemove | | -| ArrayRepeat | Experimental | -| ArraysOverlap | Experimental | -| ElementAt | Arrays only | -| GetArrayItem | | +| Expression | Notes | +|----------------|----------------------------------------------------------------------------------------------------------------------------------------| +| ArrayAppend | Experimental | +| ArrayCompact | Experimental | +| ArrayContains | Experimental | +| ArrayDistinct | Experimental: behaves differently than spark. Datafusion first sorts then removes duplicates while spark preserves the original order. | +| ArrayExcept | Experimental | +| ArrayInsert | Experimental | +| ArrayIntersect | Experimental | +| ArrayJoin | Experimental | +| ArrayMax | Experimental | +| ArrayRemove | | +| ArrayRepeat | Experimental | +| ArraysOverlap | Experimental | +| ElementAt | Arrays only | +| GetArrayItem | | ## Structs diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index d0250d52a..1e3cec285 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -68,6 +68,7 @@ object QueryPlanSerde extends Logging with CometExprShim { private val exprSerdeMap: Map[Class[_], CometExpressionSerde] = Map( classOf[ArrayAppend] -> CometArrayAppend, classOf[ArrayContains] -> CometArrayContains, + classOf[ArrayDistinct] -> CometArrayDistinct, classOf[ArrayExcept] -> CometArrayExcept, classOf[ArrayInsert] -> CometArrayInsert, classOf[ArrayIntersect] -> CometArrayIntersect, diff --git a/spark/src/main/scala/org/apache/comet/serde/arrays.scala b/spark/src/main/scala/org/apache/comet/serde/arrays.scala index 5496c0422..5cd228ff3 100644 --- a/spark/src/main/scala/org/apache/comet/serde/arrays.scala +++ b/spark/src/main/scala/org/apache/comet/serde/arrays.scala @@ -150,6 +150,19 @@ object CometArrayContains extends CometExpressionSerde with IncompatExpr { } } +object CometArrayDistinct extends CometExpressionSerde with IncompatExpr { + override def convert( + expr: Expression, + inputs: Seq[Attribute], + binding: Boolean): Option[ExprOuterClass.Expr] = { + val arrayExprProto = exprToProto(expr.children.head, inputs, binding) + + val arrayDistinctScalarExpr = + scalarFunctionExprToProto("array_distinct", arrayExprProto) + optExprWithInfo(arrayDistinctScalarExpr, expr) + } +} + object CometArrayIntersect extends CometExpressionSerde with IncompatExpr { override def convert( expr: Expression, @@ -171,9 +184,9 @@ object CometArrayMax extends CometExpressionSerde { binding: Boolean): Option[ExprOuterClass.Expr] = { val arrayExprProto = exprToProto(expr.children.head, inputs, binding) - val arrayContainsScalarExpr = + val arrayMaxScalarExpr = scalarFunctionExprToProto("array_max", arrayExprProto) - optExprWithInfo(arrayContainsScalarExpr, expr) + optExprWithInfo(arrayMaxScalarExpr, expr) } } diff --git a/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala index 0cc40c40d..ff1c260a2 100644 --- a/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala @@ -232,24 +232,53 @@ class CometArrayExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelp } } + test("array_distinct") { + withSQLConf(CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key -> "true") { + Seq(true, false).foreach { dictionaryEnabled => + withTempDir { dir => + val path = new Path(dir.toURI.toString, "test.parquet") + makeParquetFileAllPrimitiveTypes(path, dictionaryEnabled, n = 10000) + spark.read.parquet(path.toString).createOrReplaceTempView("t1") + // The result needs to be in ascending order for checkSparkAnswerAndOperator to pass + // because datafusion array_distinct sorts the elements and then removes the duplicates + checkSparkAnswerAndOperator( + spark.sql("SELECT array_distinct(array(_2, _2, _3, _4, _4)) FROM t1")) + checkSparkAnswerAndOperator( + spark.sql("SELECT array_distinct((CASE WHEN _2 =_3 THEN array(_4) END)) FROM t1")) + checkSparkAnswerAndOperator(spark.sql( + "SELECT array_distinct((CASE WHEN _2 =_3 THEN array(_2, _2, _4, _4, _5) END)) FROM t1")) + // NULL needs to be the first element for checkSparkAnswerAndOperator to pass because + // datafusion array_distinct sorts the elements and then removes the duplicates + checkSparkAnswerAndOperator( + spark.sql( + "SELECT array_distinct(array(CAST(NULL AS INT), _2, _2, _3, _4, _4)) FROM t1")) + checkSparkAnswerAndOperator(spark.sql( + "SELECT array_distinct(array(CAST(NULL AS INT), CAST(NULL AS INT), _2, _2, _3, _4, _4)) FROM t1")) + } + } + } + } + test("array_max") { - withTempDir { dir => - val path = new Path(dir.toURI.toString, "test.parquet") - makeParquetFileAllPrimitiveTypes(path, dictionaryEnabled = false, n = 10000) - spark.read.parquet(path.toString).createOrReplaceTempView("t1"); - checkSparkAnswerAndOperator(spark.sql("SELECT array_max(array(_2, _3, _4)) FROM t1")) - checkSparkAnswerAndOperator( - spark.sql("SELECT array_max((CASE WHEN _2 =_3 THEN array(_4) END)) FROM t1")); - checkSparkAnswerAndOperator( - spark.sql("SELECT array_max((CASE WHEN _2 =_3 THEN array(_2, _4) END)) FROM t1")); - checkSparkAnswerAndOperator( - spark.sql("SELECT array_max(array(CAST(NULL AS INT), CAST(NULL AS INT))) FROM t1")) - checkSparkAnswerAndOperator( - spark.sql("SELECT array_max(array(_2, CAST(NULL AS INT))) FROM t1")) - checkSparkAnswerAndOperator(spark.sql("SELECT array_max(array()) FROM t1")) - checkSparkAnswerAndOperator( - spark.sql( - "SELECT array_max(array(double('-Infinity'), 0.0, double('Infinity'))) FROM t1")) + Seq(true, false).foreach { dictionaryEnabled => + withTempDir { dir => + val path = new Path(dir.toURI.toString, "test.parquet") + makeParquetFileAllPrimitiveTypes(path, dictionaryEnabled, n = 10000) + spark.read.parquet(path.toString).createOrReplaceTempView("t1"); + checkSparkAnswerAndOperator(spark.sql("SELECT array_max(array(_2, _3, _4)) FROM t1")) + checkSparkAnswerAndOperator( + spark.sql("SELECT array_max((CASE WHEN _2 =_3 THEN array(_4) END)) FROM t1")) + checkSparkAnswerAndOperator( + spark.sql("SELECT array_max((CASE WHEN _2 =_3 THEN array(_2, _4) END)) FROM t1")) + checkSparkAnswerAndOperator( + spark.sql("SELECT array_max(array(CAST(NULL AS INT), CAST(NULL AS INT))) FROM t1")) + checkSparkAnswerAndOperator( + spark.sql("SELECT array_max(array(_2, CAST(NULL AS INT))) FROM t1")) + checkSparkAnswerAndOperator(spark.sql("SELECT array_max(array()) FROM t1")) + checkSparkAnswerAndOperator( + spark.sql( + "SELECT array_max(array(double('-Infinity'), 0.0, double('Infinity'))) FROM t1")) + } } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@datafusion.apache.org For additional commands, e-mail: commits-h...@datafusion.apache.org