This is an automated email from the ASF dual-hosted git repository. agrove pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push: new 2f58fac38 feat: Supports array_union (#1945) 2f58fac38 is described below commit 2f58fac38539f19e5047d2bd2bfd20d70b5532d2 Author: drexler-sky <evan12...@gmail.com> AuthorDate: Fri Jun 27 18:01:10 2025 -0700 feat: Supports array_union (#1945) --- docs/source/user-guide/expressions.md | 33 +++++++++++----------- docs/spark_expressions_support.md | 26 ++++++++--------- .../org/apache/comet/serde/QueryPlanSerde.scala | 1 + .../main/scala/org/apache/comet/serde/arrays.scala | 14 +++++++++ .../apache/comet/CometArrayExpressionSuite.scala | 19 +++++++++++++ 5 files changed, 64 insertions(+), 29 deletions(-) diff --git a/docs/source/user-guide/expressions.md b/docs/source/user-guide/expressions.md index a9b27de04..108fa9846 100644 --- a/docs/source/user-guide/expressions.md +++ b/docs/source/user-guide/expressions.md @@ -186,22 +186,23 @@ The following Spark expressions are currently available. Any known compatibility ## Arrays -| Expression | Notes | -|----------------|----------------------------------------------------------------------------------------------------------------------------------------| -| ArrayAppend | Experimental | -| ArrayCompact | Experimental | -| ArrayContains | Experimental | -| ArrayDistinct | Experimental: behaves differently than spark. Datafusion first sorts then removes duplicates while spark preserves the original order. | -| ArrayExcept | Experimental | -| ArrayInsert | Experimental | -| ArrayIntersect | Experimental | -| ArrayJoin | Experimental | -| ArrayMax | Experimental | -| ArrayRemove | | -| ArrayRepeat | Experimental | -| ArraysOverlap | Experimental | -| ElementAt | Arrays only | -| GetArrayItem | | +| Expression | Notes | +|----------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| ArrayAppend | Experimental | +| ArrayCompact | Experimental | +| ArrayContains | Experimental | +| ArrayDistinct | Experimental: behaves differently than spark. Datafusion first sorts then removes duplicates while spark preserves the original order. | +| ArrayExcept | Experimental | +| ArrayInsert | Experimental | +| ArrayIntersect | Experimental | +| ArrayJoin | Experimental | +| ArrayMax | Experimental | +| ArrayRemove | | +| ArrayRepeat | Experimental | +| ArraysOverlap | Experimental | +| ArrayUnion | Experimental: behaves differently than spark. Datafusion sorts the input arrays before performing the union, while spark preserves the order of the first array and appends unique elements from the second. | +| ElementAt | Arrays only | +| GetArrayItem | | ## Structs diff --git a/docs/spark_expressions_support.md b/docs/spark_expressions_support.md index 297b15d2c..9d4a15c8e 100644 --- a/docs/spark_expressions_support.md +++ b/docs/spark_expressions_support.md @@ -81,21 +81,21 @@ ### array_funcs - [x] array - - [ ] array_append - - [ ] array_compact - - [ ] array_contains - - [ ] array_distinct - - [ ] array_except - - [ ] array_insert - - [ ] array_intersect - - [ ] array_join - - [ ] array_max + - [x] array_append + - [x] array_compact + - [x] array_contains + - [x] array_distinct + - [x] array_except + - [x] array_insert + - [x] array_intersect + - [x] array_join + - [x] array_max - [ ] array_min - [ ] array_position - - [ ] array_remove - - [ ] array_repeat - - [ ] array_union - - [ ] arrays_overlap + - [x] array_remove + - [x] array_repeat + - [x] array_union + - [x] arrays_overlap - [ ] arrays_zip - [x] element_at - [ ] flatten diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index e445cbb7c..e0fadc314 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -78,6 +78,7 @@ object QueryPlanSerde extends Logging with CometExprShim { classOf[ArrayRemove] -> CometArrayRemove, classOf[ArrayRepeat] -> CometArrayRepeat, classOf[ArraysOverlap] -> CometArraysOverlap, + classOf[ArrayUnion] -> CometArrayUnion, classOf[Ascii] -> CometAscii, classOf[ConcatWs] -> CometConcatWs, classOf[Chr] -> CometChr, diff --git a/spark/src/main/scala/org/apache/comet/serde/arrays.scala b/spark/src/main/scala/org/apache/comet/serde/arrays.scala index 5cd228ff3..d8a7eeb15 100644 --- a/spark/src/main/scala/org/apache/comet/serde/arrays.scala +++ b/spark/src/main/scala/org/apache/comet/serde/arrays.scala @@ -347,3 +347,17 @@ object CometArrayInsert extends CometExpressionSerde with IncompatExpr { } } } + +object CometArrayUnion extends CometExpressionSerde with IncompatExpr { + override def convert( + expr: Expression, + inputs: Seq[Attribute], + binding: Boolean): Option[ExprOuterClass.Expr] = { + val leftArrayExprProto = exprToProto(expr.children.head, inputs, binding) + val rightArrayExprProto = exprToProto(expr.children(1), inputs, binding) + + val arraysUnionScalarExpr = + scalarFunctionExprToProto("array_union", leftArrayExprProto, rightArrayExprProto) + optExprWithInfo(arraysUnionScalarExpr, expr, expr.children: _*) + } +} diff --git a/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala index ff1c260a2..f3786d657 100644 --- a/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala @@ -259,6 +259,25 @@ class CometArrayExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelp } } + test("array_union") { + withSQLConf(CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key -> "true") { + Seq(true, false).foreach { dictionaryEnabled => + withTempDir { dir => + val path = new Path(dir.toURI.toString, "test.parquet") + makeParquetFileAllPrimitiveTypes(path, dictionaryEnabled, n = 10000) + spark.read.parquet(path.toString).createOrReplaceTempView("t1") + checkSparkAnswerAndOperator( + spark.sql("SELECT array_union(array(_2, _3, _4), array(_3, _4)) FROM t1")) + checkSparkAnswerAndOperator(sql("SELECT array_union(array(_18), array(_19)) from t1")) + checkSparkAnswerAndOperator(spark.sql( + "SELECT array_union(array(CAST(NULL AS INT), _2, _3, _4), array(CAST(NULL AS INT), _2, _3)) FROM t1")) + checkSparkAnswerAndOperator(spark.sql( + "SELECT array_union(array(CAST(NULL AS INT), CAST(NULL AS INT), _2, _3, _4), array(CAST(NULL AS INT), CAST(NULL AS INT), _2, _3)) FROM t1")) + } + } + } + } + test("array_max") { Seq(true, false).foreach { dictionaryEnabled => withTempDir { dir => --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@datafusion.apache.org For additional commands, e-mail: commits-h...@datafusion.apache.org