This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new 235b69d5f feat: supports array_distinct (#1923)
235b69d5f is described below

commit 235b69d5fea26eb7bfde228112ce020e603da3b0
Author: drexler-sky <evan12...@gmail.com>
AuthorDate: Thu Jun 26 06:29:37 2025 -0700

    feat: supports array_distinct (#1923)
---
 docs/source/user-guide/expressions.md              | 30 ++++++-----
 .../org/apache/comet/serde/QueryPlanSerde.scala    |  1 +
 .../main/scala/org/apache/comet/serde/arrays.scala | 17 +++++-
 .../apache/comet/CometArrayExpressionSuite.scala   | 63 ++++++++++++++++------
 4 files changed, 78 insertions(+), 33 deletions(-)

diff --git a/docs/source/user-guide/expressions.md b/docs/source/user-guide/expressions.md
index 7177276bd..a9b27de04 100644
--- a/docs/source/user-guide/expressions.md
+++ b/docs/source/user-guide/expressions.md
@@ -186,20 +186,22 @@ The following Spark expressions are currently available. Any known compatibility
 
 ## Arrays
 
-| Expression     | Notes        |
-| -------------- | ------------ |
-| ArrayAppend    | Experimental |
-| ArrayExcept    | Experimental |
-| ArrayCompact   | Experimental |
-| ArrayContains  | Experimental |
-| ArrayInsert    | Experimental |
-| ArrayIntersect | Experimental |
-| ArrayJoin      | Experimental |
-| ArrayRemove    |              |
-| ArrayRepeat    | Experimental |
-| ArraysOverlap  | Experimental |
-| ElementAt      | Arrays only  |
-| GetArrayItem   |              |
+| Expression     | Notes        |
+| -------------- | ------------ |
+| ArrayAppend    | Experimental |
+| ArrayCompact   | Experimental |
+| ArrayContains  | Experimental |
+| ArrayDistinct  | Experimental: behaves differently from Spark. DataFusion first sorts and then removes duplicates, while Spark preserves the original order. |
+| ArrayExcept    | Experimental |
+| ArrayInsert    | Experimental |
+| ArrayIntersect | Experimental |
+| ArrayJoin      | Experimental |
+| ArrayMax       | Experimental |
+| ArrayRemove    |              |
+| ArrayRepeat    | Experimental |
+| ArraysOverlap  | Experimental |
+| ElementAt      | Arrays only  |
+| GetArrayItem   |              |
 
 ## Structs
 
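The compatibility note added above is the user-visible consequence of delegating to DataFusion's array_distinct kernel. A minimal sketch of the difference in a Spark session (the Spark result is standard Spark behavior; the Comet result is inferred from the note above, not run here):

    // Spark's array_distinct keeps the first occurrence of each element in
    // its original position, dropping later duplicates without reordering.
    spark.sql("SELECT array_distinct(array(3, 1, 3, 2))").show()
    // Spark (JVM):          [3, 1, 2]
    // Comet via DataFusion: [1, 2, 3]  (sorted before deduplication, per the note)
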
diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
index d0250d52a..1e3cec285 100644
--- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
@@ -68,6 +68,7 @@ object QueryPlanSerde extends Logging with CometExprShim {
   private val exprSerdeMap: Map[Class[_], CometExpressionSerde] = Map(
     classOf[ArrayAppend] -> CometArrayAppend,
     classOf[ArrayContains] -> CometArrayContains,
+    classOf[ArrayDistinct] -> CometArrayDistinct,
     classOf[ArrayExcept] -> CometArrayExcept,
     classOf[ArrayInsert] -> CometArrayInsert,
     classOf[ArrayIntersect] -> CometArrayIntersect,
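
Registering classOf[ArrayDistinct] -> CometArrayDistinct in exprSerdeMap is the only dispatch hook needed: conversion looks up the Spark expression's class and hands off to the matching serde object. A rough sketch of that lookup, with the method name assumed for illustration rather than taken from QueryPlanSerde:

    // Hypothetical sketch of class-keyed dispatch over exprSerdeMap.
    def convertViaSerdeMap(
        expr: Expression,
        inputs: Seq[Attribute],
        binding: Boolean): Option[ExprOuterClass.Expr] =
      exprSerdeMap.get(expr.getClass) match {
        case Some(serde) => serde.convert(expr, inputs, binding) // try native path
        case None => None // no serde registered: Comet falls back to Spark
      }
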
diff --git a/spark/src/main/scala/org/apache/comet/serde/arrays.scala b/spark/src/main/scala/org/apache/comet/serde/arrays.scala
index 5496c0422..5cd228ff3 100644
--- a/spark/src/main/scala/org/apache/comet/serde/arrays.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/arrays.scala
@@ -150,6 +150,19 @@ object CometArrayContains extends CometExpressionSerde with IncompatExpr {
   }
 }
 
+object CometArrayDistinct extends CometExpressionSerde with IncompatExpr {
+  override def convert(
+      expr: Expression,
+      inputs: Seq[Attribute],
+      binding: Boolean): Option[ExprOuterClass.Expr] = {
+    val arrayExprProto = exprToProto(expr.children.head, inputs, binding)
+
+    val arrayDistinctScalarExpr =
+      scalarFunctionExprToProto("array_distinct", arrayExprProto)
+    optExprWithInfo(arrayDistinctScalarExpr, expr)
+  }
+}
+
 object CometArrayIntersect extends CometExpressionSerde with IncompatExpr {
   override def convert(
       expr: Expression,
@@ -171,9 +184,9 @@ object CometArrayMax extends CometExpressionSerde {
       binding: Boolean): Option[ExprOuterClass.Expr] = {
     val arrayExprProto = exprToProto(expr.children.head, inputs, binding)
 
-    val arrayContainsScalarExpr =
+    val arrayMaxScalarExpr =
       scalarFunctionExprToProto("array_max", arrayExprProto)
-    optExprWithInfo(arrayContainsScalarExpr, expr)
+    optExprWithInfo(arrayMaxScalarExpr, expr)
   }
 }
 
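
CometArrayDistinct follows the same three-step pattern as the other serde objects in arrays.scala: serialize the child expression, reference a DataFusion scalar function by name, and wrap the result. As a hypothetical illustration of how a further array expression could be wired up the same way (ArrayMin and "array_min" are assumed for the sketch, not part of this commit):

    object CometArrayMin extends CometExpressionSerde with IncompatExpr {
      override def convert(
          expr: Expression,
          inputs: Seq[Attribute],
          binding: Boolean): Option[ExprOuterClass.Expr] = {
        // Serialize the array argument (the expression's only child).
        val arrayExprProto = exprToProto(expr.children.head, inputs, binding)
        // Reference the DataFusion scalar function by name.
        val arrayMinScalarExpr =
          scalarFunctionExprToProto("array_min", arrayExprProto)
        // Returns None (so Comet falls back to Spark) if conversion failed.
        optExprWithInfo(arrayMinScalarExpr, expr)
      }
    }

together with a classOf[ArrayMin] -> CometArrayMin entry in QueryPlanSerde's exprSerdeMap, as in the hunk above.
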
diff --git a/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala
index 0cc40c40d..ff1c260a2 100644
--- a/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala
@@ -232,24 +232,53 @@ class CometArrayExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelp
     }
   }
 
+  test("array_distinct") {
+    withSQLConf(CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key -> "true") {
+      Seq(true, false).foreach { dictionaryEnabled =>
+        withTempDir { dir =>
+          val path = new Path(dir.toURI.toString, "test.parquet")
+          makeParquetFileAllPrimitiveTypes(path, dictionaryEnabled, n = 10000)
+          spark.read.parquet(path.toString).createOrReplaceTempView("t1")
+          // The result needs to be in ascending order for checkSparkAnswerAndOperator to pass
+          // because DataFusion's array_distinct sorts the elements and then removes the duplicates.
+          checkSparkAnswerAndOperator(
+            spark.sql("SELECT array_distinct(array(_2, _2, _3, _4, _4)) FROM t1"))
+          checkSparkAnswerAndOperator(
+            spark.sql("SELECT array_distinct((CASE WHEN _2 =_3 THEN array(_4) END)) FROM t1"))
+          checkSparkAnswerAndOperator(spark.sql(
+            "SELECT array_distinct((CASE WHEN _2 =_3 THEN array(_2, _2, _4, _4, _5) END)) FROM t1"))
+          // NULL needs to be the first element for checkSparkAnswerAndOperator to pass because
+          // DataFusion's array_distinct sorts the elements and then removes the duplicates.
+          checkSparkAnswerAndOperator(
+            spark.sql(
+              "SELECT array_distinct(array(CAST(NULL AS INT), _2, _2, _3, _4, _4)) FROM t1"))
+          checkSparkAnswerAndOperator(spark.sql(
+            "SELECT array_distinct(array(CAST(NULL AS INT), CAST(NULL AS INT), _2, _2, _3, _4, _4)) FROM t1"))
+        }
+      }
+    }
+  }
+
   test("array_max") {
-    withTempDir { dir =>
-      val path = new Path(dir.toURI.toString, "test.parquet")
-      makeParquetFileAllPrimitiveTypes(path, dictionaryEnabled = false, n = 10000)
-      spark.read.parquet(path.toString).createOrReplaceTempView("t1");
-      checkSparkAnswerAndOperator(spark.sql("SELECT array_max(array(_2, _3, _4)) FROM t1"))
-      checkSparkAnswerAndOperator(
-        spark.sql("SELECT array_max((CASE WHEN _2 =_3 THEN array(_4) END)) FROM t1"));
-      checkSparkAnswerAndOperator(
-        spark.sql("SELECT array_max((CASE WHEN _2 =_3 THEN array(_2, _4) END)) FROM t1"));
-      checkSparkAnswerAndOperator(
-        spark.sql("SELECT array_max(array(CAST(NULL AS INT), CAST(NULL AS INT))) FROM t1"))
-      checkSparkAnswerAndOperator(
-        spark.sql("SELECT array_max(array(_2, CAST(NULL AS INT))) FROM t1"))
-      checkSparkAnswerAndOperator(spark.sql("SELECT array_max(array()) FROM t1"))
-      checkSparkAnswerAndOperator(
-        spark.sql(
-          "SELECT array_max(array(double('-Infinity'), 0.0, double('Infinity'))) FROM t1"))
+    Seq(true, false).foreach { dictionaryEnabled =>
+      withTempDir { dir =>
+        val path = new Path(dir.toURI.toString, "test.parquet")
+        makeParquetFileAllPrimitiveTypes(path, dictionaryEnabled, n = 10000)
+        spark.read.parquet(path.toString).createOrReplaceTempView("t1")
+        checkSparkAnswerAndOperator(spark.sql("SELECT array_max(array(_2, _3, _4)) FROM t1"))
+        checkSparkAnswerAndOperator(
+          spark.sql("SELECT array_max((CASE WHEN _2 =_3 THEN array(_4) END)) FROM t1"))
+        checkSparkAnswerAndOperator(
+          spark.sql("SELECT array_max((CASE WHEN _2 =_3 THEN array(_2, _4) END)) FROM t1"))
+        checkSparkAnswerAndOperator(
+          spark.sql("SELECT array_max(array(CAST(NULL AS INT), CAST(NULL AS INT))) FROM t1"))
+        checkSparkAnswerAndOperator(
+          spark.sql("SELECT array_max(array(_2, CAST(NULL AS INT))) FROM t1"))
+        checkSparkAnswerAndOperator(spark.sql("SELECT array_max(array()) FROM t1"))
+        checkSparkAnswerAndOperator(
+          spark.sql(
+            "SELECT array_max(array(double('-Infinity'), 0.0, double('Infinity'))) FROM t1"))
+      }
+      }
     }
   }
 
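Because CometArrayDistinct mixes in IncompatExpr, the suite has to opt in through CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE before Comet will execute the expression natively, which is what the withSQLConf wrapper above does. The same opt-in applies outside the tests; a minimal sketch, assuming a session where the view t1 is registered as in the suite:

    // Allow expressions whose native behavior can differ from Spark's.
    spark.conf.set(CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key, "true")
    // array_distinct is now eligible for native execution; note the element
    // ordering caveat documented in expressions.md above.
    spark.sql("SELECT array_distinct(array(_2, _2, _3, _4, _4)) FROM t1").show()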

