This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
     new 07274e800  Support arrays_overlap function (#1312)
07274e800 is described below

commit 07274e8005d9042441bd27378bc381bf95dad4af
Author:     Eren Avsarogullari <erenavsarogull...@gmail.com>
AuthorDate: Tue Jan 28 15:53:10 2025 -0800

    Support arrays_overlap function (#1312)
---
 native/core/src/execution/planner.rs             | 16 ++++++++++++++++
 native/proto/src/proto/expr.proto                |  1 +
 .../org/apache/comet/serde/QueryPlanSerde.scala  | 16 ++++++++++++++++
 .../org/apache/comet/CometExpressionSuite.scala  | 21 +++++++++++++++++++++
 4 files changed, 54 insertions(+)

diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs
index 95926bfee..71ec5f334 100644
--- a/native/core/src/execution/planner.rs
+++ b/native/core/src/execution/planner.rs
@@ -65,6 +65,7 @@ use datafusion::{
     prelude::SessionContext,
 };
 use datafusion_comet_spark_expr::{create_comet_physical_fun, create_negate_expr};
+use datafusion_functions_nested::array_has::array_has_any_udf;
 use datafusion_functions_nested::concat::ArrayAppend;
 use datafusion_functions_nested::remove::array_remove_all_udf;
 use datafusion_functions_nested::set_ops::array_intersect_udf;
@@ -818,6 +819,21 @@ impl PhysicalPlanner {
                 ));
                 Ok(array_join_expr)
             }
+            ExprStruct::ArraysOverlap(expr) => {
+                let left_array_expr =
+                    self.create_expr(expr.left.as_ref().unwrap(), Arc::clone(&input_schema))?;
+                let right_array_expr =
+                    self.create_expr(expr.right.as_ref().unwrap(), Arc::clone(&input_schema))?;
+                let args = vec![Arc::clone(&left_array_expr), right_array_expr];
+                let datafusion_array_has_any = array_has_any_udf();
+                let array_has_any_expr = Arc::new(ScalarFunctionExpr::new(
+                    "array_has_any",
+                    datafusion_array_has_any,
+                    args,
+                    DataType::Boolean,
+                ));
+                Ok(array_has_any_expr)
+            }
             expr => Err(ExecutionError::GeneralError(format!(
                 "Not implemented: {:?}",
                 expr
diff --git a/native/proto/src/proto/expr.proto b/native/proto/src/proto/expr.proto
index 83d6da7cb..fd928fd8a 100644
--- a/native/proto/src/proto/expr.proto
+++ b/native/proto/src/proto/expr.proto
@@ -88,6 +88,7 @@ message Expr {
     BinaryExpr array_remove = 61;
     BinaryExpr array_intersect = 62;
     ArrayJoin array_join = 63;
+    BinaryExpr arrays_overlap = 64;
   }
 }
diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
index c3d7ac749..cb4fffc1a 100644
--- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
@@ -2428,6 +2428,22 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim
             withInfo(expr, "unsupported arguments for ArrayJoin", exprs: _*)
             None
         }
+      case ArraysOverlap(leftArrayExpr, rightArrayExpr) =>
+        if (CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.get()) {
+          createBinaryExpr(
+            expr,
+            leftArrayExpr,
+            rightArrayExpr,
+            inputs,
+            binding,
+            (builder, binaryExpr) => builder.setArraysOverlap(binaryExpr))
+        } else {
+          withInfo(
+            expr,
+            s"$expr is not supported yet. To enable all incompatible casts, set " +
+              s"${CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key}=true")
+          None
+        }
       case _ =>
         withInfo(expr, s"${expr.prettyName} is not supported", expr.children: _*)
         None
diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
index 99cf4bad4..f82101b3a 100644
--- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
@@ -2701,4 +2701,25 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
       }
     }
   }
+
+  test("arrays_overlap") {
+    withSQLConf(CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key -> "true") {
+      Seq(true, false).foreach { dictionaryEnabled =>
+        withTempDir { dir =>
+          val path = new Path(dir.toURI.toString, "test.parquet")
+          makeParquetFileAllTypes(path, dictionaryEnabled, 10000)
+          spark.read.parquet(path.toString).createOrReplaceTempView("t1")
+          checkSparkAnswerAndOperator(sql(
+            "SELECT arrays_overlap(array(_2, _3, _4), array(_3, _4)) from t1 where _2 is not null"))
+          checkSparkAnswerAndOperator(sql(
+            "SELECT arrays_overlap(array('a', null, cast(_1 as string)), array('b', cast(_1 as string), cast(_2 as string))) from t1 where _1 is not null"))
+          checkSparkAnswerAndOperator(sql(
+            "SELECT arrays_overlap(array('a', null), array('b', null)) from t1 where _1 is not null"))
+          checkSparkAnswerAndOperator(spark.sql(
+            "SELECT arrays_overlap((CASE WHEN _2 =_3 THEN array(_6, _7) END), array(_6, _7)) FROM t1"));
+        }
+      }
+    }
+  }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@datafusion.apache.org
For additional commands, e-mail: commits-h...@datafusion.apache.org
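
Note: for anyone who wants to exercise the new expression end to end, below is
a minimal spark-shell style sketch. It is not part of this commit: it assumes
the shell was launched with the Comet plugin already enabled (e.g.
--conf spark.plugins=org.apache.spark.CometPlugin), it assumes
"spark.comet.cast.allowIncompatible" is the key behind
CometConf.COMET_CAST_ALLOW_INCOMPATIBLE, and the sample data is made up.

    import spark.implicits._

    // arrays_overlap is only translated to a native expression when incompatible
    // casts are allowed, mirroring the guard added in QueryPlanSerde.scala.
    spark.conf.set("spark.comet.cast.allowIncompatible", "true")

    // Two array columns; only the first row shares an element (3).
    Seq((Seq(1, 2, 3), Seq(3, 4)), (Seq(1, 2), Seq(4, 5)))
      .toDF("a", "b")
      .createOrReplaceTempView("t")

    // On the native side this is executed via DataFusion's array_has_any UDF.
    spark.sql("SELECT arrays_overlap(a, b) FROM t").show()
    // expected rows: true, then false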