This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 1dd5354511 Move `AggregateExpr`, `PhysicalExpr` and `PhysicalSortExpr`
to physical-expr-core (#9926)
1dd5354511 is described below
commit 1dd535451198777693ac962a21b79a680bd4b7e8
Author: Jay Zhan <[email protected]>
AuthorDate: Fri Apr 5 17:57:49 2024 +0800
Move `AggregateExpr`, `PhysicalExpr` and `PhysicalSortExpr` to
physical-expr-core (#9926)
* move PhysicalExpr
Signed-off-by: jayzhan211 <[email protected]>
* cleanup
Signed-off-by: jayzhan211 <[email protected]>
* move physical sort
Signed-off-by: jayzhan211 <[email protected]>
* cleanup dependencies
Signed-off-by: jayzhan211 <[email protected]>
* add readme
Signed-off-by: jayzhan211 <[email protected]>
* disable doc test
Signed-off-by: jayzhan211 <[email protected]>
* move column
Signed-off-by: jayzhan211 <[email protected]>
* fmt
Signed-off-by: jayzhan211 <[email protected]>
* move aggregatexp
Signed-off-by: jayzhan211 <[email protected]>
* move other two utils
Signed-off-by: jayzhan211 <[email protected]>
* license
Signed-off-by: jayzhan211 <[email protected]>
* switch to ignore
Signed-off-by: jayzhan211 <[email protected]>
* move reverse order
Signed-off-by: jayzhan211 <[email protected]>
* rename to common
Signed-off-by: jayzhan211 <[email protected]>
* cleanup
Signed-off-by: jayzhan211 <[email protected]>
---------
Signed-off-by: jayzhan211 <[email protected]>
---
Cargo.toml | 2 +
datafusion-cli/Cargo.lock | 10 +
datafusion/physical-expr-common/Cargo.toml | 38 ++++
datafusion/physical-expr-common/README.md | 27 +++
.../src/aggregate/mod.rs | 50 +---
.../physical-expr-common/src/aggregate/utils.rs | 69 ++++++
.../src/expressions/column.rs | 123 +---------
.../physical-expr-common/src/expressions/mod.rs | 18 ++
datafusion/physical-expr-common/src/lib.rs | 24 ++
.../physical-expr-common/src/physical_expr.rs | 211 +++++++++++++++++
.../src/sort_expr.rs | 6 +-
.../src/sort_properties.rs | 4 +-
.../src/tree_node.rs | 0
datafusion/physical-expr-common/src/utils.rs | 156 +++++++++++++
datafusion/physical-expr/Cargo.toml | 1 +
datafusion/physical-expr/src/aggregate/mod.rs | 80 +------
datafusion/physical-expr/src/aggregate/utils.rs | 50 +---
datafusion/physical-expr/src/expressions/column.rs | 106 ---------
datafusion/physical-expr/src/expressions/mod.rs | 3 +-
datafusion/physical-expr/src/lib.rs | 32 ++-
datafusion/physical-expr/src/physical_expr.rs | 251 +--------------------
datafusion/physical-expr/src/planner.rs | 61 +++++
datafusion/physical-expr/src/utils/mod.rs | 129 +----------
23 files changed, 663 insertions(+), 788 deletions(-)
diff --git a/Cargo.toml b/Cargo.toml
index 9df489724d..ca34ea9c2a 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -26,6 +26,7 @@ members = [
"datafusion/functions",
"datafusion/functions-array",
"datafusion/optimizer",
+ "datafusion/physical-expr-common",
"datafusion/physical-expr",
"datafusion/physical-plan",
"datafusion/proto",
@@ -80,6 +81,7 @@ datafusion-functions = { path = "datafusion/functions",
version = "37.0.0" }
datafusion-functions-array = { path = "datafusion/functions-array", version =
"37.0.0" }
datafusion-optimizer = { path = "datafusion/optimizer", version = "37.0.0",
default-features = false }
datafusion-physical-expr = { path = "datafusion/physical-expr", version =
"37.0.0", default-features = false }
+datafusion-physical-expr-common = { path = "datafusion/physical-expr-common",
version = "37.0.0", default-features = false }
datafusion-physical-plan = { path = "datafusion/physical-plan", version =
"37.0.0" }
datafusion-proto = { path = "datafusion/proto", version = "37.0.0" }
datafusion-sql = { path = "datafusion/sql", version = "37.0.0" }
diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock
index 3be92221d3..d744a891c6 100644
--- a/datafusion-cli/Cargo.lock
+++ b/datafusion-cli/Cargo.lock
@@ -1331,6 +1331,7 @@ dependencies = [
"datafusion-common",
"datafusion-execution",
"datafusion-expr",
+ "datafusion-physical-expr-common",
"half",
"hashbrown 0.14.3",
"hex",
@@ -1345,6 +1346,15 @@ dependencies = [
"sha2",
]
+[[package]]
+name = "datafusion-physical-expr-common"
+version = "37.0.0"
+dependencies = [
+ "arrow",
+ "datafusion-common",
+ "datafusion-expr",
+]
+
[[package]]
name = "datafusion-physical-plan"
version = "37.0.0"
diff --git a/datafusion/physical-expr-common/Cargo.toml
b/datafusion/physical-expr-common/Cargo.toml
new file mode 100644
index 0000000000..89a41a5d10
--- /dev/null
+++ b/datafusion/physical-expr-common/Cargo.toml
@@ -0,0 +1,38 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+[package]
+name = "datafusion-physical-expr-common"
+description = "Common functionality of physical expression for DataFusion
query engine"
+keywords = ["arrow", "query", "sql"]
+readme = "README.md"
+version = { workspace = true }
+edition = { workspace = true }
+homepage = { workspace = true }
+repository = { workspace = true }
+license = { workspace = true }
+authors = { workspace = true }
+rust-version = { workspace = true }
+
+[lib]
+name = "datafusion_physical_expr_common"
+path = "src/lib.rs"
+
+[dependencies]
+arrow = { workspace = true }
+datafusion-common = { workspace = true, default-features = true }
+datafusion-expr = { workspace = true }
diff --git a/datafusion/physical-expr-common/README.md
b/datafusion/physical-expr-common/README.md
new file mode 100644
index 0000000000..7a1eff77d3
--- /dev/null
+++ b/datafusion/physical-expr-common/README.md
@@ -0,0 +1,27 @@
+<!---
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+
+# DataFusion Core Physical Expressions
+
+[DataFusion][df] is an extensible query execution framework, written in Rust,
that uses Apache Arrow as its in-memory format.
+
+This crate is a submodule of DataFusion that provides shared APIs for
implementing
+physical expressions such as `PhysicalExpr` and `PhysicalSortExpr`.
+
+[df]: https://crates.io/crates/datafusion
diff --git a/datafusion/physical-expr/src/aggregate/mod.rs
b/datafusion/physical-expr-common/src/aggregate/mod.rs
similarity index 75%
copy from datafusion/physical-expr/src/aggregate/mod.rs
copy to datafusion/physical-expr-common/src/aggregate/mod.rs
index 893178f29d..579f51815d 100644
--- a/datafusion/physical-expr/src/aggregate/mod.rs
+++ b/datafusion/physical-expr-common/src/aggregate/mod.rs
@@ -15,53 +15,19 @@
// specific language governing permissions and limitations
// under the License.
+pub mod utils;
+
use std::any::Any;
use std::fmt::Debug;
use std::sync::Arc;
-use crate::expressions::{NthValueAgg, OrderSensitiveArrayAgg};
-use crate::{PhysicalExpr, PhysicalSortExpr};
+use crate::physical_expr::PhysicalExpr;
+use crate::sort_expr::PhysicalSortExpr;
use arrow::datatypes::Field;
use datafusion_common::{not_impl_err, Result};
use datafusion_expr::{Accumulator, GroupsAccumulator};
-mod hyperloglog;
-mod tdigest;
-
-pub(crate) mod approx_distinct;
-pub(crate) mod approx_median;
-pub(crate) mod approx_percentile_cont;
-pub(crate) mod approx_percentile_cont_with_weight;
-pub(crate) mod array_agg;
-pub(crate) mod array_agg_distinct;
-pub(crate) mod array_agg_ordered;
-pub(crate) mod average;
-pub(crate) mod bit_and_or_xor;
-pub(crate) mod bool_and_or;
-pub(crate) mod correlation;
-pub(crate) mod count;
-pub(crate) mod count_distinct;
-pub(crate) mod covariance;
-pub(crate) mod first_last;
-pub(crate) mod grouping;
-pub(crate) mod median;
-pub(crate) mod nth_value;
-pub(crate) mod string_agg;
-#[macro_use]
-pub(crate) mod min_max;
-pub(crate) mod groups_accumulator;
-pub(crate) mod regr;
-pub(crate) mod stats;
-pub(crate) mod stddev;
-pub(crate) mod sum;
-pub(crate) mod sum_distinct;
-pub(crate) mod variance;
-
-pub mod build_in;
-pub mod moving_min_max;
-pub mod utils;
-
/// An aggregate expression that:
/// * knows its resulting field
/// * knows how to create its accumulator
@@ -134,11 +100,3 @@ pub trait AggregateExpr: Send + Sync + Debug +
PartialEq<dyn Any> {
not_impl_err!("Retractable Accumulator hasn't been implemented for
{self:?} yet")
}
}
-
-/// Checks whether the given aggregate expression is order-sensitive.
-/// For instance, a `SUM` aggregation doesn't depend on the order of its
inputs.
-/// However, an `ARRAY_AGG` with `ORDER BY` depends on the input ordering.
-pub fn is_order_sensitive(aggr_expr: &Arc<dyn AggregateExpr>) -> bool {
- aggr_expr.as_any().is::<OrderSensitiveArrayAgg>()
- || aggr_expr.as_any().is::<NthValueAgg>()
-}
diff --git a/datafusion/physical-expr-common/src/aggregate/utils.rs
b/datafusion/physical-expr-common/src/aggregate/utils.rs
new file mode 100644
index 0000000000..9821ba626b
--- /dev/null
+++ b/datafusion/physical-expr-common/src/aggregate/utils.rs
@@ -0,0 +1,69 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::{any::Any, sync::Arc};
+
+use arrow::{
+ compute::SortOptions,
+ datatypes::{DataType, Field},
+};
+
+use crate::sort_expr::PhysicalSortExpr;
+
+use super::AggregateExpr;
+
+/// Downcast a `Box<dyn AggregateExpr>` or `Arc<dyn AggregateExpr>`
+/// and return the inner trait object as [`Any`] so
+/// that it can be downcast to a specific implementation.
+///
+/// This method is used when implementing the `PartialEq<dyn Any>`
+/// for [`AggregateExpr`] aggregation expressions and allows comparing the
equality
+/// between the trait objects.
+pub fn down_cast_any_ref(any: &dyn Any) -> &dyn Any {
+ if let Some(obj) = any.downcast_ref::<Arc<dyn AggregateExpr>>() {
+ obj.as_any()
+ } else if let Some(obj) = any.downcast_ref::<Box<dyn AggregateExpr>>() {
+ obj.as_any()
+ } else {
+ any
+ }
+}
+
+/// Construct corresponding fields for lexicographical ordering requirement
expression
+pub fn ordering_fields(
+ ordering_req: &[PhysicalSortExpr],
+ // Data type of each expression in the ordering requirement
+ data_types: &[DataType],
+) -> Vec<Field> {
+ ordering_req
+ .iter()
+ .zip(data_types.iter())
+ .map(|(sort_expr, dtype)| {
+ Field::new(
+ sort_expr.expr.to_string().as_str(),
+ dtype.clone(),
+ // Multi partitions may be empty hence field should be
nullable.
+ true,
+ )
+ })
+ .collect()
+}
+
+/// Selects the sort option attribute from all the given `PhysicalSortExpr`s.
+pub fn get_sort_options(ordering_req: &[PhysicalSortExpr]) -> Vec<SortOptions>
{
+ ordering_req.iter().map(|item| item.options).collect()
+}
diff --git a/datafusion/physical-expr/src/expressions/column.rs
b/datafusion/physical-expr-common/src/expressions/column.rs
similarity index 50%
copy from datafusion/physical-expr/src/expressions/column.rs
copy to datafusion/physical-expr-common/src/expressions/column.rs
index a07f36e785..2cd52d6332 100644
--- a/datafusion/physical-expr/src/expressions/column.rs
+++ b/datafusion/physical-expr-common/src/expressions/column.rs
@@ -21,9 +21,6 @@ use std::any::Any;
use std::hash::{Hash, Hasher};
use std::sync::Arc;
-use crate::physical_expr::down_cast_any_ref;
-use crate::PhysicalExpr;
-
use arrow::{
datatypes::{DataType, Schema},
record_batch::RecordBatch,
@@ -31,6 +28,8 @@ use arrow::{
use datafusion_common::{internal_err, Result};
use datafusion_expr::ColumnarValue;
+use crate::physical_expr::{down_cast_any_ref, PhysicalExpr};
+
/// Represents the column at a given index in a RecordBatch
#[derive(Debug, Hash, PartialEq, Eq, Clone)]
pub struct Column {
@@ -132,125 +131,7 @@ impl Column {
}
}
-#[derive(Debug, Hash, PartialEq, Eq, Clone)]
-pub struct UnKnownColumn {
- name: String,
-}
-
-impl UnKnownColumn {
- /// Create a new unknown column expression
- pub fn new(name: &str) -> Self {
- Self {
- name: name.to_owned(),
- }
- }
-
- /// Get the column name
- pub fn name(&self) -> &str {
- &self.name
- }
-}
-
-impl std::fmt::Display for UnKnownColumn {
- fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
- write!(f, "{}", self.name)
- }
-}
-
-impl PhysicalExpr for UnKnownColumn {
- /// Return a reference to Any that can be used for downcasting
- fn as_any(&self) -> &dyn std::any::Any {
- self
- }
-
- /// Get the data type of this expression, given the schema of the input
- fn data_type(&self, _input_schema: &Schema) -> Result<DataType> {
- Ok(DataType::Null)
- }
-
- /// Decide whether this expression is nullable, given the schema of the
input
- fn nullable(&self, _input_schema: &Schema) -> Result<bool> {
- Ok(true)
- }
-
- /// Evaluate the expression
- fn evaluate(&self, _batch: &RecordBatch) -> Result<ColumnarValue> {
- internal_err!("UnKnownColumn::evaluate() should not be called")
- }
-
- fn children(&self) -> Vec<Arc<dyn PhysicalExpr>> {
- vec![]
- }
-
- fn with_new_children(
- self: Arc<Self>,
- _children: Vec<Arc<dyn PhysicalExpr>>,
- ) -> Result<Arc<dyn PhysicalExpr>> {
- Ok(self)
- }
-
- fn dyn_hash(&self, state: &mut dyn Hasher) {
- let mut s = state;
- self.hash(&mut s);
- }
-}
-
-impl PartialEq<dyn Any> for UnKnownColumn {
- fn eq(&self, other: &dyn Any) -> bool {
- down_cast_any_ref(other)
- .downcast_ref::<Self>()
- .map(|x| self == x)
- .unwrap_or(false)
- }
-}
-
/// Create a column expression
pub fn col(name: &str, schema: &Schema) -> Result<Arc<dyn PhysicalExpr>> {
Ok(Arc::new(Column::new_with_schema(name, schema)?))
}
-
-#[cfg(test)]
-mod test {
- use crate::expressions::Column;
- use crate::PhysicalExpr;
-
- use arrow::array::StringArray;
- use arrow::datatypes::{DataType, Field, Schema};
- use arrow::record_batch::RecordBatch;
- use datafusion_common::Result;
-
- use std::sync::Arc;
-
- #[test]
- fn out_of_bounds_data_type() {
- let schema = Schema::new(vec![Field::new("foo", DataType::Utf8,
true)]);
- let col = Column::new("id", 9);
- let error =
col.data_type(&schema).expect_err("error").strip_backtrace();
- assert!("Internal error: PhysicalExpr Column references column 'id' at
index 9 (zero-based) \
- but input schema only has 1 columns: [\"foo\"].\nThis was likely
caused by a bug in \
- DataFusion's code and we would welcome that you file an bug report
in our issue tracker".starts_with(&error))
- }
-
- #[test]
- fn out_of_bounds_nullable() {
- let schema = Schema::new(vec![Field::new("foo", DataType::Utf8,
true)]);
- let col = Column::new("id", 9);
- let error =
col.nullable(&schema).expect_err("error").strip_backtrace();
- assert!("Internal error: PhysicalExpr Column references column 'id' at
index 9 (zero-based) \
- but input schema only has 1 columns: [\"foo\"].\nThis was likely
caused by a bug in \
- DataFusion's code and we would welcome that you file an bug report
in our issue tracker".starts_with(&error))
- }
-
- #[test]
- fn out_of_bounds_evaluate() -> Result<()> {
- let schema = Schema::new(vec![Field::new("foo", DataType::Utf8,
true)]);
- let data: StringArray = vec!["data"].into();
- let batch = RecordBatch::try_new(Arc::new(schema),
vec![Arc::new(data)])?;
- let col = Column::new("id", 9);
- let error = col.evaluate(&batch).expect_err("error").strip_backtrace();
- assert!("Internal error: PhysicalExpr Column references column 'id' at
index 9 (zero-based) \
- but input schema only has 1 columns: [\"foo\"].\nThis was likely
caused by a bug in \
- DataFusion's code and we would welcome that you file an bug report
in our issue tracker".starts_with(&error));
- Ok(())
- }
-}
diff --git a/datafusion/physical-expr-common/src/expressions/mod.rs
b/datafusion/physical-expr-common/src/expressions/mod.rs
new file mode 100644
index 0000000000..d102422081
--- /dev/null
+++ b/datafusion/physical-expr-common/src/expressions/mod.rs
@@ -0,0 +1,18 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+pub mod column;
diff --git a/datafusion/physical-expr-common/src/lib.rs
b/datafusion/physical-expr-common/src/lib.rs
new file mode 100644
index 0000000000..53e3134a1b
--- /dev/null
+++ b/datafusion/physical-expr-common/src/lib.rs
@@ -0,0 +1,24 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+pub mod aggregate;
+pub mod expressions;
+pub mod physical_expr;
+pub mod sort_expr;
+pub mod sort_properties;
+pub mod tree_node;
+pub mod utils;
diff --git a/datafusion/physical-expr-common/src/physical_expr.rs
b/datafusion/physical-expr-common/src/physical_expr.rs
new file mode 100644
index 0000000000..be6358e73c
--- /dev/null
+++ b/datafusion/physical-expr-common/src/physical_expr.rs
@@ -0,0 +1,211 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::any::Any;
+use std::fmt::{Debug, Display};
+use std::hash::{Hash, Hasher};
+use std::sync::Arc;
+
+use arrow::array::BooleanArray;
+use arrow::compute::filter_record_batch;
+use arrow::datatypes::{DataType, Schema};
+use arrow::record_batch::RecordBatch;
+use datafusion_common::utils::DataPtr;
+use datafusion_common::{internal_err, not_impl_err, Result};
+use datafusion_expr::interval_arithmetic::Interval;
+use datafusion_expr::ColumnarValue;
+
+use crate::sort_properties::SortProperties;
+use crate::utils::scatter;
+
+/// See
[create_physical_expr](https://docs.rs/datafusion/latest/datafusion/physical_expr/fn.create_physical_expr.html)
+/// for examples of creating `PhysicalExpr` from `Expr`
+pub trait PhysicalExpr: Send + Sync + Display + Debug + PartialEq<dyn Any> {
+ /// Returns the physical expression as [`Any`] so that it can be
+ /// downcast to a specific implementation.
+ fn as_any(&self) -> &dyn Any;
+ /// Get the data type of this expression, given the schema of the input
+ fn data_type(&self, input_schema: &Schema) -> Result<DataType>;
+ /// Determine whether this expression is nullable, given the schema of the
input
+ fn nullable(&self, input_schema: &Schema) -> Result<bool>;
+ /// Evaluate an expression against a RecordBatch
+ fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue>;
+ /// Evaluate an expression against a RecordBatch after first applying a
+ /// validity array
+ fn evaluate_selection(
+ &self,
+ batch: &RecordBatch,
+ selection: &BooleanArray,
+ ) -> Result<ColumnarValue> {
+ let tmp_batch = filter_record_batch(batch, selection)?;
+
+ let tmp_result = self.evaluate(&tmp_batch)?;
+
+ if batch.num_rows() == tmp_batch.num_rows() {
+ // All values from the `selection` filter are true.
+ Ok(tmp_result)
+ } else if let ColumnarValue::Array(a) = tmp_result {
+ scatter(selection, a.as_ref()).map(ColumnarValue::Array)
+ } else {
+ Ok(tmp_result)
+ }
+ }
+
+ /// Get a list of child PhysicalExpr that provide the input for this expr.
+ fn children(&self) -> Vec<Arc<dyn PhysicalExpr>>;
+
+ /// Returns a new PhysicalExpr where all children were replaced by new
exprs.
+ fn with_new_children(
+ self: Arc<Self>,
+ children: Vec<Arc<dyn PhysicalExpr>>,
+ ) -> Result<Arc<dyn PhysicalExpr>>;
+
+ /// Computes the output interval for the expression, given the input
+ /// intervals.
+ ///
+ /// # Arguments
+ ///
+ /// * `children` are the intervals for the children (inputs) of this
+ /// expression.
+ ///
+ /// # Example
+ ///
+ /// If the expression is `a + b`, and the input intervals are `a: [1, 2]`
+ /// and `b: [3, 4]`, then the output interval would be `[4, 6]`.
+ fn evaluate_bounds(&self, _children: &[&Interval]) -> Result<Interval> {
+ not_impl_err!("Not implemented for {self}")
+ }
+
+ /// Updates bounds for child expressions, given a known interval for this
+ /// expression.
+ ///
+ /// This is used to propagate constraints down through an expression tree.
+ ///
+ /// # Arguments
+ ///
+ /// * `interval` is the currently known interval for this expression.
+ /// * `children` are the current intervals for the children of this
expression.
+ ///
+ /// # Returns
+ ///
+ /// A `Vec` of new intervals for the children, in order.
+ ///
+ /// If constraint propagation reveals an infeasibility for any child,
returns
+ /// [`None`]. If none of the children intervals change as a result of
propagation,
+ /// may return an empty vector instead of cloning `children`. This is the
default
+ /// (and conservative) return value.
+ ///
+ /// # Example
+ ///
+ /// If the expression is `a + b`, the current `interval` is `[4, 5]` and
the
+ /// inputs `a` and `b` are respectively given as `[0, 2]` and `[-∞, 4]`,
then
+ /// propagation would return `[0, 2]` and `[2, 4]` as `b` must be at
+ /// least `2` to make the output at least `4`.
+ fn propagate_constraints(
+ &self,
+ _interval: &Interval,
+ _children: &[&Interval],
+ ) -> Result<Option<Vec<Interval>>> {
+ Ok(Some(vec![]))
+ }
+
+ /// Update the hash `state` with this expression requirements from
+ /// [`Hash`].
+ ///
+ /// This method is required to support hashing [`PhysicalExpr`]s. To
+ /// implement it, typically the type implementing
+ /// [`PhysicalExpr`] implements [`Hash`] and
+ /// then the following boiler plate is used:
+ ///
+ /// # Example:
+ /// ```
+ /// // User defined expression that derives Hash
+ /// #[derive(Hash, Debug, PartialEq, Eq)]
+ /// struct MyExpr {
+ /// val: u64
+ /// }
+ ///
+ /// // impl PhysicalExpr {
+ /// // ...
+ /// # impl MyExpr {
+ /// // Boiler plate to call the derived Hash impl
+ /// fn dyn_hash(&self, state: &mut dyn std::hash::Hasher) {
+ /// use std::hash::Hash;
+ /// let mut s = state;
+ /// self.hash(&mut s);
+ /// }
+ /// // }
+ /// # }
+ /// ```
+ /// Note: [`PhysicalExpr`] is not constrained by [`Hash`]
+ /// directly because it must remain object safe.
+ fn dyn_hash(&self, _state: &mut dyn Hasher);
+
+ /// The order information of a PhysicalExpr can be estimated from its
children.
+ /// This is especially helpful for projection expressions. If we can
ensure that the
+ /// order of a PhysicalExpr to project matches with the order of SortExec,
we can
+ /// eliminate that SortExecs.
+ ///
+ /// By recursively calling this function, we can obtain the overall order
+ /// information of the PhysicalExpr. Since `SortOptions` cannot fully
handle
+ /// the propagation of unordered columns and literals, the `SortProperties`
+ /// struct is used.
+ fn get_ordering(&self, _children: &[SortProperties]) -> SortProperties {
+ SortProperties::Unordered
+ }
+}
+
+impl Hash for dyn PhysicalExpr {
+ fn hash<H: Hasher>(&self, state: &mut H) {
+ self.dyn_hash(state);
+ }
+}
+
+/// Returns a copy of this expr if we change any child according to the
pointer comparison.
+/// The size of `children` must be equal to the size of
`PhysicalExpr::children()`.
+pub fn with_new_children_if_necessary(
+ expr: Arc<dyn PhysicalExpr>,
+ children: Vec<Arc<dyn PhysicalExpr>>,
+) -> Result<Arc<dyn PhysicalExpr>> {
+ let old_children = expr.children();
+ if children.len() != old_children.len() {
+ internal_err!("PhysicalExpr: Wrong number of children")
+ } else if children.is_empty()
+ || children
+ .iter()
+ .zip(old_children.iter())
+ .any(|(c1, c2)| !Arc::data_ptr_eq(c1, c2))
+ {
+ Ok(expr.with_new_children(children)?)
+ } else {
+ Ok(expr)
+ }
+}
+
+pub fn down_cast_any_ref(any: &dyn Any) -> &dyn Any {
+ if any.is::<Arc<dyn PhysicalExpr>>() {
+ any.downcast_ref::<Arc<dyn PhysicalExpr>>()
+ .unwrap()
+ .as_any()
+ } else if any.is::<Box<dyn PhysicalExpr>>() {
+ any.downcast_ref::<Box<dyn PhysicalExpr>>()
+ .unwrap()
+ .as_any()
+ } else {
+ any
+ }
+}
diff --git a/datafusion/physical-expr/src/sort_expr.rs
b/datafusion/physical-expr-common/src/sort_expr.rs
similarity index 99%
rename from datafusion/physical-expr/src/sort_expr.rs
rename to datafusion/physical-expr-common/src/sort_expr.rs
index 914d76f926..1e1187212d 100644
--- a/datafusion/physical-expr/src/sort_expr.rs
+++ b/datafusion/physical-expr-common/src/sort_expr.rs
@@ -21,14 +21,14 @@ use std::fmt::Display;
use std::hash::{Hash, Hasher};
use std::sync::Arc;
-use crate::PhysicalExpr;
-
use arrow::compute::kernels::sort::{SortColumn, SortOptions};
+use arrow::datatypes::Schema;
use arrow::record_batch::RecordBatch;
-use arrow_schema::Schema;
use datafusion_common::Result;
use datafusion_expr::ColumnarValue;
+use crate::physical_expr::PhysicalExpr;
+
/// Represents Sort operation for a column in a RecordBatch
#[derive(Clone, Debug)]
pub struct PhysicalSortExpr {
diff --git a/datafusion/physical-expr/src/sort_properties.rs
b/datafusion/physical-expr-common/src/sort_properties.rs
similarity index 99%
rename from datafusion/physical-expr/src/sort_properties.rs
rename to datafusion/physical-expr-common/src/sort_properties.rs
index 4df29ced2f..47a5d5ba5e 100644
--- a/datafusion/physical-expr/src/sort_properties.rs
+++ b/datafusion/physical-expr-common/src/sort_properties.rs
@@ -17,9 +17,9 @@
use std::ops::Neg;
-use crate::tree_node::ExprContext;
+use arrow::compute::SortOptions;
-use arrow_schema::SortOptions;
+use crate::tree_node::ExprContext;
/// To propagate [`SortOptions`] across the `PhysicalExpr`, it is insufficient
/// to simply use `Option<SortOptions>`: There must be a differentiation
between
diff --git a/datafusion/physical-expr/src/tree_node.rs
b/datafusion/physical-expr-common/src/tree_node.rs
similarity index 100%
rename from datafusion/physical-expr/src/tree_node.rs
rename to datafusion/physical-expr-common/src/tree_node.rs
diff --git a/datafusion/physical-expr-common/src/utils.rs
b/datafusion/physical-expr-common/src/utils.rs
new file mode 100644
index 0000000000..459b5a4849
--- /dev/null
+++ b/datafusion/physical-expr-common/src/utils.rs
@@ -0,0 +1,156 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow::{
+ array::{make_array, Array, ArrayRef, BooleanArray, MutableArrayData},
+ compute::{and_kleene, is_not_null, SlicesIterator},
+};
+use datafusion_common::Result;
+
+use crate::sort_expr::PhysicalSortExpr;
+
+/// Scatter `truthy` array by boolean mask. When the mask evaluates `true`,
next values of `truthy`
+/// are taken, when the mask evaluates `false` values null values are filled.
+///
+/// # Arguments
+/// * `mask` - Boolean values used to determine where to put the `truthy`
values
+/// * `truthy` - All values of this array are to scatter according to `mask`
into final result.
+pub fn scatter(mask: &BooleanArray, truthy: &dyn Array) -> Result<ArrayRef> {
+ let truthy = truthy.to_data();
+
+ // update the mask so that any null values become false
+ // (SlicesIterator doesn't respect nulls)
+ let mask = and_kleene(mask, &is_not_null(mask)?)?;
+
+ let mut mutable = MutableArrayData::new(vec![&truthy], true, mask.len());
+
+ // the SlicesIterator slices only the true values. So the gaps left by
this iterator we need to
+ // fill with falsy values
+
+ // keep track of how much is filled
+ let mut filled = 0;
+ // keep track of current position we have in truthy array
+ let mut true_pos = 0;
+
+ SlicesIterator::new(&mask).for_each(|(start, end)| {
+ // the gap needs to be filled with nulls
+ if start > filled {
+ mutable.extend_nulls(start - filled);
+ }
+ // fill with truthy values
+ let len = end - start;
+ mutable.extend(0, true_pos, true_pos + len);
+ true_pos += len;
+ filled = end;
+ });
+ // the remaining part is falsy
+ if filled < mask.len() {
+ mutable.extend_nulls(mask.len() - filled);
+ }
+
+ let data = mutable.freeze();
+ Ok(make_array(data))
+}
+
+/// Reverses the ORDER BY expression, which is useful during equivalent window
+/// expression construction. For instance, 'ORDER BY a ASC, NULLS LAST' turns
into
+/// 'ORDER BY a DESC, NULLS FIRST'.
+pub fn reverse_order_bys(order_bys: &[PhysicalSortExpr]) ->
Vec<PhysicalSortExpr> {
+ order_bys
+ .iter()
+ .map(|e| PhysicalSortExpr {
+ expr: e.expr.clone(),
+ options: !e.options,
+ })
+ .collect()
+}
+
+#[cfg(test)]
+mod tests {
+ use std::sync::Arc;
+
+ use arrow::array::Int32Array;
+ use datafusion_common::cast::{as_boolean_array, as_int32_array};
+
+ use super::*;
+
+ #[test]
+ fn scatter_int() -> Result<()> {
+ let truthy = Arc::new(Int32Array::from(vec![1, 10, 11, 100]));
+ let mask = BooleanArray::from(vec![true, true, false, false, true]);
+
+ // the output array is expected to be the same length as the mask array
+ let expected =
+ Int32Array::from_iter(vec![Some(1), Some(10), None, None,
Some(11)]);
+ let result = scatter(&mask, truthy.as_ref())?;
+ let result = as_int32_array(&result)?;
+
+ assert_eq!(&expected, result);
+ Ok(())
+ }
+
+ #[test]
+ fn scatter_int_end_with_false() -> Result<()> {
+ let truthy = Arc::new(Int32Array::from(vec![1, 10, 11, 100]));
+ let mask = BooleanArray::from(vec![true, false, true, false, false,
false]);
+
+ // output should be same length as mask
+ let expected =
+ Int32Array::from_iter(vec![Some(1), None, Some(10), None, None,
None]);
+ let result = scatter(&mask, truthy.as_ref())?;
+ let result = as_int32_array(&result)?;
+
+ assert_eq!(&expected, result);
+ Ok(())
+ }
+
+ #[test]
+ fn scatter_with_null_mask() -> Result<()> {
+ let truthy = Arc::new(Int32Array::from(vec![1, 10, 11]));
+ let mask: BooleanArray = vec![Some(false), None, Some(true),
Some(true), None]
+ .into_iter()
+ .collect();
+
+ // output should treat nulls as though they are false
+ let expected = Int32Array::from_iter(vec![None, None, Some(1),
Some(10), None]);
+ let result = scatter(&mask, truthy.as_ref())?;
+ let result = as_int32_array(&result)?;
+
+ assert_eq!(&expected, result);
+ Ok(())
+ }
+
+ #[test]
+ fn scatter_boolean() -> Result<()> {
+ let truthy = Arc::new(BooleanArray::from(vec![false, false, false,
true]));
+ let mask = BooleanArray::from(vec![true, true, false, false, true]);
+
+ // the output array is expected to be the same length as the mask array
+ let expected = BooleanArray::from_iter(vec![
+ Some(false),
+ Some(false),
+ None,
+ None,
+ Some(false),
+ ]);
+ let result = scatter(&mask, truthy.as_ref())?;
+ let result = as_boolean_array(&result)?;
+
+ assert_eq!(&expected, result);
+ Ok(())
+ }
+}
diff --git a/datafusion/physical-expr/Cargo.toml
b/datafusion/physical-expr/Cargo.toml
index 56b3f3c91e..87d73183d0 100644
--- a/datafusion/physical-expr/Cargo.toml
+++ b/datafusion/physical-expr/Cargo.toml
@@ -59,6 +59,7 @@ chrono = { workspace = true }
datafusion-common = { workspace = true, default-features = true }
datafusion-execution = { workspace = true }
datafusion-expr = { workspace = true }
+datafusion-physical-expr-common = { workspace = true }
half = { workspace = true }
hashbrown = { version = "0.14", features = ["raw"] }
hex = { version = "0.4", optional = true }
diff --git a/datafusion/physical-expr/src/aggregate/mod.rs
b/datafusion/physical-expr/src/aggregate/mod.rs
index 893178f29d..e176084ae6 100644
--- a/datafusion/physical-expr/src/aggregate/mod.rs
+++ b/datafusion/physical-expr/src/aggregate/mod.rs
@@ -15,16 +15,11 @@
// specific language governing permissions and limitations
// under the License.
-use std::any::Any;
-use std::fmt::Debug;
use std::sync::Arc;
use crate::expressions::{NthValueAgg, OrderSensitiveArrayAgg};
-use crate::{PhysicalExpr, PhysicalSortExpr};
-use arrow::datatypes::Field;
-use datafusion_common::{not_impl_err, Result};
-use datafusion_expr::{Accumulator, GroupsAccumulator};
+pub use datafusion_physical_expr_common::aggregate::AggregateExpr;
mod hyperloglog;
mod tdigest;
@@ -62,79 +57,6 @@ pub mod build_in;
pub mod moving_min_max;
pub mod utils;
-/// An aggregate expression that:
-/// * knows its resulting field
-/// * knows how to create its accumulator
-/// * knows its accumulator's state's field
-/// * knows the expressions from whose its accumulator will receive values
-///
-/// Any implementation of this trait also needs to implement the
-/// `PartialEq<dyn Any>` to allows comparing equality between the
-/// trait objects.
-pub trait AggregateExpr: Send + Sync + Debug + PartialEq<dyn Any> {
- /// Returns the aggregate expression as [`Any`] so that it can be
- /// downcast to a specific implementation.
- fn as_any(&self) -> &dyn Any;
-
- /// the field of the final result of this aggregation.
- fn field(&self) -> Result<Field>;
-
- /// the accumulator used to accumulate values from the expressions.
- /// the accumulator expects the same number of arguments as `expressions`
and must
- /// return states with the same description as `state_fields`
- fn create_accumulator(&self) -> Result<Box<dyn Accumulator>>;
-
- /// the fields that encapsulate the Accumulator's state
- /// the number of fields here equals the number of states that the
accumulator contains
- fn state_fields(&self) -> Result<Vec<Field>>;
-
- /// expressions that are passed to the Accumulator.
- /// Single-column aggregations such as `sum` return a single value, others
(e.g. `cov`) return many.
- fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>>;
-
- /// Order by requirements for the aggregate function
- /// By default it is `None` (there is no requirement)
- /// Order-sensitive aggregators, such as `FIRST_VALUE(x ORDER BY y)`
should implement this
- fn order_bys(&self) -> Option<&[PhysicalSortExpr]> {
- None
- }
-
- /// Human readable name such as `"MIN(c2)"`. The default
- /// implementation returns placeholder text.
- fn name(&self) -> &str {
- "AggregateExpr: default name"
- }
-
- /// If the aggregate expression has a specialized
- /// [`GroupsAccumulator`] implementation. If this returns true,
- /// `[Self::create_groups_accumulator`] will be called.
- fn groups_accumulator_supported(&self) -> bool {
- false
- }
-
- /// Return a specialized [`GroupsAccumulator`] that manages state
- /// for all groups.
- ///
- /// For maximum performance, a [`GroupsAccumulator`] should be
- /// implemented in addition to [`Accumulator`].
- fn create_groups_accumulator(&self) -> Result<Box<dyn GroupsAccumulator>> {
- not_impl_err!("GroupsAccumulator hasn't been implemented for {self:?}
yet")
- }
-
- /// Construct an expression that calculates the aggregate in reverse.
- /// Typically the "reverse" expression is itself (e.g. SUM, COUNT).
- /// For aggregates that do not support calculation in reverse,
- /// returns None (which is the default value).
- fn reverse_expr(&self) -> Option<Arc<dyn AggregateExpr>> {
- None
- }
-
- /// Creates accumulator implementation that supports retract
- fn create_sliding_accumulator(&self) -> Result<Box<dyn Accumulator>> {
- not_impl_err!("Retractable Accumulator hasn't been implemented for
{self:?} yet")
- }
-}
-
/// Checks whether the given aggregate expression is order-sensitive.
/// For instance, a `SUM` aggregation doesn't depend on the order of its
inputs.
/// However, an `ARRAY_AGG` with `ORDER BY` depends on the input ordering.
diff --git a/datafusion/physical-expr/src/aggregate/utils.rs
b/datafusion/physical-expr/src/aggregate/utils.rs
index 613f6118e9..d14a52f575 100644
--- a/datafusion/physical-expr/src/aggregate/utils.rs
+++ b/datafusion/physical-expr/src/aggregate/utils.rs
@@ -17,10 +17,12 @@
//! Utilities used in aggregates
-use std::any::Any;
use std::sync::Arc;
-use crate::{AggregateExpr, PhysicalSortExpr};
+// For backwards compatibility
+pub use datafusion_physical_expr_common::aggregate::utils::down_cast_any_ref;
+pub use datafusion_physical_expr_common::aggregate::utils::get_sort_options;
+pub use datafusion_physical_expr_common::aggregate::utils::ordering_fields;
use arrow::array::{ArrayRef, ArrowNativeTypeOp};
use arrow_array::cast::AsArray;
@@ -29,7 +31,7 @@ use arrow_array::types::{
TimestampNanosecondType, TimestampSecondType,
};
use arrow_buffer::{ArrowNativeType, ToByteSlice};
-use arrow_schema::{DataType, Field, SortOptions};
+use arrow_schema::DataType;
use datafusion_common::{exec_err, DataFusionError, Result};
use datafusion_expr::Accumulator;
@@ -170,48 +172,6 @@ pub fn adjust_output_array(
Ok(array)
}
-/// Downcast a `Box<dyn AggregateExpr>` or `Arc<dyn AggregateExpr>`
-/// and return the inner trait object as [`Any`] so
-/// that it can be downcast to a specific implementation.
-///
-/// This method is used when implementing the `PartialEq<dyn Any>`
-/// for [`AggregateExpr`] aggregation expressions and allows comparing the
equality
-/// between the trait objects.
-pub fn down_cast_any_ref(any: &dyn Any) -> &dyn Any {
- if let Some(obj) = any.downcast_ref::<Arc<dyn AggregateExpr>>() {
- obj.as_any()
- } else if let Some(obj) = any.downcast_ref::<Box<dyn AggregateExpr>>() {
- obj.as_any()
- } else {
- any
- }
-}
-
-/// Construct corresponding fields for lexicographical ordering requirement
expression
-pub fn ordering_fields(
- ordering_req: &[PhysicalSortExpr],
- // Data type of each expression in the ordering requirement
- data_types: &[DataType],
-) -> Vec<Field> {
- ordering_req
- .iter()
- .zip(data_types.iter())
- .map(|(sort_expr, dtype)| {
- Field::new(
- sort_expr.expr.to_string().as_str(),
- dtype.clone(),
- // Multi partitions may be empty hence field should be
nullable.
- true,
- )
- })
- .collect()
-}
-
-/// Selects the sort option attribute from all the given `PhysicalSortExpr`s.
-pub fn get_sort_options(ordering_req: &[PhysicalSortExpr]) -> Vec<SortOptions>
{
- ordering_req.iter().map(|item| item.options).collect()
-}
-
/// A wrapper around a type to provide hash for floats
#[derive(Copy, Clone, Debug)]
pub(crate) struct Hashable<T>(pub T);
diff --git a/datafusion/physical-expr/src/expressions/column.rs
b/datafusion/physical-expr/src/expressions/column.rs
index a07f36e785..634a56d1d6 100644
--- a/datafusion/physical-expr/src/expressions/column.rs
+++ b/datafusion/physical-expr/src/expressions/column.rs
@@ -31,107 +31,6 @@ use arrow::{
use datafusion_common::{internal_err, Result};
use datafusion_expr::ColumnarValue;
-/// Represents the column at a given index in a RecordBatch
-#[derive(Debug, Hash, PartialEq, Eq, Clone)]
-pub struct Column {
- name: String,
- index: usize,
-}
-
-impl Column {
- /// Create a new column expression
- pub fn new(name: &str, index: usize) -> Self {
- Self {
- name: name.to_owned(),
- index,
- }
- }
-
- /// Create a new column expression based on column name and schema
- pub fn new_with_schema(name: &str, schema: &Schema) -> Result<Self> {
- Ok(Column::new(name, schema.index_of(name)?))
- }
-
- /// Get the column name
- pub fn name(&self) -> &str {
- &self.name
- }
-
- /// Get the column index
- pub fn index(&self) -> usize {
- self.index
- }
-}
-
-impl std::fmt::Display for Column {
- fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
- write!(f, "{}@{}", self.name, self.index)
- }
-}
-
-impl PhysicalExpr for Column {
- /// Return a reference to Any that can be used for downcasting
- fn as_any(&self) -> &dyn std::any::Any {
- self
- }
-
- /// Get the data type of this expression, given the schema of the input
- fn data_type(&self, input_schema: &Schema) -> Result<DataType> {
- self.bounds_check(input_schema)?;
- Ok(input_schema.field(self.index).data_type().clone())
- }
-
- /// Decide whehter this expression is nullable, given the schema of the
input
- fn nullable(&self, input_schema: &Schema) -> Result<bool> {
- self.bounds_check(input_schema)?;
- Ok(input_schema.field(self.index).is_nullable())
- }
-
- /// Evaluate the expression
- fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue> {
- self.bounds_check(batch.schema().as_ref())?;
- Ok(ColumnarValue::Array(batch.column(self.index).clone()))
- }
-
- fn children(&self) -> Vec<Arc<dyn PhysicalExpr>> {
- vec![]
- }
-
- fn with_new_children(
- self: Arc<Self>,
- _children: Vec<Arc<dyn PhysicalExpr>>,
- ) -> Result<Arc<dyn PhysicalExpr>> {
- Ok(self)
- }
-
- fn dyn_hash(&self, state: &mut dyn Hasher) {
- let mut s = state;
- self.hash(&mut s);
- }
-}
-
-impl PartialEq<dyn Any> for Column {
- fn eq(&self, other: &dyn Any) -> bool {
- down_cast_any_ref(other)
- .downcast_ref::<Self>()
- .map(|x| self == x)
- .unwrap_or(false)
- }
-}
-
-impl Column {
- fn bounds_check(&self, input_schema: &Schema) -> Result<()> {
- if self.index < input_schema.fields.len() {
- Ok(())
- } else {
- internal_err!(
- "PhysicalExpr Column references column '{}' at index {}
(zero-based) but input schema only has {} columns: {:?}",
- self.name,
- self.index, input_schema.fields.len(),
input_schema.fields().iter().map(|f| f.name().clone()).collect::<Vec<String>>())
- }
- }
-}
-
#[derive(Debug, Hash, PartialEq, Eq, Clone)]
pub struct UnKnownColumn {
name: String,
@@ -204,11 +103,6 @@ impl PartialEq<dyn Any> for UnKnownColumn {
}
}
-/// Create a column expression
-pub fn col(name: &str, schema: &Schema) -> Result<Arc<dyn PhysicalExpr>> {
- Ok(Arc::new(Column::new_with_schema(name, schema)?))
-}
-
#[cfg(test)]
mod test {
use crate::expressions::Column;
diff --git a/datafusion/physical-expr/src/expressions/mod.rs
b/datafusion/physical-expr/src/expressions/mod.rs
index 7c4ea07dfb..f0cc4b175e 100644
--- a/datafusion/physical-expr/src/expressions/mod.rs
+++ b/datafusion/physical-expr/src/expressions/mod.rs
@@ -80,7 +80,8 @@ pub use crate::PhysicalSortExpr;
pub use binary::{binary, BinaryExpr};
pub use case::{case, CaseExpr};
pub use cast::{cast, cast_with_options, CastExpr};
-pub use column::{col, Column, UnKnownColumn};
+pub use column::UnKnownColumn;
+pub use datafusion_physical_expr_common::expressions::column::{col, Column};
pub use in_list::{in_list, InListExpr};
pub use is_not_null::{is_not_null, IsNotNullExpr};
pub use is_null::{is_null, IsNullExpr};
diff --git a/datafusion/physical-expr/src/lib.rs
b/datafusion/physical-expr/src/lib.rs
index 655771270a..c88f1b32bb 100644
--- a/datafusion/physical-expr/src/lib.rs
+++ b/datafusion/physical-expr/src/lib.rs
@@ -28,10 +28,7 @@ mod partitioning;
mod physical_expr;
pub mod planner;
mod scalar_function;
-mod sort_expr;
-pub mod sort_properties;
pub mod string_expressions;
-pub mod tree_node;
pub mod udf;
pub mod utils;
pub mod window;
@@ -43,20 +40,37 @@ pub mod execution_props {
}
pub use aggregate::groups_accumulator::{GroupsAccumulatorAdapter, NullState};
-pub use aggregate::AggregateExpr;
pub use analysis::{analyze, AnalysisContext, ExprBoundaries};
+pub use datafusion_physical_expr_common::aggregate::AggregateExpr;
pub use equivalence::EquivalenceProperties;
pub use partitioning::{Distribution, Partitioning};
pub use physical_expr::{
physical_exprs_bag_equal, physical_exprs_contains, physical_exprs_equal,
- PhysicalExpr, PhysicalExprRef,
+ PhysicalExprRef,
};
-pub use planner::{create_physical_expr, create_physical_exprs};
-pub use scalar_function::ScalarFunctionExpr;
-pub use sort_expr::{
+
+pub use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
+pub use datafusion_physical_expr_common::sort_expr::{
LexOrdering, LexOrderingRef, LexRequirement, LexRequirementRef,
PhysicalSortExpr,
PhysicalSortRequirement,
};
-pub use utils::{reverse_order_bys, split_conjunction};
+
+pub use planner::{create_physical_expr, create_physical_exprs};
+pub use scalar_function::ScalarFunctionExpr;
+
+pub use datafusion_physical_expr_common::utils::reverse_order_bys;
+pub use utils::split_conjunction;
pub use aggregate::first_last::create_first_value_accumulator;
+
+// For backwards compatibility
+pub mod sort_properties {
+ pub use datafusion_physical_expr_common::sort_properties::{
+ ExprOrdering, SortProperties,
+ };
+}
+
+// For backwards compatibility
+pub mod tree_node {
+ pub use datafusion_physical_expr_common::tree_node::ExprContext;
+}
diff --git a/datafusion/physical-expr/src/physical_expr.rs
b/datafusion/physical-expr/src/physical_expr.rs
index 861a4ad028..bc265d3819 100644
--- a/datafusion/physical-expr/src/physical_expr.rs
+++ b/datafusion/physical-expr/src/physical_expr.rs
@@ -15,263 +15,16 @@
// specific language governing permissions and limitations
// under the License.
-use std::any::Any;
-use std::fmt::{Debug, Display};
-use std::hash::{Hash, Hasher};
use std::sync::Arc;
-use crate::sort_properties::SortProperties;
-use crate::utils::scatter;
-
-use arrow::array::BooleanArray;
-use arrow::compute::filter_record_batch;
-use arrow::datatypes::{DataType, Schema};
-use arrow::record_batch::RecordBatch;
-use datafusion_common::utils::DataPtr;
-use datafusion_common::{internal_err, not_impl_err, Result};
-use datafusion_expr::interval_arithmetic::Interval;
-use datafusion_expr::ColumnarValue;
-
+use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
use itertools::izip;
-/// `PhysicalExpr` evaluate DataFusion expressions such as `A + 1`, or `CAST(c1
-/// AS int)`.
-///
-/// `PhysicalExpr` are the physical counterpart to [`Expr`] used in logical
-/// planning, and can be evaluated directly on a [`RecordBatch`]. They are
-/// normally created from `Expr` by a [`PhysicalPlanner`] and can be created
-/// directly using [`create_physical_expr`].
-///
-/// A Physical expression knows its type, nullability and how to evaluate
itself.
-///
-/// [`PhysicalPlanner`]:
https://docs.rs/datafusion/latest/datafusion/physical_planner/trait.PhysicalPlanner.html
-/// [`create_physical_expr`]: crate::create_physical_expr
-/// [`Expr`]: datafusion_expr::Expr
-///
-/// # Example: Create `PhysicalExpr` from `Expr`
-/// ```
-/// # use arrow_schema::{DataType, Field, Schema};
-/// # use datafusion_common::DFSchema;
-/// # use datafusion_expr::{Expr, col, lit};
-/// # use datafusion_physical_expr::create_physical_expr;
-/// # use datafusion_expr::execution_props::ExecutionProps;
-/// // For a logical expression `a = 1`, we can create a physical expression
-/// let expr = col("a").eq(lit(1));
-/// // To create a PhysicalExpr we need 1. a schema
-/// let schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]);
-/// let df_schema = DFSchema::try_from(schema).unwrap();
-/// // 2. ExecutionProps
-/// let props = ExecutionProps::new();
-/// // We can now create a PhysicalExpr:
-/// let physical_expr = create_physical_expr(&expr, &df_schema,
&props).unwrap();
-/// ```
-///
-/// # Example: Executing a PhysicalExpr to obtain [`ColumnarValue`]
-/// ```
-/// # use std::sync::Arc;
-/// # use arrow_array::{cast::AsArray, BooleanArray, Int32Array, RecordBatch};
-/// # use arrow_schema::{DataType, Field, Schema};
-/// # use datafusion_common::{assert_batches_eq, DFSchema};
-/// # use datafusion_expr::{Expr, col, lit, ColumnarValue};
-/// # use datafusion_physical_expr::create_physical_expr;
-/// # use datafusion_expr::execution_props::ExecutionProps;
-/// # let expr = col("a").eq(lit(1));
-/// # let schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]);
-/// # let df_schema = DFSchema::try_from(schema.clone()).unwrap();
-/// # let props = ExecutionProps::new();
-/// // Given a PhysicalExpr, for `a = 1` we can evaluate it against a
RecordBatch like this:
-/// let physical_expr = create_physical_expr(&expr, &df_schema,
&props).unwrap();
-/// // Input of [1,2,3]
-/// let input_batch = RecordBatch::try_from_iter(vec![
-/// ("a", Arc::new(Int32Array::from(vec![1, 2, 3])) as _)
-/// ]).unwrap();
-/// // The result is a ColumnarValue (either an Array or a Scalar)
-/// let result = physical_expr.evaluate(&input_batch).unwrap();
-/// // In this case, a BooleanArray with the result of the comparison
-/// let ColumnarValue::Array(arr) = result else {
-/// panic!("Expected an array")
-/// };
-/// assert_eq!(arr.as_boolean(), &BooleanArray::from(vec![true, false,
false]));
-/// ```
-pub trait PhysicalExpr: Send + Sync + Display + Debug + PartialEq<dyn Any> {
- /// Returns the physical expression as [`Any`] so that it can be
- /// downcast to a specific implementation.
- fn as_any(&self) -> &dyn Any;
- /// Get the data type of this expression, given the schema of the input
- fn data_type(&self, input_schema: &Schema) -> Result<DataType>;
- /// Determine whether this expression is nullable, given the schema of the
input
- fn nullable(&self, input_schema: &Schema) -> Result<bool>;
- /// Evaluate an expression against a RecordBatch
- fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue>;
- /// Evaluate an expression against a RecordBatch after first applying a
- /// validity array
- fn evaluate_selection(
- &self,
- batch: &RecordBatch,
- selection: &BooleanArray,
- ) -> Result<ColumnarValue> {
- let tmp_batch = filter_record_batch(batch, selection)?;
-
- let tmp_result = self.evaluate(&tmp_batch)?;
-
- if batch.num_rows() == tmp_batch.num_rows() {
- // All values from the `selection` filter are true.
- Ok(tmp_result)
- } else if let ColumnarValue::Array(a) = tmp_result {
- scatter(selection, a.as_ref()).map(ColumnarValue::Array)
- } else {
- Ok(tmp_result)
- }
- }
-
- /// Get a list of child PhysicalExpr that provide the input for this expr.
- fn children(&self) -> Vec<Arc<dyn PhysicalExpr>>;
-
- /// Returns a new PhysicalExpr where all children were replaced by new
exprs.
- fn with_new_children(
- self: Arc<Self>,
- children: Vec<Arc<dyn PhysicalExpr>>,
- ) -> Result<Arc<dyn PhysicalExpr>>;
-
- /// Computes the output interval for the expression, given the input
- /// intervals.
- ///
- /// # Arguments
- ///
- /// * `children` are the intervals for the children (inputs) of this
- /// expression.
- ///
- /// # Example
- ///
- /// If the expression is `a + b`, and the input intervals are `a: [1, 2]`
- /// and `b: [3, 4]`, then the output interval would be `[4, 6]`.
- fn evaluate_bounds(&self, _children: &[&Interval]) -> Result<Interval> {
- not_impl_err!("Not implemented for {self}")
- }
-
- /// Updates bounds for child expressions, given a known interval for this
- /// expression.
- ///
- /// This is used to propagate constraints down through an expression tree.
- ///
- /// # Arguments
- ///
- /// * `interval` is the currently known interval for this expression.
- /// * `children` are the current intervals for the children of this
expression.
- ///
- /// # Returns
- ///
- /// A `Vec` of new intervals for the children, in order.
- ///
- /// If constraint propagation reveals an infeasibility for any child,
returns
- /// [`None`]. If none of the children intervals change as a result of
propagation,
- /// may return an empty vector instead of cloning `children`. This is the
default
- /// (and conservative) return value.
- ///
- /// # Example
- ///
- /// If the expression is `a + b`, the current `interval` is `[4, 5]` and
the
- /// inputs `a` and `b` are respectively given as `[0, 2]` and `[-∞, 4]`,
then
- /// propagation would would return `[0, 2]` and `[2, 4]` as `b` must be at
- /// least `2` to make the output at least `4`.
- fn propagate_constraints(
- &self,
- _interval: &Interval,
- _children: &[&Interval],
- ) -> Result<Option<Vec<Interval>>> {
- Ok(Some(vec![]))
- }
-
- /// Update the hash `state` with this expression requirements from
- /// [`Hash`].
- ///
- /// This method is required to support hashing [`PhysicalExpr`]s. To
- /// implement it, typically the type implementing
- /// [`PhysicalExpr`] implements [`Hash`] and
- /// then the following boiler plate is used:
- ///
- /// # Example:
- /// ```
- /// // User defined expression that derives Hash
- /// #[derive(Hash, Debug, PartialEq, Eq)]
- /// struct MyExpr {
- /// val: u64
- /// }
- ///
- /// // impl PhysicalExpr {
- /// // ...
- /// # impl MyExpr {
- /// // Boiler plate to call the derived Hash impl
- /// fn dyn_hash(&self, state: &mut dyn std::hash::Hasher) {
- /// use std::hash::Hash;
- /// let mut s = state;
- /// self.hash(&mut s);
- /// }
- /// // }
- /// # }
- /// ```
- /// Note: [`PhysicalExpr`] is not constrained by [`Hash`]
- /// directly because it must remain object safe.
- fn dyn_hash(&self, _state: &mut dyn Hasher);
-
- /// The order information of a PhysicalExpr can be estimated from its
children.
- /// This is especially helpful for projection expressions. If we can
ensure that the
- /// order of a PhysicalExpr to project matches with the order of SortExec,
we can
- /// eliminate that SortExecs.
- ///
- /// By recursively calling this function, we can obtain the overall order
- /// information of the PhysicalExpr. Since `SortOptions` cannot fully
handle
- /// the propagation of unordered columns and literals, the `SortProperties`
- /// struct is used.
- fn get_ordering(&self, _children: &[SortProperties]) -> SortProperties {
- SortProperties::Unordered
- }
-}
-
-impl Hash for dyn PhysicalExpr {
- fn hash<H: Hasher>(&self, state: &mut H) {
- self.dyn_hash(state);
- }
-}
+pub use datafusion_physical_expr_common::physical_expr::down_cast_any_ref;
/// Shared [`PhysicalExpr`].
pub type PhysicalExprRef = Arc<dyn PhysicalExpr>;
-/// Returns a copy of this expr if we change any child according to the
pointer comparison.
-/// The size of `children` must be equal to the size of
`PhysicalExpr::children()`.
-pub fn with_new_children_if_necessary(
- expr: Arc<dyn PhysicalExpr>,
- children: Vec<Arc<dyn PhysicalExpr>>,
-) -> Result<Arc<dyn PhysicalExpr>> {
- let old_children = expr.children();
- if children.len() != old_children.len() {
- internal_err!("PhysicalExpr: Wrong number of children")
- } else if children.is_empty()
- || children
- .iter()
- .zip(old_children.iter())
- .any(|(c1, c2)| !Arc::data_ptr_eq(c1, c2))
- {
- Ok(expr.with_new_children(children)?)
- } else {
- Ok(expr)
- }
-}
-
-pub fn down_cast_any_ref(any: &dyn Any) -> &dyn Any {
- if any.is::<Arc<dyn PhysicalExpr>>() {
- any.downcast_ref::<Arc<dyn PhysicalExpr>>()
- .unwrap()
- .as_any()
- } else if any.is::<Box<dyn PhysicalExpr>>() {
- any.downcast_ref::<Box<dyn PhysicalExpr>>()
- .unwrap()
- .as_any()
- } else {
- any
- }
-}
-
/// This function is similar to the `contains` method of `Vec`. It finds
/// whether `expr` is among `physical_exprs`.
pub fn physical_exprs_contains(
diff --git a/datafusion/physical-expr/src/planner.rs
b/datafusion/physical-expr/src/planner.rs
index 0dbea09ffb..44c9d33d60 100644
--- a/datafusion/physical-expr/src/planner.rs
+++ b/datafusion/physical-expr/src/planner.rs
@@ -33,6 +33,67 @@ use datafusion_expr::{
};
use std::sync::Arc;
+/// [PhysicalExpr] evaluate DataFusion expressions such as `A + 1`, or `CAST(c1
+/// AS int)`.
+///
+/// [PhysicalExpr] are the physical counterpart to [Expr] used in logical
+/// planning, and can be evaluated directly on a [RecordBatch]. They are
+/// normally created from [Expr] by a [PhysicalPlanner] and can be created
+/// directly using [create_physical_expr].
+///
+/// A Physical expression knows its type, nullability and how to evaluate
itself.
+///
+/// [PhysicalPlanner]:
https://docs.rs/datafusion/latest/datafusion/physical_planner/trait.PhysicalPlanner.html
+/// [RecordBatch]:
https://docs.rs/arrow/latest/arrow/record_batch/struct.RecordBatch.html
+///
+/// # Example: Create `PhysicalExpr` from `Expr`
+/// ```
+/// # use arrow::datatypes::{DataType, Field, Schema};
+/// # use datafusion_common::DFSchema;
+/// # use datafusion_expr::{Expr, col, lit};
+/// # use datafusion_physical_expr::create_physical_expr;
+/// # use datafusion_expr::execution_props::ExecutionProps;
+/// // For a logical expression `a = 1`, we can create a physical expression
+/// let expr = col("a").eq(lit(1));
+/// // To create a PhysicalExpr we need 1. a schema
+/// let schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]);
+/// let df_schema = DFSchema::try_from(schema).unwrap();
+/// // 2. ExecutionProps
+/// let props = ExecutionProps::new();
+/// // We can now create a PhysicalExpr:
+/// let physical_expr = create_physical_expr(&expr, &df_schema,
&props).unwrap();
+/// ```
+///
+/// # Example: Executing a PhysicalExpr to obtain [ColumnarValue]
+/// ```
+/// # use std::sync::Arc;
+/// # use arrow::array::{cast::AsArray, BooleanArray, Int32Array, RecordBatch};
+/// # use arrow::datatypes::{DataType, Field, Schema};
+/// # use datafusion_common::{assert_batches_eq, DFSchema};
+/// # use datafusion_expr::{Expr, col, lit, ColumnarValue};
+/// # use datafusion_physical_expr::create_physical_expr;
+/// # use datafusion_expr::execution_props::ExecutionProps;
+/// # let expr = col("a").eq(lit(1));
+/// # let schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]);
+/// # let df_schema = DFSchema::try_from(schema.clone()).unwrap();
+/// # let props = ExecutionProps::new();
+/// // Given a PhysicalExpr, for `a = 1` we can evaluate it against a
RecordBatch like this:
+/// let physical_expr = create_physical_expr(&expr, &df_schema,
&props).unwrap();
+/// // Input of [1,2,3]
+/// let input_batch = RecordBatch::try_from_iter(vec![
+/// ("a", Arc::new(Int32Array::from(vec![1, 2, 3])) as _)
+/// ]).unwrap();
+/// // The result is a ColumnarValue (either an Array or a Scalar)
+/// let result = physical_expr.evaluate(&input_batch).unwrap();
+/// // In this case, a BooleanArray with the result of the comparison
+/// let ColumnarValue::Array(arr) = result else {
+/// panic!("Expected an array")
+/// };
+/// assert_eq!(arr.as_boolean(), &BooleanArray::from(vec![true, false,
false]));
+/// ```
+///
+/// [ColumnarValue]: datafusion_expr::ColumnarValue
+///
/// Create a physical expression from a logical expression ([Expr]).
///
/// # Arguments
diff --git a/datafusion/physical-expr/src/utils/mod.rs
b/datafusion/physical-expr/src/utils/mod.rs
index b8e99403d6..e55bc3d156 100644
--- a/datafusion/physical-expr/src/utils/mod.rs
+++ b/datafusion/physical-expr/src/utils/mod.rs
@@ -24,10 +24,9 @@ use std::sync::Arc;
use crate::expressions::{BinaryExpr, Column};
use crate::tree_node::ExprContext;
-use crate::{PhysicalExpr, PhysicalSortExpr};
+use crate::PhysicalExpr;
+use crate::PhysicalSortExpr;
-use arrow::array::{make_array, Array, ArrayRef, BooleanArray,
MutableArrayData};
-use arrow::compute::{and_kleene, is_not_null, SlicesIterator};
use arrow::datatypes::SchemaRef;
use datafusion_common::tree_node::{
Transformed, TransformedResult, TreeNode, TreeNodeRecursion,
@@ -244,62 +243,6 @@ pub fn reassign_predicate_columns(
.data()
}
-/// Reverses the ORDER BY expression, which is useful during equivalent window
-/// expression construction. For instance, 'ORDER BY a ASC, NULLS LAST' turns
into
-/// 'ORDER BY a DESC, NULLS FIRST'.
-pub fn reverse_order_bys(order_bys: &[PhysicalSortExpr]) ->
Vec<PhysicalSortExpr> {
- order_bys
- .iter()
- .map(|e| PhysicalSortExpr {
- expr: e.expr.clone(),
- options: !e.options,
- })
- .collect()
-}
-
-/// Scatter `truthy` array by boolean mask. When the mask evaluates `true`,
next values of `truthy`
-/// are taken, when the mask evaluates `false` values null values are filled.
-///
-/// # Arguments
-/// * `mask` - Boolean values used to determine where to put the `truthy`
values
-/// * `truthy` - All values of this array are to scatter according to `mask`
into final result.
-pub fn scatter(mask: &BooleanArray, truthy: &dyn Array) -> Result<ArrayRef> {
- let truthy = truthy.to_data();
-
- // update the mask so that any null values become false
- // (SlicesIterator doesn't respect nulls)
- let mask = and_kleene(mask, &is_not_null(mask)?)?;
-
- let mut mutable = MutableArrayData::new(vec![&truthy], true, mask.len());
-
- // the SlicesIterator slices only the true values. So the gaps left by
this iterator we need to
- // fill with falsy values
-
- // keep track of how much is filled
- let mut filled = 0;
- // keep track of current position we have in truthy array
- let mut true_pos = 0;
-
- SlicesIterator::new(&mask).for_each(|(start, end)| {
- // the gap needs to be filled with nulls
- if start > filled {
- mutable.extend_nulls(start - filled);
- }
- // fill with truthy values
- let len = end - start;
- mutable.extend(0, true_pos, true_pos + len);
- true_pos += len;
- filled = end;
- });
- // the remaining part is falsy
- if filled < mask.len() {
- mutable.extend_nulls(mask.len() - filled);
- }
-
- let data = mutable.freeze();
- Ok(make_array(data))
-}
-
/// Merge left and right sort expressions, checking for duplicates.
pub fn merge_vectors(
left: &[PhysicalSortExpr],
@@ -321,9 +264,7 @@ mod tests {
use crate::expressions::{binary, cast, col, in_list, lit, Column, Literal};
use crate::PhysicalSortExpr;
- use arrow_array::Int32Array;
use arrow_schema::{DataType, Field, Schema};
- use datafusion_common::cast::{as_boolean_array, as_int32_array};
use datafusion_common::{Result, ScalarValue};
use petgraph::visit::Bfs;
@@ -517,70 +458,4 @@ mod tests {
assert_eq!(collect_columns(&expr3), expected);
Ok(())
}
-
- #[test]
- fn scatter_int() -> Result<()> {
- let truthy = Arc::new(Int32Array::from(vec![1, 10, 11, 100]));
- let mask = BooleanArray::from(vec![true, true, false, false, true]);
-
- // the output array is expected to be the same length as the mask array
- let expected =
- Int32Array::from_iter(vec![Some(1), Some(10), None, None,
Some(11)]);
- let result = scatter(&mask, truthy.as_ref())?;
- let result = as_int32_array(&result)?;
-
- assert_eq!(&expected, result);
- Ok(())
- }
-
- #[test]
- fn scatter_int_end_with_false() -> Result<()> {
- let truthy = Arc::new(Int32Array::from(vec![1, 10, 11, 100]));
- let mask = BooleanArray::from(vec![true, false, true, false, false,
false]);
-
- // output should be same length as mask
- let expected =
- Int32Array::from_iter(vec![Some(1), None, Some(10), None, None,
None]);
- let result = scatter(&mask, truthy.as_ref())?;
- let result = as_int32_array(&result)?;
-
- assert_eq!(&expected, result);
- Ok(())
- }
-
- #[test]
- fn scatter_with_null_mask() -> Result<()> {
- let truthy = Arc::new(Int32Array::from(vec![1, 10, 11]));
- let mask: BooleanArray = vec![Some(false), None, Some(true),
Some(true), None]
- .into_iter()
- .collect();
-
- // output should treat nulls as though they are false
- let expected = Int32Array::from_iter(vec![None, None, Some(1),
Some(10), None]);
- let result = scatter(&mask, truthy.as_ref())?;
- let result = as_int32_array(&result)?;
-
- assert_eq!(&expected, result);
- Ok(())
- }
-
- #[test]
- fn scatter_boolean() -> Result<()> {
- let truthy = Arc::new(BooleanArray::from(vec![false, false, false,
true]));
- let mask = BooleanArray::from(vec![true, true, false, false, true]);
-
- // the output array is expected to be the same length as the mask array
- let expected = BooleanArray::from_iter(vec![
- Some(false),
- Some(false),
- None,
- None,
- Some(false),
- ]);
- let result = scatter(&mask, truthy.as_ref())?;
- let result = as_boolean_array(&result)?;
-
- assert_eq!(&expected, result);
- Ok(())
- }
}