This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new 4f8ce757 feat: add support for array_contains expression (#1163)
4f8ce757 is described below

commit 4f8ce757b2832730e3d6974848cf5d3d89072283
Author: Dharan Aditya <[email protected]>
AuthorDate: Fri Jan 3 01:13:02 2025 +0530

    feat: add support for array_contains expression (#1163)
    
    * feat: add support for array_contains expression
    
    * test: add unit test for array_contains function
    
    * Removes unnecessary case expression for handling null values
    
    * chore: Move more expressions from core crate to spark-expr crate (#1152)
    
    * move aggregate expressions to spark-expr crate
    
    * move more expressions
    
    * move benchmark
    
    * normalize_nan
    
    * bitwise not
    
    * comet scalar funcs
    
    * update bench imports
    
    * remove dead code (#1155)
    
    * fix: Spark 4.0-preview1 SPARK-47120 (#1156)
    
    ## Which issue does this PR close?
    
    Part of https://github.com/apache/datafusion-comet/issues/372 and https://github.com/apache/datafusion-comet/issues/551
    
    ## Rationale for this change
    
    To be ready for Spark 4.0
    
    ## What changes are included in this PR?
    
    This PR fixes the new test SPARK-47120 added in Spark 4.0
    
    ## How are these changes tested?
    
    tests enabled
    
    * chore: Move string kernels and expressions to spark-expr crate (#1164)
    
    * Move string kernels and expressions to spark-expr crate
    
    * remove unused hash kernel
    
    * remove unused dependencies
    
    * chore: Move remaining expressions to spark-expr crate + some minor refactoring (#1165)
    
    * move CheckOverflow to spark-expr crate
    
    * move NegativeExpr to spark-expr crate
    
    * move UnboundColumn to spark-expr crate
    
    * move ExpandExec from execution::datafusion::operators to execution::operators
    
    * refactoring to remove datafusion subpackage
    
    * update imports in benches
    
    * fix
    
    * fix
    
    * chore: Add ignored tests for reading complex types from Parquet (#1167)
    
    * Add ignored tests for reading structs from Parquet
    
    * add basic map test
    
    * add tests for Map and Array
    
    * feat: Add Spark-compatible implementation of SchemaAdapterFactory (#1169)
    
    * Add Spark-compatible SchemaAdapterFactory implementation
    
    * remove prototype code
    
    * fix
    
    * refactor
    
    * implement more cast logic
    
    * implement more cast logic
    
    * add basic test
    
    * improve test
    
    * cleanup
    
    * fmt
    
    * add support for casting unsigned int to signed int
    
    * clippy
    
    * address feedback
    
    * fix test
    
    * fix: Document enabling comet explain plan usage in Spark (4.0) (#1176)
    
    * test: enabling Spark tests with offHeap requirement (#1177)
    
    ## Which issue does this PR close?
    
    ## Rationale for this change
    
    After https://github.com/apache/datafusion-comet/pull/1062 we have not been running Spark tests for native execution.
    
    ## What changes are included in this PR?
    
    Removed the off-heap requirement for testing
    
    ## How are these changes tested?
    
    Bringing back Spark tests for native execution
    
    * feat: Improve shuffle metrics (second attempt) (#1175)
    
    * improve shuffle metrics
    
    * docs
    
    * more metrics
    
    * refactor
    
    * address feedback
    
    * fix: stddev_pop should not directly return 0.0 when count is 1.0 (#1184)
    
    * add test
    
    * fix
    
    * fix
    
    * fix
    
    * feat: Make native shuffle compression configurable and respect `spark.shuffle.compress` (#1185)
    
    * Make shuffle compression codec and level configurable
    
    * remove lz4 references
    
    * docs
    
    * update comment
    
    * clippy
    
    * fix benches
    
    * clippy
    
    * clippy
    
    * disable test for miri
    
    * remove lz4 reference from proto
    
    * minor: move shuffle classes from common to spark (#1193)
    
    * minor: refactor decodeBatches to make private in broadcast exchange (#1195)
    
    * minor: refactor prepare_output so that it does not require an ExecutionContext (#1194)
    
    * fix: fix missing explanation for then branch in case when (#1200)
    
    * minor: remove unused source files (#1202)
    
    * chore: Upgrade to DataFusion 44.0.0-rc2 (#1154)
    
    * move aggregate expressions to spark-expr crate
    
    * move more expressions
    
    * move benchmark
    
    * normalize_nan
    
    * bitwise not
    
    * comet scalar funcs
    
    * update bench imports
    
    * save
    
    * save
    
    * save
    
    * remove unused imports
    
    * clippy
    
    * implement more hashers
    
    * implement Hash and PartialEq
    
    * implement Hash and PartialEq
    
    * implement Hash and PartialEq
    
    * benches
    
    * fix ScalarUDFImpl.return_type failure
    
    * exclude test from miri
    
    * ignore correct test
    
    * ignore another test
    
    * remove miri checks
    
    * use return_type_from_exprs
    
    * Revert "use return_type_from_exprs"
    
    This reverts commit febc1f1ec1301f9b359fc23ad6a117224fce35b7.
    
    * use DF main branch
    
    * hacky workaround for regression in ScalarUDFImpl.return_type
    
    * fix repo url
    
    * pin to revision
    
    * bump to latest rev
    
    * bump to latest DF rev
    
    * bump DF to rev 9f530dd
    
    * add Cargo.lock
    
    * bump DF version
    
    * no default features
    
    * Revert "remove miri checks"
    
    This reverts commit 4638fe3aa5501966cd5d8b53acf26c698b10b3c9.
    
    * Update pin to DataFusion e99e02b9b9093ceb0c13a2dd32a2a89beba47930
    
    * update pin
    
    * Update Cargo.toml
    
    Bump to 44.0.0-rc2
    
    * update cargo lock
    
    * revert miri change
    
    ---------
    
    Co-authored-by: Andrew Lamb <[email protected]>
    
    * update UT
    
    Signed-off-by: Dharan Aditya <[email protected]>
    
    * fix typo in UT
    
    Signed-off-by: Dharan Aditya <[email protected]>
    
    ---------
    
    Signed-off-by: Dharan Aditya <[email protected]>
    Co-authored-by: Andy Grove <[email protected]>
    Co-authored-by: KAZUYUKI TANIMURA <[email protected]>
    Co-authored-by: Parth Chandra <[email protected]>
    Co-authored-by: Liang-Chi Hsieh <[email protected]>
    Co-authored-by: Raz Luvaton <[email protected]>
    Co-authored-by: Andrew Lamb <[email protected]>
---
 native/core/src/execution/planner.rs                      | 15 +++++++++++++++
 native/proto/src/proto/expr.proto                         |  1 +
 .../scala/org/apache/comet/serde/QueryPlanSerde.scala     |  6 ++++++
 .../scala/org/apache/comet/CometExpressionSuite.scala     | 12 ++++++++++++
 4 files changed, 34 insertions(+)
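
For anyone trying the new expression end to end, here is a minimal spark-shell
sketch (not part of this commit; it assumes the Comet plugin jar is on the
classpath and uses Comet's standard config keys):

    // Sketch: with Comet enabled, array_contains is planned natively.
    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .master("local[*]")
      .config("spark.plugins", "org.apache.spark.CometPlugin")
      .config("spark.comet.enabled", "true")
      .config("spark.comet.exec.enabled", "true")
      .getOrCreate()

    spark.range(10).createOrReplaceTempView("t")
    // The projection below is serialized through the new array_contains
    // protobuf field and executed by DataFusion's array_has function.
    spark.sql("SELECT array_contains(array(id, id + 1), id) FROM t").show()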

diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs
index 5a35c62e..c40e2e73 100644
--- a/native/core/src/execution/planner.rs
+++ b/native/core/src/execution/planner.rs
@@ -98,6 +98,7 @@ use datafusion_expr::{
     AggregateUDF, ScalarUDF, WindowFrame, WindowFrameBound, WindowFrameUnits,
     WindowFunctionDefinition,
 };
+use datafusion_functions_nested::array_has::ArrayHas;
 use datafusion_physical_expr::expressions::{Literal, StatsType};
 use datafusion_physical_expr::window::WindowExpr;
 use datafusion_physical_expr::LexOrdering;
@@ -719,6 +720,20 @@ impl PhysicalPlanner {
                     expr.legacy_negative_index,
                 )))
             }
+            ExprStruct::ArrayContains(expr) => {
+                let src_array_expr =
+                    self.create_expr(expr.left.as_ref().unwrap(), Arc::clone(&input_schema))?;
+                let key_expr =
+                    self.create_expr(expr.right.as_ref().unwrap(), Arc::clone(&input_schema))?;
+                let args = vec![Arc::clone(&src_array_expr), key_expr];
+                let array_has_expr = Arc::new(ScalarFunctionExpr::new(
+                    "array_has",
+                    Arc::new(ScalarUDF::new_from_impl(ArrayHas::new())),
+                    args,
+                    DataType::Boolean,
+                ));
+                Ok(array_has_expr)
+            }
             expr => Err(ExecutionError::GeneralError(format!(
                 "Not implemented: {:?}",
                 expr
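
The commit log above notes that an earlier CASE expression guarding null
values was removed. The assumption (hedged; see Spark's ArrayContains
documentation for the authoritative rules) is that DataFusion's array_has
already matches Spark's null semantics, e.g.:

    // Spark semantics the native array_has is expected to match:
    spark.sql("SELECT array_contains(CAST(NULL AS ARRAY<INT>), 1)").show()
    // -> NULL: a null array propagates to a NULL result
    spark.sql("SELECT array_contains(array(1, 2), 3)").show()
    // -> false: the value is not present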
diff --git a/native/proto/src/proto/expr.proto b/native/proto/src/proto/expr.proto
index 7a8ea78d..e76ecdcc 100644
--- a/native/proto/src/proto/expr.proto
+++ b/native/proto/src/proto/expr.proto
@@ -84,6 +84,7 @@ message Expr {
     GetArrayStructFields get_array_struct_fields = 57;
     BinaryExpr array_append = 58;
     ArrayInsert array_insert = 59;
+    BinaryExpr array_contains = 60;
   }
 }
 
diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
index 518fa068..dc081b19 100644
--- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
@@ -2266,6 +2266,12 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim
            withInfo(expr, "unsupported arguments for GetArrayStructFields", child)
             None
           }
+        case expr if expr.prettyName == "array_contains" =>
+          createBinaryExpr(
+            expr.children(0),
+            expr.children(1),
+            inputs,
+            (builder, binaryExpr) => builder.setArrayContains(binaryExpr))
         case _ if expr.prettyName == "array_append" =>
           createBinaryExpr(
             expr.children(0),
diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
index cce7cb20..36d37065 100644
--- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
@@ -2517,4 +2517,16 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
       checkSparkAnswer(df.select("arrUnsupportedArgs"))
     }
   }
+
+  test("array_contains") {
+    withTempDir { dir =>
+      val path = new Path(dir.toURI.toString, "test.parquet")
+      makeParquetFileAllTypes(path, dictionaryEnabled = false, n = 10000)
+      spark.read.parquet(path.toString).createOrReplaceTempView("t1")
+      checkSparkAnswerAndOperator(
+        spark.sql("SELECT array_contains(array(_2, _3, _4), _2) FROM t1"))
+      checkSparkAnswerAndOperator(
+        spark.sql("SELECT array_contains((CASE WHEN _2 = _3 THEN array(_4) END), _4) FROM t1"))
+    }
+  }
 }

