This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
new 235b69d5f feat: supports array_distinct (#1923)
235b69d5f is described below
commit 235b69d5fea26eb7bfde228112ce020e603da3b0
Author: drexler-sky <[email protected]>
AuthorDate: Thu Jun 26 06:29:37 2025 -0700
feat: supports array_distinct (#1923)
---
docs/source/user-guide/expressions.md | 30 ++++++-----
.../org/apache/comet/serde/QueryPlanSerde.scala | 1 +
.../main/scala/org/apache/comet/serde/arrays.scala | 17 +++++-
.../apache/comet/CometArrayExpressionSuite.scala | 63 ++++++++++++++++------
4 files changed, 78 insertions(+), 33 deletions(-)
diff --git a/docs/source/user-guide/expressions.md b/docs/source/user-guide/expressions.md
index 7177276bd..a9b27de04 100644
--- a/docs/source/user-guide/expressions.md
+++ b/docs/source/user-guide/expressions.md
@@ -186,20 +186,22 @@ The following Spark expressions are currently available. Any known compatibility
## Arrays
-| Expression | Notes |
-| -------------- | ------------ |
-| ArrayAppend | Experimental |
-| ArrayExcept | Experimental |
-| ArrayCompact | Experimental |
-| ArrayContains | Experimental |
-| ArrayInsert | Experimental |
-| ArrayIntersect | Experimental |
-| ArrayJoin | Experimental |
-| ArrayRemove | |
-| ArrayRepeat | Experimental |
-| ArraysOverlap | Experimental |
-| ElementAt | Arrays only |
-| GetArrayItem | |
+| Expression     | Notes                                                                                                                        |
+|----------------|------------------------------------------------------------------------------------------------------------------------------|
+| ArrayAppend    | Experimental                                                                                                                 |
+| ArrayCompact   | Experimental                                                                                                                 |
+| ArrayContains  | Experimental                                                                                                                 |
+| ArrayDistinct  | Experimental: behaves differently from Spark. DataFusion first sorts and then removes duplicates, while Spark preserves the original order. |
+| ArrayExcept    | Experimental                                                                                                                 |
+| ArrayInsert    | Experimental                                                                                                                 |
+| ArrayIntersect | Experimental                                                                                                                 |
+| ArrayJoin      | Experimental                                                                                                                 |
+| ArrayMax       | Experimental                                                                                                                 |
+| ArrayRemove    |                                                                                                                              |
+| ArrayRepeat    | Experimental                                                                                                                 |
+| ArraysOverlap  | Experimental                                                                                                                 |
+| ElementAt      | Arrays only                                                                                                                  |
+| GetArrayItem   |                                                                                                                              |
## Structs
diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
index d0250d52a..1e3cec285 100644
--- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
@@ -68,6 +68,7 @@ object QueryPlanSerde extends Logging with CometExprShim {
private val exprSerdeMap: Map[Class[_], CometExpressionSerde] = Map(
classOf[ArrayAppend] -> CometArrayAppend,
classOf[ArrayContains] -> CometArrayContains,
+ classOf[ArrayDistinct] -> CometArrayDistinct,
classOf[ArrayExcept] -> CometArrayExcept,
classOf[ArrayInsert] -> CometArrayInsert,
classOf[ArrayIntersect] -> CometArrayIntersect,
diff --git a/spark/src/main/scala/org/apache/comet/serde/arrays.scala b/spark/src/main/scala/org/apache/comet/serde/arrays.scala
index 5496c0422..5cd228ff3 100644
--- a/spark/src/main/scala/org/apache/comet/serde/arrays.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/arrays.scala
@@ -150,6 +150,19 @@ object CometArrayContains extends CometExpressionSerde with IncompatExpr {
}
}
+object CometArrayDistinct extends CometExpressionSerde with IncompatExpr {
+  override def convert(
+      expr: Expression,
+      inputs: Seq[Attribute],
+      binding: Boolean): Option[ExprOuterClass.Expr] = {
+    val arrayExprProto = exprToProto(expr.children.head, inputs, binding)
+
+    val arrayDistinctScalarExpr =
+      scalarFunctionExprToProto("array_distinct", arrayExprProto)
+    optExprWithInfo(arrayDistinctScalarExpr, expr)
+  }
+}
+
object CometArrayIntersect extends CometExpressionSerde with IncompatExpr {
override def convert(
expr: Expression,
@@ -171,9 +184,9 @@ object CometArrayMax extends CometExpressionSerde {
binding: Boolean): Option[ExprOuterClass.Expr] = {
val arrayExprProto = exprToProto(expr.children.head, inputs, binding)
- val arrayContainsScalarExpr =
+ val arrayMaxScalarExpr =
scalarFunctionExprToProto("array_max", arrayExprProto)
- optExprWithInfo(arrayContainsScalarExpr, expr)
+ optExprWithInfo(arrayMaxScalarExpr, expr)
}
}
diff --git a/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala
index 0cc40c40d..ff1c260a2 100644
--- a/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala
@@ -232,24 +232,53 @@ class CometArrayExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelp
}
}
+ test("array_distinct") {
+ withSQLConf(CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key -> "true") {
+ Seq(true, false).foreach { dictionaryEnabled =>
+ withTempDir { dir =>
+ val path = new Path(dir.toURI.toString, "test.parquet")
+ makeParquetFileAllPrimitiveTypes(path, dictionaryEnabled, n = 10000)
+ spark.read.parquet(path.toString).createOrReplaceTempView("t1")
+ // The result needs to be in ascending order for
checkSparkAnswerAndOperator to pass
+ // because datafusion array_distinct sorts the elements and then
removes the duplicates
+ checkSparkAnswerAndOperator(
+ spark.sql("SELECT array_distinct(array(_2, _2, _3, _4, _4)) FROM
t1"))
+ checkSparkAnswerAndOperator(
+ spark.sql("SELECT array_distinct((CASE WHEN _2 =_3 THEN array(_4)
END)) FROM t1"))
+ checkSparkAnswerAndOperator(spark.sql(
+ "SELECT array_distinct((CASE WHEN _2 =_3 THEN array(_2, _2, _4,
_4, _5) END)) FROM t1"))
+ // NULL needs to be the first element for
checkSparkAnswerAndOperator to pass because
+ // datafusion array_distinct sorts the elements and then removes the
duplicates
+ checkSparkAnswerAndOperator(
+ spark.sql(
+ "SELECT array_distinct(array(CAST(NULL AS INT), _2, _2, _3, _4,
_4)) FROM t1"))
+ checkSparkAnswerAndOperator(spark.sql(
+ "SELECT array_distinct(array(CAST(NULL AS INT), CAST(NULL AS INT),
_2, _2, _3, _4, _4)) FROM t1"))
+ }
+ }
+ }
+ }
+
test("array_max") {
-    withTempDir { dir =>
-      val path = new Path(dir.toURI.toString, "test.parquet")
-      makeParquetFileAllPrimitiveTypes(path, dictionaryEnabled = false, n = 10000)
-      spark.read.parquet(path.toString).createOrReplaceTempView("t1");
-      checkSparkAnswerAndOperator(spark.sql("SELECT array_max(array(_2, _3, _4)) FROM t1"))
-      checkSparkAnswerAndOperator(
-        spark.sql("SELECT array_max((CASE WHEN _2 =_3 THEN array(_4) END)) FROM t1"));
-      checkSparkAnswerAndOperator(
-        spark.sql("SELECT array_max((CASE WHEN _2 =_3 THEN array(_2, _4) END)) FROM t1"));
-      checkSparkAnswerAndOperator(
-        spark.sql("SELECT array_max(array(CAST(NULL AS INT), CAST(NULL AS INT))) FROM t1"))
-      checkSparkAnswerAndOperator(
-        spark.sql("SELECT array_max(array(_2, CAST(NULL AS INT))) FROM t1"))
-      checkSparkAnswerAndOperator(spark.sql("SELECT array_max(array()) FROM t1"))
-      checkSparkAnswerAndOperator(
-        spark.sql(
-          "SELECT array_max(array(double('-Infinity'), 0.0, double('Infinity'))) FROM t1"))
+    Seq(true, false).foreach { dictionaryEnabled =>
+      withTempDir { dir =>
+        val path = new Path(dir.toURI.toString, "test.parquet")
+        makeParquetFileAllPrimitiveTypes(path, dictionaryEnabled, n = 10000)
+        spark.read.parquet(path.toString).createOrReplaceTempView("t1");
+        checkSparkAnswerAndOperator(spark.sql("SELECT array_max(array(_2, _3, _4)) FROM t1"))
+        checkSparkAnswerAndOperator(
+          spark.sql("SELECT array_max((CASE WHEN _2 =_3 THEN array(_4) END)) FROM t1"))
+        checkSparkAnswerAndOperator(
+          spark.sql("SELECT array_max((CASE WHEN _2 =_3 THEN array(_2, _4) END)) FROM t1"))
+        checkSparkAnswerAndOperator(
+          spark.sql("SELECT array_max(array(CAST(NULL AS INT), CAST(NULL AS INT))) FROM t1"))
+        checkSparkAnswerAndOperator(
+          spark.sql("SELECT array_max(array(_2, CAST(NULL AS INT))) FROM t1"))
+        checkSparkAnswerAndOperator(spark.sql("SELECT array_max(array()) FROM t1"))
+        checkSparkAnswerAndOperator(
+          spark.sql(
+            "SELECT array_max(array(double('-Infinity'), 0.0, double('Infinity'))) FROM t1"))
+      }
}
}
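Editor's note (not part of the commit): the docs entry and test comments above both hinge on one behavioral difference, so here is a minimal, hedged Scala sketch of what a user would observe. The expected outputs follow the documented behavior (Spark's array_distinct preserves first-occurrence order; DataFusion's sorts before deduplicating); the session setup is an assumption and is not taken from this patch.

import org.apache.spark.sql.SparkSession

object ArrayDistinctOrderSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("array_distinct-order-sketch")
      .getOrCreate()

    // Plain Spark: duplicates removed, first-occurrence order preserved,
    // so this is expected to return [3, 1, 2].
    spark.sql("SELECT array_distinct(array(3, 1, 2, 1)) AS d").show(false)

    // Under Comet with incompatible expressions allowed
    // (CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE in the test above), the docs
    // note that DataFusion sorts first, so the same query may return [1, 2, 3].
    spark.stop()
  }
}
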
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]