This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
new 4f8ce757 feat: add support for array_contains expression (#1163)
4f8ce757 is described below
commit 4f8ce757b2832730e3d6974848cf5d3d89072283
Author: Dharan Aditya <[email protected]>
AuthorDate: Fri Jan 3 01:13:02 2025 +0530
feat: add support for array_contains expression (#1163)
* feat: add support for array_contains expression
* test: add unit test for array_contains function
* Removes unnecessary case expression for handling null values
* chore: Move more expressions from core crate to spark-expr crate (#1152)
* move aggregate expressions to spark-expr crate
* move more expressions
* move benchmark
* normalize_nan
* bitwise not
* comet scalar funcs
* update bench imports
* remove dead code (#1155)
* fix: Spark 4.0-preview1 SPARK-47120 (#1156)
## Which issue does this PR close?
Part of https://github.com/apache/datafusion-comet/issues/372 and
https://github.com/apache/datafusion-comet/issues/551
## Rationale for this change
To be ready for Spark 4.0
## What changes are included in this PR?
This PR fixes the new test SPARK-47120 added in Spark 4.0
## How are these changes tested?
tests enabled
* chore: Move string kernels and expressions to spark-expr crate (#1164)
* Move string kernels and expressions to spark-expr crate
* remove unused hash kernel
* remove unused dependencies
* chore: Move remaining expressions to spark-expr crate + some minor refactoring (#1165)
* move CheckOverflow to spark-expr crate
* move NegativeExpr to spark-expr crate
* move UnboundColumn to spark-expr crate
* move ExpandExec from execution::datafusion::operators to execution::operators
* refactoring to remove datafusion subpackage
* update imports in benches
* fix
* fix
* chore: Add ignored tests for reading complex types from Parquet (#1167)
* Add ignored tests for reading structs from Parquet
* add basic map test
* add tests for Map and Array
* feat: Add Spark-compatible implementation of SchemaAdapterFactory (#1169)
* Add Spark-compatible SchemaAdapterFactory implementation
* remove prototype code
* fix
* refactor
* implement more cast logic
* implement more cast logic
* add basic test
* improve test
* cleanup
* fmt
* add support for casting unsigned int to signed int
* clippy
* address feedback
* fix test
* fix: Document enabling comet explain plan usage in Spark (4.0) (#1176)
* test: enabling Spark tests with offHeap requirement (#1177)
## Which issue does this PR close?
## Rationale for this change
After https://github.com/apache/datafusion-comet/pull/1062 we have not been running Spark tests for native execution.
## What changes are included in this PR?
Removed the off heap requirement for testing
## How are these changes tested?
Bringing back Spark tests for native execution
* feat: Improve shuffle metrics (second attempt) (#1175)
* improve shuffle metrics
* docs
* more metrics
* refactor
* address feedback
* fix: stddev_pop should not directly return 0.0 when count is 1.0 (#1184)
* add test
* fix
* fix
* fix
* feat: Make native shuffle compression configurable and respect `spark.shuffle.compress` (#1185) (see the configuration sketch after the commit message)
* Make shuffle compression codec and level configurable
* remove lz4 references
* docs
* update comment
* clippy
* fix benches
* clippy
* clippy
* disable test for miri
* remove lz4 reference from proto
* minor: move shuffle classes from common to spark (#1193)
* minor: refactor decodeBatches to make private in broadcast exchange (#1195)
* minor: refactor prepare_output so that it does not require an ExecutionContext (#1194)
* fix: fix missing explanation for then branch in case when (#1200)
* minor: remove unused source files (#1202)
* chore: Upgrade to DataFusion 44.0.0-rc2 (#1154)
* move aggregate expressions to spark-expr crate
* move more expressions
* move benchmark
* normalize_nan
* bitwise not
* comet scalar funcs
* update bench imports
* save
* save
* save
* remove unused imports
* clippy
* implement more hashers
* implement Hash and PartialEq
* implement Hash and PartialEq
* implement Hash and PartialEq
* benches
* fix ScalarUDFImpl.return_type failure
* exclude test from miri
* ignore correct test
* ignore another test
* remove miri checks
* use return_type_from_exprs
* Revert "use return_type_from_exprs"
This reverts commit febc1f1ec1301f9b359fc23ad6a117224fce35b7.
* use DF main branch
* hacky workaround for regression in ScalarUDFImpl.return_type
* fix repo url
* pin to revision
* bump to latest rev
* bump to latest DF rev
* bump DF to rev 9f530dd
* add Cargo.lock
* bump DF version
* no default features
* Revert "remove miri checks"
This reverts commit 4638fe3aa5501966cd5d8b53acf26c698b10b3c9.
* Update pin to DataFusion e99e02b9b9093ceb0c13a2dd32a2a89beba47930
* update pin
* Update Cargo.toml
Bump to 44.0.0-rc2
* update cargo lock
* revert miri change
---------
Co-authored-by: Andrew Lamb <[email protected]>
* update UT
Signed-off-by: Dharan Aditya <[email protected]>
* fix typo in UT
Signed-off-by: Dharan Aditya <[email protected]>
---------
Signed-off-by: Dharan Aditya <[email protected]>
Co-authored-by: Andy Grove <[email protected]>
Co-authored-by: KAZUYUKI TANIMURA <[email protected]>
Co-authored-by: Parth Chandra <[email protected]>
Co-authored-by: Liang-Chi Hsieh <[email protected]>
Co-authored-by: Raz Luvaton <[email protected]>
Co-authored-by: Andrew Lamb <[email protected]>
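For the shuffle compression change (#1185) listed above, a minimal configuration sketch in Scala follows. The spark.comet.* property names and the plugin class used here are assumptions drawn from the commit description and Comet documentation, not from this diff; spark.shuffle.compress is the standard Spark property that the change now respects.

import org.apache.spark.sql.SparkSession

// Sketch only: the spark.comet.* compression keys below are assumed names,
// not confirmed by this commit; check the Comet configuration guide.
val spark = SparkSession
  .builder()
  .appName("comet-shuffle-compression-sketch")
  .config("spark.plugins", "org.apache.spark.CometPlugin")
  .config("spark.comet.exec.shuffle.enabled", "true")
  // Spark's own switch is now respected; setting it to false also disables
  // compression of Comet's native shuffle output.
  .config("spark.shuffle.compress", "true")
  // Assumed Comet keys for the codec and its compression level.
  .config("spark.comet.exec.shuffle.compression.codec", "zstd")
  .config("spark.comet.exec.shuffle.compression.zstd.level", "3")
  .getOrCreate()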
---
native/core/src/execution/planner.rs | 15 +++++++++++++++
native/proto/src/proto/expr.proto | 1 +
.../scala/org/apache/comet/serde/QueryPlanSerde.scala | 6 ++++++
.../scala/org/apache/comet/CometExpressionSuite.scala | 12 ++++++++++++
4 files changed, 34 insertions(+)
diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs
index 5a35c62e..c40e2e73 100644
--- a/native/core/src/execution/planner.rs
+++ b/native/core/src/execution/planner.rs
@@ -98,6 +98,7 @@ use datafusion_expr::{
     AggregateUDF, ScalarUDF, WindowFrame, WindowFrameBound, WindowFrameUnits,
     WindowFunctionDefinition,
 };
+use datafusion_functions_nested::array_has::ArrayHas;
 use datafusion_physical_expr::expressions::{Literal, StatsType};
 use datafusion_physical_expr::window::WindowExpr;
 use datafusion_physical_expr::LexOrdering;
@@ -719,6 +720,20 @@ impl PhysicalPlanner {
                     expr.legacy_negative_index,
                 )))
             }
+            ExprStruct::ArrayContains(expr) => {
+                let src_array_expr =
+                    self.create_expr(expr.left.as_ref().unwrap(), Arc::clone(&input_schema))?;
+                let key_expr =
+                    self.create_expr(expr.right.as_ref().unwrap(), Arc::clone(&input_schema))?;
+                let args = vec![Arc::clone(&src_array_expr), key_expr];
+                let array_has_expr = Arc::new(ScalarFunctionExpr::new(
+                    "array_has",
+                    Arc::new(ScalarUDF::new_from_impl(ArrayHas::new())),
+                    args,
+                    DataType::Boolean,
+                ));
+                Ok(array_has_expr)
+            }
             expr => Err(ExecutionError::GeneralError(format!(
                 "Not implemented: {:?}",
                 expr
diff --git a/native/proto/src/proto/expr.proto b/native/proto/src/proto/expr.proto
index 7a8ea78d..e76ecdcc 100644
--- a/native/proto/src/proto/expr.proto
+++ b/native/proto/src/proto/expr.proto
@@ -84,6 +84,7 @@ message Expr {
     GetArrayStructFields get_array_struct_fields = 57;
     BinaryExpr array_append = 58;
     ArrayInsert array_insert = 59;
+    BinaryExpr array_contains = 60;
   }
 }
diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
index 518fa068..dc081b19 100644
--- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
@@ -2266,6 +2266,12 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim
           withInfo(expr, "unsupported arguments for GetArrayStructFields", child)
           None
         }
+      case expr if expr.prettyName == "array_contains" =>
+        createBinaryExpr(
+          expr.children(0),
+          expr.children(1),
+          inputs,
+          (builder, binaryExpr) => builder.setArrayContains(binaryExpr))
       case _ if expr.prettyName == "array_append" =>
         createBinaryExpr(
           expr.children(0),
diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
index cce7cb20..36d37065 100644
--- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
@@ -2517,4 +2517,16 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
       checkSparkAnswer(df.select("arrUnsupportedArgs"))
     }
   }
+
+  test("array_contains") {
+    withTempDir { dir =>
+      val path = new Path(dir.toURI.toString, "test.parquet")
+      makeParquetFileAllTypes(path, dictionaryEnabled = false, n = 10000)
+      spark.read.parquet(path.toString).createOrReplaceTempView("t1");
+      checkSparkAnswerAndOperator(
+        spark.sql("SELECT array_contains(array(_2, _3, _4), _2) FROM t1"))
+      checkSparkAnswerAndOperator(
+        spark.sql("SELECT array_contains((CASE WHEN _2 =_3 THEN array(_4) END), _4) FROM t1"));
+    }
+  }
 }
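To illustrate the behavior this commit enables, here is a short usage sketch in Scala that mirrors the new test above: with the Comet plugin enabled, Spark's array_contains is serialized through the new `BinaryExpr array_contains = 60` field and executed natively via DataFusion's array_has function. The parquet path, view name, and column names (_2, _3, _4) follow the test and are illustrative only.

// Usage sketch: assumes a SparkSession already running with the Comet plugin
// enabled; the path, view name, and columns mirror the test above.
spark.read.parquet("/tmp/test.parquet").createOrReplaceTempView("t1")

// After this commit, the ArrayContains expression is offloaded to native
// execution, where it maps to DataFusion's array_has scalar function.
spark.sql("SELECT array_contains(array(_2, _3, _4), _2) FROM t1").show()

// A null-producing array argument also works without extra CASE handling on
// the native side, per the commit message.
spark.sql("SELECT array_contains((CASE WHEN _2 = _3 THEN array(_4) END), _4) FROM t1").show()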
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]