This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 1dd5354511 Move `AggregateExpr`, `PhysicalExpr` and `PhysicalSortExpr` 
to physical-expr-core (#9926)
1dd5354511 is described below

commit 1dd535451198777693ac962a21b79a680bd4b7e8
Author: Jay Zhan <[email protected]>
AuthorDate: Fri Apr 5 17:57:49 2024 +0800

    Move `AggregateExpr`, `PhysicalExpr` and `PhysicalSortExpr` to 
physical-expr-core (#9926)
    
    * move PhysicalExpr
    
    Signed-off-by: jayzhan211 <[email protected]>
    
    * cleanup
    
    Signed-off-by: jayzhan211 <[email protected]>
    
    * move physical sort
    
    Signed-off-by: jayzhan211 <[email protected]>
    
    * cleanup dependencies
    
    Signed-off-by: jayzhan211 <[email protected]>
    
    * add readme
    
    Signed-off-by: jayzhan211 <[email protected]>
    
    * disable doc test
    
    Signed-off-by: jayzhan211 <[email protected]>
    
    * move column
    
    Signed-off-by: jayzhan211 <[email protected]>
    
    * fmt
    
    Signed-off-by: jayzhan211 <[email protected]>
    
    * move aggregatexp
    
    Signed-off-by: jayzhan211 <[email protected]>
    
    * move other two utils
    
    Signed-off-by: jayzhan211 <[email protected]>
    
    * license
    
    Signed-off-by: jayzhan211 <[email protected]>
    
    * switch to ignore
    
    Signed-off-by: jayzhan211 <[email protected]>
    
    * move reverse order
    
    Signed-off-by: jayzhan211 <[email protected]>
    
    * rename to common
    
    Signed-off-by: jayzhan211 <[email protected]>
    
    * cleanup
    
    Signed-off-by: jayzhan211 <[email protected]>
    
    ---------
    
    Signed-off-by: jayzhan211 <[email protected]>
---
 Cargo.toml                                         |   2 +
 datafusion-cli/Cargo.lock                          |  10 +
 datafusion/physical-expr-common/Cargo.toml         |  38 ++++
 datafusion/physical-expr-common/README.md          |  27 +++
 .../src/aggregate/mod.rs                           |  50 +---
 .../physical-expr-common/src/aggregate/utils.rs    |  69 ++++++
 .../src/expressions/column.rs                      | 123 +---------
 .../physical-expr-common/src/expressions/mod.rs    |  18 ++
 datafusion/physical-expr-common/src/lib.rs         |  24 ++
 .../physical-expr-common/src/physical_expr.rs      | 211 +++++++++++++++++
 .../src/sort_expr.rs                               |   6 +-
 .../src/sort_properties.rs                         |   4 +-
 .../src/tree_node.rs                               |   0
 datafusion/physical-expr-common/src/utils.rs       | 156 +++++++++++++
 datafusion/physical-expr/Cargo.toml                |   1 +
 datafusion/physical-expr/src/aggregate/mod.rs      |  80 +------
 datafusion/physical-expr/src/aggregate/utils.rs    |  50 +---
 datafusion/physical-expr/src/expressions/column.rs | 106 ---------
 datafusion/physical-expr/src/expressions/mod.rs    |   3 +-
 datafusion/physical-expr/src/lib.rs                |  32 ++-
 datafusion/physical-expr/src/physical_expr.rs      | 251 +--------------------
 datafusion/physical-expr/src/planner.rs            |  61 +++++
 datafusion/physical-expr/src/utils/mod.rs          | 129 +----------
 23 files changed, 663 insertions(+), 788 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index 9df489724d..ca34ea9c2a 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -26,6 +26,7 @@ members = [
     "datafusion/functions",
     "datafusion/functions-array",
     "datafusion/optimizer",
+    "datafusion/physical-expr-common",
     "datafusion/physical-expr",
     "datafusion/physical-plan",
     "datafusion/proto",
@@ -80,6 +81,7 @@ datafusion-functions = { path = "datafusion/functions", 
version = "37.0.0" }
 datafusion-functions-array = { path = "datafusion/functions-array", version = 
"37.0.0" }
 datafusion-optimizer = { path = "datafusion/optimizer", version = "37.0.0", 
default-features = false }
 datafusion-physical-expr = { path = "datafusion/physical-expr", version = 
"37.0.0", default-features = false }
+datafusion-physical-expr-common = { path = "datafusion/physical-expr-common", 
version = "37.0.0", default-features = false }
 datafusion-physical-plan = { path = "datafusion/physical-plan", version = 
"37.0.0" }
 datafusion-proto = { path = "datafusion/proto", version = "37.0.0" }
 datafusion-sql = { path = "datafusion/sql", version = "37.0.0" }
diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock
index 3be92221d3..d744a891c6 100644
--- a/datafusion-cli/Cargo.lock
+++ b/datafusion-cli/Cargo.lock
@@ -1331,6 +1331,7 @@ dependencies = [
  "datafusion-common",
  "datafusion-execution",
  "datafusion-expr",
+ "datafusion-physical-expr-common",
  "half",
  "hashbrown 0.14.3",
  "hex",
@@ -1345,6 +1346,15 @@ dependencies = [
  "sha2",
 ]
 
+[[package]]
+name = "datafusion-physical-expr-common"
+version = "37.0.0"
+dependencies = [
+ "arrow",
+ "datafusion-common",
+ "datafusion-expr",
+]
+
 [[package]]
 name = "datafusion-physical-plan"
 version = "37.0.0"
diff --git a/datafusion/physical-expr-common/Cargo.toml 
b/datafusion/physical-expr-common/Cargo.toml
new file mode 100644
index 0000000000..89a41a5d10
--- /dev/null
+++ b/datafusion/physical-expr-common/Cargo.toml
@@ -0,0 +1,38 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+[package]
+name = "datafusion-physical-expr-common"
+description = "Common functionality of physical expression for DataFusion 
query engine"
+keywords = ["arrow", "query", "sql"]
+readme = "README.md"
+version = { workspace = true }
+edition = { workspace = true }
+homepage = { workspace = true }
+repository = { workspace = true }
+license = { workspace = true }
+authors = { workspace = true }
+rust-version = { workspace = true }
+
+[lib]
+name = "datafusion_physical_expr_common"
+path = "src/lib.rs"
+
+[dependencies]
+arrow = { workspace = true }
+datafusion-common = { workspace = true, default-features = true }
+datafusion-expr = { workspace = true }
diff --git a/datafusion/physical-expr-common/README.md 
b/datafusion/physical-expr-common/README.md
new file mode 100644
index 0000000000..7a1eff77d3
--- /dev/null
+++ b/datafusion/physical-expr-common/README.md
@@ -0,0 +1,27 @@
+<!---
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied.  See the License for the
+  specific language governing permissions and limitations
+  under the License.
+-->
+
+# DataFusion Common Physical Expressions
+
+[DataFusion][df] is an extensible query execution framework, written in Rust, 
that uses Apache Arrow as its in-memory format.
+
+This crate is a submodule of DataFusion that provides shared APIs for 
implementing
+physical expressions such as `PhysicalExpr` and `PhysicalSortExpr`.
+
+[df]: https://crates.io/crates/datafusion
diff --git a/datafusion/physical-expr/src/aggregate/mod.rs 
b/datafusion/physical-expr-common/src/aggregate/mod.rs
similarity index 75%
copy from datafusion/physical-expr/src/aggregate/mod.rs
copy to datafusion/physical-expr-common/src/aggregate/mod.rs
index 893178f29d..579f51815d 100644
--- a/datafusion/physical-expr/src/aggregate/mod.rs
+++ b/datafusion/physical-expr-common/src/aggregate/mod.rs
@@ -15,53 +15,19 @@
 // specific language governing permissions and limitations
 // under the License.
 
+pub mod utils;
+
 use std::any::Any;
 use std::fmt::Debug;
 use std::sync::Arc;
 
-use crate::expressions::{NthValueAgg, OrderSensitiveArrayAgg};
-use crate::{PhysicalExpr, PhysicalSortExpr};
+use crate::physical_expr::PhysicalExpr;
+use crate::sort_expr::PhysicalSortExpr;
 
 use arrow::datatypes::Field;
 use datafusion_common::{not_impl_err, Result};
 use datafusion_expr::{Accumulator, GroupsAccumulator};
 
-mod hyperloglog;
-mod tdigest;
-
-pub(crate) mod approx_distinct;
-pub(crate) mod approx_median;
-pub(crate) mod approx_percentile_cont;
-pub(crate) mod approx_percentile_cont_with_weight;
-pub(crate) mod array_agg;
-pub(crate) mod array_agg_distinct;
-pub(crate) mod array_agg_ordered;
-pub(crate) mod average;
-pub(crate) mod bit_and_or_xor;
-pub(crate) mod bool_and_or;
-pub(crate) mod correlation;
-pub(crate) mod count;
-pub(crate) mod count_distinct;
-pub(crate) mod covariance;
-pub(crate) mod first_last;
-pub(crate) mod grouping;
-pub(crate) mod median;
-pub(crate) mod nth_value;
-pub(crate) mod string_agg;
-#[macro_use]
-pub(crate) mod min_max;
-pub(crate) mod groups_accumulator;
-pub(crate) mod regr;
-pub(crate) mod stats;
-pub(crate) mod stddev;
-pub(crate) mod sum;
-pub(crate) mod sum_distinct;
-pub(crate) mod variance;
-
-pub mod build_in;
-pub mod moving_min_max;
-pub mod utils;
-
 /// An aggregate expression that:
 /// * knows its resulting field
 /// * knows how to create its accumulator
@@ -134,11 +100,3 @@ pub trait AggregateExpr: Send + Sync + Debug + 
PartialEq<dyn Any> {
         not_impl_err!("Retractable Accumulator hasn't been implemented for 
{self:?} yet")
     }
 }
-
-/// Checks whether the given aggregate expression is order-sensitive.
-/// For instance, a `SUM` aggregation doesn't depend on the order of its 
inputs.
-/// However, an `ARRAY_AGG` with `ORDER BY` depends on the input ordering.
-pub fn is_order_sensitive(aggr_expr: &Arc<dyn AggregateExpr>) -> bool {
-    aggr_expr.as_any().is::<OrderSensitiveArrayAgg>()
-        || aggr_expr.as_any().is::<NthValueAgg>()
-}
diff --git a/datafusion/physical-expr-common/src/aggregate/utils.rs 
b/datafusion/physical-expr-common/src/aggregate/utils.rs
new file mode 100644
index 0000000000..9821ba626b
--- /dev/null
+++ b/datafusion/physical-expr-common/src/aggregate/utils.rs
@@ -0,0 +1,69 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::{any::Any, sync::Arc};
+
+use arrow::{
+    compute::SortOptions,
+    datatypes::{DataType, Field},
+};
+
+use crate::sort_expr::PhysicalSortExpr;
+
+use super::AggregateExpr;
+
+/// Downcast a `Box<dyn AggregateExpr>` or `Arc<dyn AggregateExpr>`
+/// and return the inner trait object as [`Any`] so
+/// that it can be downcast to a specific implementation.
+///
+/// This method is used when implementing the `PartialEq<dyn Any>`
+/// for [`AggregateExpr`] aggregation expressions and allows comparing the 
equality
+/// between the trait objects.
+pub fn down_cast_any_ref(any: &dyn Any) -> &dyn Any {
+    if let Some(obj) = any.downcast_ref::<Arc<dyn AggregateExpr>>() {
+        obj.as_any()
+    } else if let Some(obj) = any.downcast_ref::<Box<dyn AggregateExpr>>() {
+        obj.as_any()
+    } else {
+        any
+    }
+}
+
+/// Construct corresponding fields for lexicographical ordering requirement 
expression
+pub fn ordering_fields(
+    ordering_req: &[PhysicalSortExpr],
+    // Data type of each expression in the ordering requirement
+    data_types: &[DataType],
+) -> Vec<Field> {
+    ordering_req
+        .iter()
+        .zip(data_types.iter())
+        .map(|(sort_expr, dtype)| {
+            Field::new(
+                sort_expr.expr.to_string().as_str(),
+                dtype.clone(),
+                // Multi partitions may be empty hence field should be 
nullable.
+                true,
+            )
+        })
+        .collect()
+}
+
+/// Selects the sort option attribute from all the given `PhysicalSortExpr`s.
+pub fn get_sort_options(ordering_req: &[PhysicalSortExpr]) -> Vec<SortOptions> 
{
+    ordering_req.iter().map(|item| item.options).collect()
+}
diff --git a/datafusion/physical-expr/src/expressions/column.rs 
b/datafusion/physical-expr-common/src/expressions/column.rs
similarity index 50%
copy from datafusion/physical-expr/src/expressions/column.rs
copy to datafusion/physical-expr-common/src/expressions/column.rs
index a07f36e785..2cd52d6332 100644
--- a/datafusion/physical-expr/src/expressions/column.rs
+++ b/datafusion/physical-expr-common/src/expressions/column.rs
@@ -21,9 +21,6 @@ use std::any::Any;
 use std::hash::{Hash, Hasher};
 use std::sync::Arc;
 
-use crate::physical_expr::down_cast_any_ref;
-use crate::PhysicalExpr;
-
 use arrow::{
     datatypes::{DataType, Schema},
     record_batch::RecordBatch,
@@ -31,6 +28,8 @@ use arrow::{
 use datafusion_common::{internal_err, Result};
 use datafusion_expr::ColumnarValue;
 
+use crate::physical_expr::{down_cast_any_ref, PhysicalExpr};
+
 /// Represents the column at a given index in a RecordBatch
 #[derive(Debug, Hash, PartialEq, Eq, Clone)]
 pub struct Column {
@@ -132,125 +131,7 @@ impl Column {
     }
 }
 
-#[derive(Debug, Hash, PartialEq, Eq, Clone)]
-pub struct UnKnownColumn {
-    name: String,
-}
-
-impl UnKnownColumn {
-    /// Create a new unknown column expression
-    pub fn new(name: &str) -> Self {
-        Self {
-            name: name.to_owned(),
-        }
-    }
-
-    /// Get the column name
-    pub fn name(&self) -> &str {
-        &self.name
-    }
-}
-
-impl std::fmt::Display for UnKnownColumn {
-    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
-        write!(f, "{}", self.name)
-    }
-}
-
-impl PhysicalExpr for UnKnownColumn {
-    /// Return a reference to Any that can be used for downcasting
-    fn as_any(&self) -> &dyn std::any::Any {
-        self
-    }
-
-    /// Get the data type of this expression, given the schema of the input
-    fn data_type(&self, _input_schema: &Schema) -> Result<DataType> {
-        Ok(DataType::Null)
-    }
-
-    /// Decide whehter this expression is nullable, given the schema of the 
input
-    fn nullable(&self, _input_schema: &Schema) -> Result<bool> {
-        Ok(true)
-    }
-
-    /// Evaluate the expression
-    fn evaluate(&self, _batch: &RecordBatch) -> Result<ColumnarValue> {
-        internal_err!("UnKnownColumn::evaluate() should not be called")
-    }
-
-    fn children(&self) -> Vec<Arc<dyn PhysicalExpr>> {
-        vec![]
-    }
-
-    fn with_new_children(
-        self: Arc<Self>,
-        _children: Vec<Arc<dyn PhysicalExpr>>,
-    ) -> Result<Arc<dyn PhysicalExpr>> {
-        Ok(self)
-    }
-
-    fn dyn_hash(&self, state: &mut dyn Hasher) {
-        let mut s = state;
-        self.hash(&mut s);
-    }
-}
-
-impl PartialEq<dyn Any> for UnKnownColumn {
-    fn eq(&self, other: &dyn Any) -> bool {
-        down_cast_any_ref(other)
-            .downcast_ref::<Self>()
-            .map(|x| self == x)
-            .unwrap_or(false)
-    }
-}
-
 /// Create a column expression
 pub fn col(name: &str, schema: &Schema) -> Result<Arc<dyn PhysicalExpr>> {
     Ok(Arc::new(Column::new_with_schema(name, schema)?))
 }
-
-#[cfg(test)]
-mod test {
-    use crate::expressions::Column;
-    use crate::PhysicalExpr;
-
-    use arrow::array::StringArray;
-    use arrow::datatypes::{DataType, Field, Schema};
-    use arrow::record_batch::RecordBatch;
-    use datafusion_common::Result;
-
-    use std::sync::Arc;
-
-    #[test]
-    fn out_of_bounds_data_type() {
-        let schema = Schema::new(vec![Field::new("foo", DataType::Utf8, 
true)]);
-        let col = Column::new("id", 9);
-        let error = 
col.data_type(&schema).expect_err("error").strip_backtrace();
-        assert!("Internal error: PhysicalExpr Column references column 'id' at 
index 9 (zero-based) \
-            but input schema only has 1 columns: [\"foo\"].\nThis was likely 
caused by a bug in \
-            DataFusion's code and we would welcome that you file an bug report 
in our issue tracker".starts_with(&error))
-    }
-
-    #[test]
-    fn out_of_bounds_nullable() {
-        let schema = Schema::new(vec![Field::new("foo", DataType::Utf8, 
true)]);
-        let col = Column::new("id", 9);
-        let error = 
col.nullable(&schema).expect_err("error").strip_backtrace();
-        assert!("Internal error: PhysicalExpr Column references column 'id' at 
index 9 (zero-based) \
-            but input schema only has 1 columns: [\"foo\"].\nThis was likely 
caused by a bug in \
-            DataFusion's code and we would welcome that you file an bug report 
in our issue tracker".starts_with(&error))
-    }
-
-    #[test]
-    fn out_of_bounds_evaluate() -> Result<()> {
-        let schema = Schema::new(vec![Field::new("foo", DataType::Utf8, 
true)]);
-        let data: StringArray = vec!["data"].into();
-        let batch = RecordBatch::try_new(Arc::new(schema), 
vec![Arc::new(data)])?;
-        let col = Column::new("id", 9);
-        let error = col.evaluate(&batch).expect_err("error").strip_backtrace();
-        assert!("Internal error: PhysicalExpr Column references column 'id' at 
index 9 (zero-based) \
-            but input schema only has 1 columns: [\"foo\"].\nThis was likely 
caused by a bug in \
-            DataFusion's code and we would welcome that you file an bug report 
in our issue tracker".starts_with(&error));
-        Ok(())
-    }
-}
diff --git a/datafusion/physical-expr-common/src/expressions/mod.rs 
b/datafusion/physical-expr-common/src/expressions/mod.rs
new file mode 100644
index 0000000000..d102422081
--- /dev/null
+++ b/datafusion/physical-expr-common/src/expressions/mod.rs
@@ -0,0 +1,18 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+pub mod column;
diff --git a/datafusion/physical-expr-common/src/lib.rs 
b/datafusion/physical-expr-common/src/lib.rs
new file mode 100644
index 0000000000..53e3134a1b
--- /dev/null
+++ b/datafusion/physical-expr-common/src/lib.rs
@@ -0,0 +1,24 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+pub mod aggregate;
+pub mod expressions;
+pub mod physical_expr;
+pub mod sort_expr;
+pub mod sort_properties;
+pub mod tree_node;
+pub mod utils;
diff --git a/datafusion/physical-expr-common/src/physical_expr.rs 
b/datafusion/physical-expr-common/src/physical_expr.rs
new file mode 100644
index 0000000000..be6358e73c
--- /dev/null
+++ b/datafusion/physical-expr-common/src/physical_expr.rs
@@ -0,0 +1,211 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::any::Any;
+use std::fmt::{Debug, Display};
+use std::hash::{Hash, Hasher};
+use std::sync::Arc;
+
+use arrow::array::BooleanArray;
+use arrow::compute::filter_record_batch;
+use arrow::datatypes::{DataType, Schema};
+use arrow::record_batch::RecordBatch;
+use datafusion_common::utils::DataPtr;
+use datafusion_common::{internal_err, not_impl_err, Result};
+use datafusion_expr::interval_arithmetic::Interval;
+use datafusion_expr::ColumnarValue;
+
+use crate::sort_properties::SortProperties;
+use crate::utils::scatter;
+
+/// See 
[create_physical_expr](https://docs.rs/datafusion/latest/datafusion/physical_expr/fn.create_physical_expr.html)
+/// for examples of creating `PhysicalExpr` from `Expr`
+pub trait PhysicalExpr: Send + Sync + Display + Debug + PartialEq<dyn Any> {
+    /// Returns the physical expression as [`Any`] so that it can be
+    /// downcast to a specific implementation.
+    fn as_any(&self) -> &dyn Any;
+    /// Get the data type of this expression, given the schema of the input
+    fn data_type(&self, input_schema: &Schema) -> Result<DataType>;
+    /// Determine whether this expression is nullable, given the schema of the 
input
+    fn nullable(&self, input_schema: &Schema) -> Result<bool>;
+    /// Evaluate an expression against a RecordBatch
+    fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue>;
+    /// Evaluate an expression against a RecordBatch after first applying a
+    /// validity array
+    fn evaluate_selection(
+        &self,
+        batch: &RecordBatch,
+        selection: &BooleanArray,
+    ) -> Result<ColumnarValue> {
+        let tmp_batch = filter_record_batch(batch, selection)?;
+
+        let tmp_result = self.evaluate(&tmp_batch)?;
+
+        if batch.num_rows() == tmp_batch.num_rows() {
+            // All values from the `selection` filter are true.
+            Ok(tmp_result)
+        } else if let ColumnarValue::Array(a) = tmp_result {
+            scatter(selection, a.as_ref()).map(ColumnarValue::Array)
+        } else {
+            Ok(tmp_result)
+        }
+    }
+
+    /// Get a list of child PhysicalExpr that provide the input for this expr.
+    fn children(&self) -> Vec<Arc<dyn PhysicalExpr>>;
+
+    /// Returns a new PhysicalExpr where all children were replaced by new 
exprs.
+    fn with_new_children(
+        self: Arc<Self>,
+        children: Vec<Arc<dyn PhysicalExpr>>,
+    ) -> Result<Arc<dyn PhysicalExpr>>;
+
+    /// Computes the output interval for the expression, given the input
+    /// intervals.
+    ///
+    /// # Arguments
+    ///
+    /// * `children` are the intervals for the children (inputs) of this
+    /// expression.
+    ///
+    /// # Example
+    ///
+    /// If the expression is `a + b`, and the input intervals are `a: [1, 2]`
+    /// and `b: [3, 4]`, then the output interval would be `[4, 6]`.
+    fn evaluate_bounds(&self, _children: &[&Interval]) -> Result<Interval> {
+        not_impl_err!("Not implemented for {self}")
+    }
+
+    /// Updates bounds for child expressions, given a known interval for this
+    /// expression.
+    ///
+    /// This is used to propagate constraints down through an expression tree.
+    ///
+    /// # Arguments
+    ///
+    /// * `interval` is the currently known interval for this expression.
+    /// * `children` are the current intervals for the children of this 
expression.
+    ///
+    /// # Returns
+    ///
+    /// A `Vec` of new intervals for the children, in order.
+    ///
+    /// If constraint propagation reveals an infeasibility for any child, 
returns
+    /// [`None`]. If none of the children intervals change as a result of 
propagation,
+    /// may return an empty vector instead of cloning `children`. This is the 
default
+    /// (and conservative) return value.
+    ///
+    /// # Example
+    ///
+    /// If the expression is `a + b`, the current `interval` is `[4, 5]` and 
the
+    /// inputs `a` and `b` are respectively given as `[0, 2]` and `[-∞, 4]`, 
then
+    /// propagation would return `[0, 2]` and `[2, 4]` as `b` must be at
+    /// least `2` to make the output at least `4`.
+    fn propagate_constraints(
+        &self,
+        _interval: &Interval,
+        _children: &[&Interval],
+    ) -> Result<Option<Vec<Interval>>> {
+        Ok(Some(vec![]))
+    }
+
+    /// Update the hash `state` with this expression requirements from
+    /// [`Hash`].
+    ///
+    /// This method is required to support hashing [`PhysicalExpr`]s.  To
+    /// implement it, typically the type implementing
+    /// [`PhysicalExpr`] implements [`Hash`] and
+    /// then the following boiler plate is used:
+    ///
+    /// # Example:
+    /// ```
+    /// // User defined expression that derives Hash
+    /// #[derive(Hash, Debug, PartialEq, Eq)]
+    /// struct MyExpr {
+    ///   val: u64
+    /// }
+    ///
+    /// // impl PhysicalExpr {
+    /// // ...
+    /// # impl MyExpr {
+    ///   // Boiler plate to call the derived Hash impl
+    ///   fn dyn_hash(&self, state: &mut dyn std::hash::Hasher) {
+    ///     use std::hash::Hash;
+    ///     let mut s = state;
+    ///     self.hash(&mut s);
+    ///   }
+    /// // }
+    /// # }
+    /// ```
+    /// Note: [`PhysicalExpr`] is not constrained by [`Hash`]
+    /// directly because it must remain object safe.
+    fn dyn_hash(&self, _state: &mut dyn Hasher);
+
+    /// The order information of a PhysicalExpr can be estimated from its 
children.
+    /// This is especially helpful for projection expressions. If we can 
ensure that the
+    /// order of a PhysicalExpr to project matches with the order of SortExec, 
we can
+    /// eliminate that SortExec.
+    ///
+    /// By recursively calling this function, we can obtain the overall order
+    /// information of the PhysicalExpr. Since `SortOptions` cannot fully 
handle
+    /// the propagation of unordered columns and literals, the `SortProperties`
+    /// struct is used.
+    fn get_ordering(&self, _children: &[SortProperties]) -> SortProperties {
+        SortProperties::Unordered
+    }
+}
+
+impl Hash for dyn PhysicalExpr {
+    fn hash<H: Hasher>(&self, state: &mut H) {
+        self.dyn_hash(state);
+    }
+}
+
+/// Returns a copy of this expr if we change any child according to the 
pointer comparison.
+/// The size of `children` must be equal to the size of 
`PhysicalExpr::children()`.
+pub fn with_new_children_if_necessary(
+    expr: Arc<dyn PhysicalExpr>,
+    children: Vec<Arc<dyn PhysicalExpr>>,
+) -> Result<Arc<dyn PhysicalExpr>> {
+    let old_children = expr.children();
+    if children.len() != old_children.len() {
+        internal_err!("PhysicalExpr: Wrong number of children")
+    } else if children.is_empty()
+        || children
+            .iter()
+            .zip(old_children.iter())
+            .any(|(c1, c2)| !Arc::data_ptr_eq(c1, c2))
+    {
+        Ok(expr.with_new_children(children)?)
+    } else {
+        Ok(expr)
+    }
+}
+
+pub fn down_cast_any_ref(any: &dyn Any) -> &dyn Any {
+    if any.is::<Arc<dyn PhysicalExpr>>() {
+        any.downcast_ref::<Arc<dyn PhysicalExpr>>()
+            .unwrap()
+            .as_any()
+    } else if any.is::<Box<dyn PhysicalExpr>>() {
+        any.downcast_ref::<Box<dyn PhysicalExpr>>()
+            .unwrap()
+            .as_any()
+    } else {
+        any
+    }
+}
diff --git a/datafusion/physical-expr/src/sort_expr.rs 
b/datafusion/physical-expr-common/src/sort_expr.rs
similarity index 99%
rename from datafusion/physical-expr/src/sort_expr.rs
rename to datafusion/physical-expr-common/src/sort_expr.rs
index 914d76f926..1e1187212d 100644
--- a/datafusion/physical-expr/src/sort_expr.rs
+++ b/datafusion/physical-expr-common/src/sort_expr.rs
@@ -21,14 +21,14 @@ use std::fmt::Display;
 use std::hash::{Hash, Hasher};
 use std::sync::Arc;
 
-use crate::PhysicalExpr;
-
 use arrow::compute::kernels::sort::{SortColumn, SortOptions};
+use arrow::datatypes::Schema;
 use arrow::record_batch::RecordBatch;
-use arrow_schema::Schema;
 use datafusion_common::Result;
 use datafusion_expr::ColumnarValue;
 
+use crate::physical_expr::PhysicalExpr;
+
 /// Represents Sort operation for a column in a RecordBatch
 #[derive(Clone, Debug)]
 pub struct PhysicalSortExpr {
diff --git a/datafusion/physical-expr/src/sort_properties.rs 
b/datafusion/physical-expr-common/src/sort_properties.rs
similarity index 99%
rename from datafusion/physical-expr/src/sort_properties.rs
rename to datafusion/physical-expr-common/src/sort_properties.rs
index 4df29ced2f..47a5d5ba5e 100644
--- a/datafusion/physical-expr/src/sort_properties.rs
+++ b/datafusion/physical-expr-common/src/sort_properties.rs
@@ -17,9 +17,9 @@
 
 use std::ops::Neg;
 
-use crate::tree_node::ExprContext;
+use arrow::compute::SortOptions;
 
-use arrow_schema::SortOptions;
+use crate::tree_node::ExprContext;
 
 /// To propagate [`SortOptions`] across the `PhysicalExpr`, it is insufficient
 /// to simply use `Option<SortOptions>`: There must be a differentiation 
between
diff --git a/datafusion/physical-expr/src/tree_node.rs 
b/datafusion/physical-expr-common/src/tree_node.rs
similarity index 100%
rename from datafusion/physical-expr/src/tree_node.rs
rename to datafusion/physical-expr-common/src/tree_node.rs
diff --git a/datafusion/physical-expr-common/src/utils.rs 
b/datafusion/physical-expr-common/src/utils.rs
new file mode 100644
index 0000000000..459b5a4849
--- /dev/null
+++ b/datafusion/physical-expr-common/src/utils.rs
@@ -0,0 +1,156 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow::{
+    array::{make_array, Array, ArrayRef, BooleanArray, MutableArrayData},
+    compute::{and_kleene, is_not_null, SlicesIterator},
+};
+use datafusion_common::Result;
+
+use crate::sort_expr::PhysicalSortExpr;
+
+/// Scatter `truthy` array by boolean mask. When the mask evaluates `true`, 
next values of `truthy`
+/// are taken; when the mask evaluates `false`, null values are filled instead.
+///
+/// # Arguments
+/// * `mask` - Boolean values used to determine where to put the `truthy` 
values
+/// * `truthy` - All values of this array are scattered according to `mask` 
into the final result.
+pub fn scatter(mask: &BooleanArray, truthy: &dyn Array) -> Result<ArrayRef> {
+    let truthy = truthy.to_data();
+
+    // update the mask so that any null values become false
+    // (SlicesIterator doesn't respect nulls)
+    let mask = and_kleene(mask, &is_not_null(mask)?)?;
+
+    let mut mutable = MutableArrayData::new(vec![&truthy], true, mask.len());
+
+    // the SlicesIterator yields only the runs of true values, so the gaps left by 
this iterator must be
+    // filled with nulls
+
+    // keep track of how much is filled
+    let mut filled = 0;
+    // keep track of current position we have in truthy array
+    let mut true_pos = 0;
+
+    SlicesIterator::new(&mask).for_each(|(start, end)| {
+        // the gap needs to be filled with nulls
+        if start > filled {
+            mutable.extend_nulls(start - filled);
+        }
+        // fill with truthy values
+        let len = end - start;
+        mutable.extend(0, true_pos, true_pos + len);
+        true_pos += len;
+        filled = end;
+    });
+    // the remaining part is falsy
+    if filled < mask.len() {
+        mutable.extend_nulls(mask.len() - filled);
+    }
+
+    let data = mutable.freeze();
+    Ok(make_array(data))
+}
+
+/// Reverses the ORDER BY expression, which is useful during equivalent window
+/// expression construction. For instance, 'ORDER BY a ASC, NULLS LAST' turns 
into
+/// 'ORDER BY a DESC, NULLS FIRST'.
+pub fn reverse_order_bys(order_bys: &[PhysicalSortExpr]) -> 
Vec<PhysicalSortExpr> {
+    order_bys
+        .iter()
+        .map(|e| PhysicalSortExpr {
+            expr: e.expr.clone(),
+            options: !e.options,
+        })
+        .collect()
+}
+
+#[cfg(test)]
+mod tests {
+    use std::sync::Arc;
+
+    use arrow::array::Int32Array;
+    use datafusion_common::cast::{as_boolean_array, as_int32_array};
+
+    use super::*;
+
+    #[test]
+    fn scatter_int() -> Result<()> {
+        let truthy = Arc::new(Int32Array::from(vec![1, 10, 11, 100]));
+        let mask = BooleanArray::from(vec![true, true, false, false, true]);
+
+        // the output array is expected to be the same length as the mask array
+        let expected =
+            Int32Array::from_iter(vec![Some(1), Some(10), None, None, 
Some(11)]);
+        let result = scatter(&mask, truthy.as_ref())?;
+        let result = as_int32_array(&result)?;
+
+        assert_eq!(&expected, result);
+        Ok(())
+    }
+
+    #[test]
+    fn scatter_int_end_with_false() -> Result<()> {
+        let truthy = Arc::new(Int32Array::from(vec![1, 10, 11, 100]));
+        let mask = BooleanArray::from(vec![true, false, true, false, false, 
false]);
+
+        // output should be same length as mask
+        let expected =
+            Int32Array::from_iter(vec![Some(1), None, Some(10), None, None, 
None]);
+        let result = scatter(&mask, truthy.as_ref())?;
+        let result = as_int32_array(&result)?;
+
+        assert_eq!(&expected, result);
+        Ok(())
+    }
+
+    #[test]
+    fn scatter_with_null_mask() -> Result<()> {
+        let truthy = Arc::new(Int32Array::from(vec![1, 10, 11]));
+        let mask: BooleanArray = vec![Some(false), None, Some(true), 
Some(true), None]
+            .into_iter()
+            .collect();
+
+        // output should treat nulls as though they are false
+        let expected = Int32Array::from_iter(vec![None, None, Some(1), 
Some(10), None]);
+        let result = scatter(&mask, truthy.as_ref())?;
+        let result = as_int32_array(&result)?;
+
+        assert_eq!(&expected, result);
+        Ok(())
+    }
+
+    #[test]
+    fn scatter_boolean() -> Result<()> {
+        let truthy = Arc::new(BooleanArray::from(vec![false, false, false, 
true]));
+        let mask = BooleanArray::from(vec![true, true, false, false, true]);
+
+        // the output array is expected to be the same length as the mask array
+        let expected = BooleanArray::from_iter(vec![
+            Some(false),
+            Some(false),
+            None,
+            None,
+            Some(false),
+        ]);
+        let result = scatter(&mask, truthy.as_ref())?;
+        let result = as_boolean_array(&result)?;
+
+        assert_eq!(&expected, result);
+        Ok(())
+    }
+}
diff --git a/datafusion/physical-expr/Cargo.toml 
b/datafusion/physical-expr/Cargo.toml
index 56b3f3c91e..87d73183d0 100644
--- a/datafusion/physical-expr/Cargo.toml
+++ b/datafusion/physical-expr/Cargo.toml
@@ -59,6 +59,7 @@ chrono = { workspace = true }
 datafusion-common = { workspace = true, default-features = true }
 datafusion-execution = { workspace = true }
 datafusion-expr = { workspace = true }
+datafusion-physical-expr-common = { workspace = true }
 half = { workspace = true }
 hashbrown = { version = "0.14", features = ["raw"] }
 hex = { version = "0.4", optional = true }
diff --git a/datafusion/physical-expr/src/aggregate/mod.rs 
b/datafusion/physical-expr/src/aggregate/mod.rs
index 893178f29d..e176084ae6 100644
--- a/datafusion/physical-expr/src/aggregate/mod.rs
+++ b/datafusion/physical-expr/src/aggregate/mod.rs
@@ -15,16 +15,11 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::any::Any;
-use std::fmt::Debug;
 use std::sync::Arc;
 
 use crate::expressions::{NthValueAgg, OrderSensitiveArrayAgg};
-use crate::{PhysicalExpr, PhysicalSortExpr};
 
-use arrow::datatypes::Field;
-use datafusion_common::{not_impl_err, Result};
-use datafusion_expr::{Accumulator, GroupsAccumulator};
+pub use datafusion_physical_expr_common::aggregate::AggregateExpr;
 
 mod hyperloglog;
 mod tdigest;
@@ -62,79 +57,6 @@ pub mod build_in;
 pub mod moving_min_max;
 pub mod utils;
 
-/// An aggregate expression that:
-/// * knows its resulting field
-/// * knows how to create its accumulator
-/// * knows its accumulator's state's field
-/// * knows the expressions from whose its accumulator will receive values
-///
-/// Any implementation of this trait also needs to implement the
-/// `PartialEq<dyn Any>` to allows comparing equality between the
-/// trait objects.
-pub trait AggregateExpr: Send + Sync + Debug + PartialEq<dyn Any> {
-    /// Returns the aggregate expression as [`Any`] so that it can be
-    /// downcast to a specific implementation.
-    fn as_any(&self) -> &dyn Any;
-
-    /// the field of the final result of this aggregation.
-    fn field(&self) -> Result<Field>;
-
-    /// the accumulator used to accumulate values from the expressions.
-    /// the accumulator expects the same number of arguments as `expressions` 
and must
-    /// return states with the same description as `state_fields`
-    fn create_accumulator(&self) -> Result<Box<dyn Accumulator>>;
-
-    /// the fields that encapsulate the Accumulator's state
-    /// the number of fields here equals the number of states that the 
accumulator contains
-    fn state_fields(&self) -> Result<Vec<Field>>;
-
-    /// expressions that are passed to the Accumulator.
-    /// Single-column aggregations such as `sum` return a single value, others 
(e.g. `cov`) return many.
-    fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>>;
-
-    /// Order by requirements for the aggregate function
-    /// By default it is `None` (there is no requirement)
-    /// Order-sensitive aggregators, such as `FIRST_VALUE(x ORDER BY y)` 
should implement this
-    fn order_bys(&self) -> Option<&[PhysicalSortExpr]> {
-        None
-    }
-
-    /// Human readable name such as `"MIN(c2)"`. The default
-    /// implementation returns placeholder text.
-    fn name(&self) -> &str {
-        "AggregateExpr: default name"
-    }
-
-    /// If the aggregate expression has a specialized
-    /// [`GroupsAccumulator`] implementation. If this returns true,
-    /// `[Self::create_groups_accumulator`] will be called.
-    fn groups_accumulator_supported(&self) -> bool {
-        false
-    }
-
-    /// Return a specialized [`GroupsAccumulator`] that manages state
-    /// for all groups.
-    ///
-    /// For maximum performance, a [`GroupsAccumulator`] should be
-    /// implemented in addition to [`Accumulator`].
-    fn create_groups_accumulator(&self) -> Result<Box<dyn GroupsAccumulator>> {
-        not_impl_err!("GroupsAccumulator hasn't been implemented for {self:?} 
yet")
-    }
-
-    /// Construct an expression that calculates the aggregate in reverse.
-    /// Typically the "reverse" expression is itself (e.g. SUM, COUNT).
-    /// For aggregates that do not support calculation in reverse,
-    /// returns None (which is the default value).
-    fn reverse_expr(&self) -> Option<Arc<dyn AggregateExpr>> {
-        None
-    }
-
-    /// Creates accumulator implementation that supports retract
-    fn create_sliding_accumulator(&self) -> Result<Box<dyn Accumulator>> {
-        not_impl_err!("Retractable Accumulator hasn't been implemented for 
{self:?} yet")
-    }
-}
-
 /// Checks whether the given aggregate expression is order-sensitive.
 /// For instance, a `SUM` aggregation doesn't depend on the order of its 
inputs.
 /// However, an `ARRAY_AGG` with `ORDER BY` depends on the input ordering.
diff --git a/datafusion/physical-expr/src/aggregate/utils.rs 
b/datafusion/physical-expr/src/aggregate/utils.rs
index 613f6118e9..d14a52f575 100644
--- a/datafusion/physical-expr/src/aggregate/utils.rs
+++ b/datafusion/physical-expr/src/aggregate/utils.rs
@@ -17,10 +17,12 @@
 
 //! Utilities used in aggregates
 
-use std::any::Any;
 use std::sync::Arc;
 
-use crate::{AggregateExpr, PhysicalSortExpr};
+// For backwards compatibility
+pub use datafusion_physical_expr_common::aggregate::utils::down_cast_any_ref;
+pub use datafusion_physical_expr_common::aggregate::utils::get_sort_options;
+pub use datafusion_physical_expr_common::aggregate::utils::ordering_fields;
 
 use arrow::array::{ArrayRef, ArrowNativeTypeOp};
 use arrow_array::cast::AsArray;
@@ -29,7 +31,7 @@ use arrow_array::types::{
     TimestampNanosecondType, TimestampSecondType,
 };
 use arrow_buffer::{ArrowNativeType, ToByteSlice};
-use arrow_schema::{DataType, Field, SortOptions};
+use arrow_schema::DataType;
 use datafusion_common::{exec_err, DataFusionError, Result};
 use datafusion_expr::Accumulator;
 
@@ -170,48 +172,6 @@ pub fn adjust_output_array(
     Ok(array)
 }
 
-/// Downcast a `Box<dyn AggregateExpr>` or `Arc<dyn AggregateExpr>`
-/// and return the inner trait object as [`Any`] so
-/// that it can be downcast to a specific implementation.
-///
-/// This method is used when implementing the `PartialEq<dyn Any>`
-/// for [`AggregateExpr`] aggregation expressions and allows comparing the 
equality
-/// between the trait objects.
-pub fn down_cast_any_ref(any: &dyn Any) -> &dyn Any {
-    if let Some(obj) = any.downcast_ref::<Arc<dyn AggregateExpr>>() {
-        obj.as_any()
-    } else if let Some(obj) = any.downcast_ref::<Box<dyn AggregateExpr>>() {
-        obj.as_any()
-    } else {
-        any
-    }
-}
-
-/// Construct corresponding fields for lexicographical ordering requirement 
expression
-pub fn ordering_fields(
-    ordering_req: &[PhysicalSortExpr],
-    // Data type of each expression in the ordering requirement
-    data_types: &[DataType],
-) -> Vec<Field> {
-    ordering_req
-        .iter()
-        .zip(data_types.iter())
-        .map(|(sort_expr, dtype)| {
-            Field::new(
-                sort_expr.expr.to_string().as_str(),
-                dtype.clone(),
-                // Multi partitions may be empty hence field should be 
nullable.
-                true,
-            )
-        })
-        .collect()
-}
-
-/// Selects the sort option attribute from all the given `PhysicalSortExpr`s.
-pub fn get_sort_options(ordering_req: &[PhysicalSortExpr]) -> Vec<SortOptions> 
{
-    ordering_req.iter().map(|item| item.options).collect()
-}
-
 /// A wrapper around a type to provide hash for floats
 #[derive(Copy, Clone, Debug)]
 pub(crate) struct Hashable<T>(pub T);
diff --git a/datafusion/physical-expr/src/expressions/column.rs 
b/datafusion/physical-expr/src/expressions/column.rs
index a07f36e785..634a56d1d6 100644
--- a/datafusion/physical-expr/src/expressions/column.rs
+++ b/datafusion/physical-expr/src/expressions/column.rs
@@ -31,107 +31,6 @@ use arrow::{
 use datafusion_common::{internal_err, Result};
 use datafusion_expr::ColumnarValue;
 
-/// Represents the column at a given index in a RecordBatch
-#[derive(Debug, Hash, PartialEq, Eq, Clone)]
-pub struct Column {
-    name: String,
-    index: usize,
-}
-
-impl Column {
-    /// Create a new column expression
-    pub fn new(name: &str, index: usize) -> Self {
-        Self {
-            name: name.to_owned(),
-            index,
-        }
-    }
-
-    /// Create a new column expression based on column name and schema
-    pub fn new_with_schema(name: &str, schema: &Schema) -> Result<Self> {
-        Ok(Column::new(name, schema.index_of(name)?))
-    }
-
-    /// Get the column name
-    pub fn name(&self) -> &str {
-        &self.name
-    }
-
-    /// Get the column index
-    pub fn index(&self) -> usize {
-        self.index
-    }
-}
-
-impl std::fmt::Display for Column {
-    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
-        write!(f, "{}@{}", self.name, self.index)
-    }
-}
-
-impl PhysicalExpr for Column {
-    /// Return a reference to Any that can be used for downcasting
-    fn as_any(&self) -> &dyn std::any::Any {
-        self
-    }
-
-    /// Get the data type of this expression, given the schema of the input
-    fn data_type(&self, input_schema: &Schema) -> Result<DataType> {
-        self.bounds_check(input_schema)?;
-        Ok(input_schema.field(self.index).data_type().clone())
-    }
-
-    /// Decide whehter this expression is nullable, given the schema of the 
input
-    fn nullable(&self, input_schema: &Schema) -> Result<bool> {
-        self.bounds_check(input_schema)?;
-        Ok(input_schema.field(self.index).is_nullable())
-    }
-
-    /// Evaluate the expression
-    fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue> {
-        self.bounds_check(batch.schema().as_ref())?;
-        Ok(ColumnarValue::Array(batch.column(self.index).clone()))
-    }
-
-    fn children(&self) -> Vec<Arc<dyn PhysicalExpr>> {
-        vec![]
-    }
-
-    fn with_new_children(
-        self: Arc<Self>,
-        _children: Vec<Arc<dyn PhysicalExpr>>,
-    ) -> Result<Arc<dyn PhysicalExpr>> {
-        Ok(self)
-    }
-
-    fn dyn_hash(&self, state: &mut dyn Hasher) {
-        let mut s = state;
-        self.hash(&mut s);
-    }
-}
-
-impl PartialEq<dyn Any> for Column {
-    fn eq(&self, other: &dyn Any) -> bool {
-        down_cast_any_ref(other)
-            .downcast_ref::<Self>()
-            .map(|x| self == x)
-            .unwrap_or(false)
-    }
-}
-
-impl Column {
-    fn bounds_check(&self, input_schema: &Schema) -> Result<()> {
-        if self.index < input_schema.fields.len() {
-            Ok(())
-        } else {
-            internal_err!(
-                "PhysicalExpr Column references column '{}' at index {} 
(zero-based) but input schema only has {} columns: {:?}",
-                self.name,
-                self.index, input_schema.fields.len(), 
input_schema.fields().iter().map(|f| f.name().clone()).collect::<Vec<String>>())
-        }
-    }
-}
-
 #[derive(Debug, Hash, PartialEq, Eq, Clone)]
 pub struct UnKnownColumn {
     name: String,
@@ -204,11 +103,6 @@ impl PartialEq<dyn Any> for UnKnownColumn {
     }
 }
 
-/// Create a column expression
-pub fn col(name: &str, schema: &Schema) -> Result<Arc<dyn PhysicalExpr>> {
-    Ok(Arc::new(Column::new_with_schema(name, schema)?))
-}
-
 #[cfg(test)]
 mod test {
     use crate::expressions::Column;
diff --git a/datafusion/physical-expr/src/expressions/mod.rs 
b/datafusion/physical-expr/src/expressions/mod.rs
index 7c4ea07dfb..f0cc4b175e 100644
--- a/datafusion/physical-expr/src/expressions/mod.rs
+++ b/datafusion/physical-expr/src/expressions/mod.rs
@@ -80,7 +80,8 @@ pub use crate::PhysicalSortExpr;
 pub use binary::{binary, BinaryExpr};
 pub use case::{case, CaseExpr};
 pub use cast::{cast, cast_with_options, CastExpr};
-pub use column::{col, Column, UnKnownColumn};
+pub use column::UnKnownColumn;
+pub use datafusion_physical_expr_common::expressions::column::{col, Column};
 pub use in_list::{in_list, InListExpr};
 pub use is_not_null::{is_not_null, IsNotNullExpr};
 pub use is_null::{is_null, IsNullExpr};
diff --git a/datafusion/physical-expr/src/lib.rs 
b/datafusion/physical-expr/src/lib.rs
index 655771270a..c88f1b32bb 100644
--- a/datafusion/physical-expr/src/lib.rs
+++ b/datafusion/physical-expr/src/lib.rs
@@ -28,10 +28,7 @@ mod partitioning;
 mod physical_expr;
 pub mod planner;
 mod scalar_function;
-mod sort_expr;
-pub mod sort_properties;
 pub mod string_expressions;
-pub mod tree_node;
 pub mod udf;
 pub mod utils;
 pub mod window;
@@ -43,20 +40,37 @@ pub mod execution_props {
 }
 
 pub use aggregate::groups_accumulator::{GroupsAccumulatorAdapter, NullState};
-pub use aggregate::AggregateExpr;
 pub use analysis::{analyze, AnalysisContext, ExprBoundaries};
+pub use datafusion_physical_expr_common::aggregate::AggregateExpr;
 pub use equivalence::EquivalenceProperties;
 pub use partitioning::{Distribution, Partitioning};
 pub use physical_expr::{
     physical_exprs_bag_equal, physical_exprs_contains, physical_exprs_equal,
-    PhysicalExpr, PhysicalExprRef,
+    PhysicalExprRef,
 };
-pub use planner::{create_physical_expr, create_physical_exprs};
-pub use scalar_function::ScalarFunctionExpr;
-pub use sort_expr::{
+
+pub use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
+pub use datafusion_physical_expr_common::sort_expr::{
     LexOrdering, LexOrderingRef, LexRequirement, LexRequirementRef, 
PhysicalSortExpr,
     PhysicalSortRequirement,
 };
-pub use utils::{reverse_order_bys, split_conjunction};
+
+pub use planner::{create_physical_expr, create_physical_exprs};
+pub use scalar_function::ScalarFunctionExpr;
+
+pub use datafusion_physical_expr_common::utils::reverse_order_bys;
+pub use utils::split_conjunction;
 
 pub use aggregate::first_last::create_first_value_accumulator;
+
+// For backwards compatibility
+pub mod sort_properties {
+    pub use datafusion_physical_expr_common::sort_properties::{
+        ExprOrdering, SortProperties,
+    };
+}
+
+// For backwards compatibility
+pub mod tree_node {
+    pub use datafusion_physical_expr_common::tree_node::ExprContext;
+}
diff --git a/datafusion/physical-expr/src/physical_expr.rs 
b/datafusion/physical-expr/src/physical_expr.rs
index 861a4ad028..bc265d3819 100644
--- a/datafusion/physical-expr/src/physical_expr.rs
+++ b/datafusion/physical-expr/src/physical_expr.rs
@@ -15,263 +15,16 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::any::Any;
-use std::fmt::{Debug, Display};
-use std::hash::{Hash, Hasher};
 use std::sync::Arc;
 
-use crate::sort_properties::SortProperties;
-use crate::utils::scatter;
-
-use arrow::array::BooleanArray;
-use arrow::compute::filter_record_batch;
-use arrow::datatypes::{DataType, Schema};
-use arrow::record_batch::RecordBatch;
-use datafusion_common::utils::DataPtr;
-use datafusion_common::{internal_err, not_impl_err, Result};
-use datafusion_expr::interval_arithmetic::Interval;
-use datafusion_expr::ColumnarValue;
-
+use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
 use itertools::izip;
 
-/// `PhysicalExpr` evaluate DataFusion expressions such as `A + 1`, or `CAST(c1
-/// AS int)`.
-///
-/// `PhysicalExpr` are the physical counterpart to [`Expr`] used in logical
-/// planning, and can be evaluated directly on a [`RecordBatch`]. They are
-/// normally created from `Expr` by a [`PhysicalPlanner`] and can be created
-/// directly using [`create_physical_expr`].
-///
-/// A Physical expression knows its type, nullability and how to evaluate 
itself.
-///
-/// [`PhysicalPlanner`]: 
https://docs.rs/datafusion/latest/datafusion/physical_planner/trait.PhysicalPlanner.html
-/// [`create_physical_expr`]: crate::create_physical_expr
-/// [`Expr`]: datafusion_expr::Expr
-///
-/// # Example: Create `PhysicalExpr` from `Expr`
-/// ```
-/// # use arrow_schema::{DataType, Field, Schema};
-/// # use datafusion_common::DFSchema;
-/// # use datafusion_expr::{Expr, col, lit};
-/// # use datafusion_physical_expr::create_physical_expr;
-/// # use datafusion_expr::execution_props::ExecutionProps;
-/// // For a logical expression `a = 1`, we can create a physical expression
-/// let expr = col("a").eq(lit(1));
-/// // To create a PhysicalExpr we need 1. a schema
-/// let schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]);
-/// let df_schema = DFSchema::try_from(schema).unwrap();
-/// // 2. ExecutionProps
-/// let props = ExecutionProps::new();
-/// // We can now create a PhysicalExpr:
-/// let physical_expr = create_physical_expr(&expr, &df_schema, 
&props).unwrap();
-/// ```
-///
-/// # Example: Executing a PhysicalExpr to obtain [`ColumnarValue`]
-/// ```
-/// # use std::sync::Arc;
-/// # use arrow_array::{cast::AsArray, BooleanArray, Int32Array, RecordBatch};
-/// # use arrow_schema::{DataType, Field, Schema};
-/// # use datafusion_common::{assert_batches_eq, DFSchema};
-/// # use datafusion_expr::{Expr, col, lit, ColumnarValue};
-/// # use datafusion_physical_expr::create_physical_expr;
-/// # use datafusion_expr::execution_props::ExecutionProps;
-/// # let expr = col("a").eq(lit(1));
-/// # let schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]);
-/// # let df_schema = DFSchema::try_from(schema.clone()).unwrap();
-/// # let props = ExecutionProps::new();
-/// // Given a PhysicalExpr, for `a = 1` we can evaluate it against a 
RecordBatch like this:
-/// let physical_expr = create_physical_expr(&expr, &df_schema, 
&props).unwrap();
-/// // Input of [1,2,3]
-/// let input_batch = RecordBatch::try_from_iter(vec![
-///   ("a", Arc::new(Int32Array::from(vec![1, 2, 3])) as _)
-/// ]).unwrap();
-/// // The result is a ColumnarValue (either an Array or a Scalar)
-/// let result = physical_expr.evaluate(&input_batch).unwrap();
-/// // In this case, a BooleanArray with the result of the comparison
-/// let ColumnarValue::Array(arr) = result else {
-///  panic!("Expected an array")
-/// };
-/// assert_eq!(arr.as_boolean(), &BooleanArray::from(vec![true, false, 
false]));
-/// ```
-pub trait PhysicalExpr: Send + Sync + Display + Debug + PartialEq<dyn Any> {
-    /// Returns the physical expression as [`Any`] so that it can be
-    /// downcast to a specific implementation.
-    fn as_any(&self) -> &dyn Any;
-    /// Get the data type of this expression, given the schema of the input
-    fn data_type(&self, input_schema: &Schema) -> Result<DataType>;
-    /// Determine whether this expression is nullable, given the schema of the 
input
-    fn nullable(&self, input_schema: &Schema) -> Result<bool>;
-    /// Evaluate an expression against a RecordBatch
-    fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue>;
-    /// Evaluate an expression against a RecordBatch after first applying a
-    /// validity array
-    fn evaluate_selection(
-        &self,
-        batch: &RecordBatch,
-        selection: &BooleanArray,
-    ) -> Result<ColumnarValue> {
-        let tmp_batch = filter_record_batch(batch, selection)?;
-
-        let tmp_result = self.evaluate(&tmp_batch)?;
-
-        if batch.num_rows() == tmp_batch.num_rows() {
-            // All values from the `selection` filter are true.
-            Ok(tmp_result)
-        } else if let ColumnarValue::Array(a) = tmp_result {
-            scatter(selection, a.as_ref()).map(ColumnarValue::Array)
-        } else {
-            Ok(tmp_result)
-        }
-    }
-
-    /// Get a list of child PhysicalExpr that provide the input for this expr.
-    fn children(&self) -> Vec<Arc<dyn PhysicalExpr>>;
-
-    /// Returns a new PhysicalExpr where all children were replaced by new 
exprs.
-    fn with_new_children(
-        self: Arc<Self>,
-        children: Vec<Arc<dyn PhysicalExpr>>,
-    ) -> Result<Arc<dyn PhysicalExpr>>;
-
-    /// Computes the output interval for the expression, given the input
-    /// intervals.
-    ///
-    /// # Arguments
-    ///
-    /// * `children` are the intervals for the children (inputs) of this
-    /// expression.
-    ///
-    /// # Example
-    ///
-    /// If the expression is `a + b`, and the input intervals are `a: [1, 2]`
-    /// and `b: [3, 4]`, then the output interval would be `[4, 6]`.
-    fn evaluate_bounds(&self, _children: &[&Interval]) -> Result<Interval> {
-        not_impl_err!("Not implemented for {self}")
-    }
-
-    /// Updates bounds for child expressions, given a known interval for this
-    /// expression.
-    ///
-    /// This is used to propagate constraints down through an expression tree.
-    ///
-    /// # Arguments
-    ///
-    /// * `interval` is the currently known interval for this expression.
-    /// * `children` are the current intervals for the children of this 
expression.
-    ///
-    /// # Returns
-    ///
-    /// A `Vec` of new intervals for the children, in order.
-    ///
-    /// If constraint propagation reveals an infeasibility for any child, 
returns
-    /// [`None`]. If none of the children intervals change as a result of 
propagation,
-    /// may return an empty vector instead of cloning `children`. This is the 
default
-    /// (and conservative) return value.
-    ///
-    /// # Example
-    ///
-    /// If the expression is `a + b`, the current `interval` is `[4, 5]` and 
the
-    /// inputs `a` and `b` are respectively given as `[0, 2]` and `[-∞, 4]`, 
then
-    /// propagation would would return `[0, 2]` and `[2, 4]` as `b` must be at
-    /// least `2` to make the output at least `4`.
-    fn propagate_constraints(
-        &self,
-        _interval: &Interval,
-        _children: &[&Interval],
-    ) -> Result<Option<Vec<Interval>>> {
-        Ok(Some(vec![]))
-    }
-
-    /// Update the hash `state` with this expression requirements from
-    /// [`Hash`].
-    ///
-    /// This method is required to support hashing [`PhysicalExpr`]s.  To
-    /// implement it, typically the type implementing
-    /// [`PhysicalExpr`] implements [`Hash`] and
-    /// then the following boiler plate is used:
-    ///
-    /// # Example:
-    /// ```
-    /// // User defined expression that derives Hash
-    /// #[derive(Hash, Debug, PartialEq, Eq)]
-    /// struct MyExpr {
-    ///   val: u64
-    /// }
-    ///
-    /// // impl PhysicalExpr {
-    /// // ...
-    /// # impl MyExpr {
-    ///   // Boiler plate to call the derived Hash impl
-    ///   fn dyn_hash(&self, state: &mut dyn std::hash::Hasher) {
-    ///     use std::hash::Hash;
-    ///     let mut s = state;
-    ///     self.hash(&mut s);
-    ///   }
-    /// // }
-    /// # }
-    /// ```
-    /// Note: [`PhysicalExpr`] is not constrained by [`Hash`]
-    /// directly because it must remain object safe.
-    fn dyn_hash(&self, _state: &mut dyn Hasher);
-
-    /// The order information of a PhysicalExpr can be estimated from its 
children.
-    /// This is especially helpful for projection expressions. If we can 
ensure that the
-    /// order of a PhysicalExpr to project matches with the order of SortExec, 
we can
-    /// eliminate that SortExecs.
-    ///
-    /// By recursively calling this function, we can obtain the overall order
-    /// information of the PhysicalExpr. Since `SortOptions` cannot fully 
handle
-    /// the propagation of unordered columns and literals, the `SortProperties`
-    /// struct is used.
-    fn get_ordering(&self, _children: &[SortProperties]) -> SortProperties {
-        SortProperties::Unordered
-    }
-}
-
-impl Hash for dyn PhysicalExpr {
-    fn hash<H: Hasher>(&self, state: &mut H) {
-        self.dyn_hash(state);
-    }
-}
+pub use datafusion_physical_expr_common::physical_expr::down_cast_any_ref;
 
 /// Shared [`PhysicalExpr`].
 pub type PhysicalExprRef = Arc<dyn PhysicalExpr>;
 
-/// Returns a copy of this expr if we change any child according to the 
pointer comparison.
-/// The size of `children` must be equal to the size of 
`PhysicalExpr::children()`.
-pub fn with_new_children_if_necessary(
-    expr: Arc<dyn PhysicalExpr>,
-    children: Vec<Arc<dyn PhysicalExpr>>,
-) -> Result<Arc<dyn PhysicalExpr>> {
-    let old_children = expr.children();
-    if children.len() != old_children.len() {
-        internal_err!("PhysicalExpr: Wrong number of children")
-    } else if children.is_empty()
-        || children
-            .iter()
-            .zip(old_children.iter())
-            .any(|(c1, c2)| !Arc::data_ptr_eq(c1, c2))
-    {
-        Ok(expr.with_new_children(children)?)
-    } else {
-        Ok(expr)
-    }
-}
-
-pub fn down_cast_any_ref(any: &dyn Any) -> &dyn Any {
-    if any.is::<Arc<dyn PhysicalExpr>>() {
-        any.downcast_ref::<Arc<dyn PhysicalExpr>>()
-            .unwrap()
-            .as_any()
-    } else if any.is::<Box<dyn PhysicalExpr>>() {
-        any.downcast_ref::<Box<dyn PhysicalExpr>>()
-            .unwrap()
-            .as_any()
-    } else {
-        any
-    }
-}
-
 /// This function is similar to the `contains` method of `Vec`. It finds
 /// whether `expr` is among `physical_exprs`.
 pub fn physical_exprs_contains(
diff --git a/datafusion/physical-expr/src/planner.rs 
b/datafusion/physical-expr/src/planner.rs
index 0dbea09ffb..44c9d33d60 100644
--- a/datafusion/physical-expr/src/planner.rs
+++ b/datafusion/physical-expr/src/planner.rs
@@ -33,6 +33,67 @@ use datafusion_expr::{
 };
 use std::sync::Arc;
 
+/// [PhysicalExpr] evaluate DataFusion expressions such as `A + 1`, or `CAST(c1
+/// AS int)`.
+///
+/// [PhysicalExpr] are the physical counterpart to [Expr] used in logical
+/// planning, and can be evaluated directly on a [RecordBatch]. They are
+/// normally created from [Expr] by a [PhysicalPlanner] and can be created
+/// directly using [create_physical_expr].
+///
+/// A Physical expression knows its type, nullability and how to evaluate itself.
+///
+/// [PhysicalPlanner]: https://docs.rs/datafusion/latest/datafusion/physical_planner/trait.PhysicalPlanner.html
+/// [RecordBatch]: https://docs.rs/arrow/latest/arrow/record_batch/struct.RecordBatch.html
+///
+/// # Example: Create `PhysicalExpr` from `Expr`
+/// ```
+/// # use arrow::datatypes::{DataType, Field, Schema};
+/// # use datafusion_common::DFSchema;
+/// # use datafusion_expr::{Expr, col, lit};
+/// # use datafusion_physical_expr::create_physical_expr;
+/// # use datafusion_expr::execution_props::ExecutionProps;
+/// // For a logical expression `a = 1`, we can create a physical expression
+/// let expr = col("a").eq(lit(1));
+/// // To create a PhysicalExpr we need 1. a schema
+/// let schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]);
+/// let df_schema = DFSchema::try_from(schema).unwrap();
+/// // 2. ExecutionProps
+/// let props = ExecutionProps::new();
+/// // We can now create a PhysicalExpr:
+/// let physical_expr = create_physical_expr(&expr, &df_schema, &props).unwrap();
+/// ```
+///
+/// # Example: Executing a PhysicalExpr to obtain [ColumnarValue]
+/// ```
+/// # use std::sync::Arc;
+/// # use arrow::array::{cast::AsArray, BooleanArray, Int32Array, RecordBatch};
+/// # use arrow::datatypes::{DataType, Field, Schema};
+/// # use datafusion_common::{assert_batches_eq, DFSchema};
+/// # use datafusion_expr::{Expr, col, lit, ColumnarValue};
+/// # use datafusion_physical_expr::create_physical_expr;
+/// # use datafusion_expr::execution_props::ExecutionProps;
+/// # let expr = col("a").eq(lit(1));
+/// # let schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]);
+/// # let df_schema = DFSchema::try_from(schema.clone()).unwrap();
+/// # let props = ExecutionProps::new();
+/// // Given a PhysicalExpr, for `a = 1` we can evaluate it against a RecordBatch like this:
+/// let physical_expr = create_physical_expr(&expr, &df_schema, &props).unwrap();
+/// // Input of [1,2,3]
+/// let input_batch = RecordBatch::try_from_iter(vec![
+///   ("a", Arc::new(Int32Array::from(vec![1, 2, 3])) as _)
+/// ]).unwrap();
+/// // The result is a ColumnarValue (either an Array or a Scalar)
+/// let result = physical_expr.evaluate(&input_batch).unwrap();
+/// // In this case, a BooleanArray with the result of the comparison
+/// let ColumnarValue::Array(arr) = result else {
+///  panic!("Expected an array")
+/// };
+/// assert_eq!(arr.as_boolean(), &BooleanArray::from(vec![true, false, false]));
+/// ```
+///
+/// [ColumnarValue]: datafusion_expr::ColumnarValue
+///
 /// Create a physical expression from a logical expression ([Expr]).
 ///
 /// # Arguments
diff --git a/datafusion/physical-expr/src/utils/mod.rs b/datafusion/physical-expr/src/utils/mod.rs
index b8e99403d6..e55bc3d156 100644
--- a/datafusion/physical-expr/src/utils/mod.rs
+++ b/datafusion/physical-expr/src/utils/mod.rs
@@ -24,10 +24,9 @@ use std::sync::Arc;
 
 use crate::expressions::{BinaryExpr, Column};
 use crate::tree_node::ExprContext;
-use crate::{PhysicalExpr, PhysicalSortExpr};
+use crate::PhysicalExpr;
+use crate::PhysicalSortExpr;
 
-use arrow::array::{make_array, Array, ArrayRef, BooleanArray, MutableArrayData};
-use arrow::compute::{and_kleene, is_not_null, SlicesIterator};
 use arrow::datatypes::SchemaRef;
 use datafusion_common::tree_node::{
     Transformed, TransformedResult, TreeNode, TreeNodeRecursion,
@@ -244,62 +243,6 @@ pub fn reassign_predicate_columns(
     .data()
 }
 
-/// Reverses the ORDER BY expression, which is useful during equivalent window
-/// expression construction. For instance, 'ORDER BY a ASC, NULLS LAST' turns into
-/// 'ORDER BY a DESC, NULLS FIRST'.
-pub fn reverse_order_bys(order_bys: &[PhysicalSortExpr]) -> Vec<PhysicalSortExpr> {
-    order_bys
-        .iter()
-        .map(|e| PhysicalSortExpr {
-            expr: e.expr.clone(),
-            options: !e.options,
-        })
-        .collect()
-}
-
-/// Scatter `truthy` array by boolean mask. When the mask evaluates `true`, next values of `truthy`
-/// are taken, when the mask evaluates `false` values null values are filled.
-///
-/// # Arguments
-/// * `mask` - Boolean values used to determine where to put the `truthy` values
-/// * `truthy` - All values of this array are to scatter according to `mask` into final result.
-pub fn scatter(mask: &BooleanArray, truthy: &dyn Array) -> Result<ArrayRef> {
-    let truthy = truthy.to_data();
-
-    // update the mask so that any null values become false
-    // (SlicesIterator doesn't respect nulls)
-    let mask = and_kleene(mask, &is_not_null(mask)?)?;
-
-    let mut mutable = MutableArrayData::new(vec![&truthy], true, mask.len());
-
-    // the SlicesIterator slices only the true values. So the gaps left by this iterator we need to
-    // fill with falsy values
-
-    // keep track of how much is filled
-    let mut filled = 0;
-    // keep track of current position we have in truthy array
-    let mut true_pos = 0;
-
-    SlicesIterator::new(&mask).for_each(|(start, end)| {
-        // the gap needs to be filled with nulls
-        if start > filled {
-            mutable.extend_nulls(start - filled);
-        }
-        // fill with truthy values
-        let len = end - start;
-        mutable.extend(0, true_pos, true_pos + len);
-        true_pos += len;
-        filled = end;
-    });
-    // the remaining part is falsy
-    if filled < mask.len() {
-        mutable.extend_nulls(mask.len() - filled);
-    }
-
-    let data = mutable.freeze();
-    Ok(make_array(data))
-}
-
 /// Merge left and right sort expressions, checking for duplicates.
 pub fn merge_vectors(
     left: &[PhysicalSortExpr],
@@ -321,9 +264,7 @@ mod tests {
     use crate::expressions::{binary, cast, col, in_list, lit, Column, Literal};
     use crate::PhysicalSortExpr;
 
-    use arrow_array::Int32Array;
     use arrow_schema::{DataType, Field, Schema};
-    use datafusion_common::cast::{as_boolean_array, as_int32_array};
     use datafusion_common::{Result, ScalarValue};
 
     use petgraph::visit::Bfs;
@@ -517,70 +458,4 @@ mod tests {
         assert_eq!(collect_columns(&expr3), expected);
         Ok(())
     }
-
-    #[test]
-    fn scatter_int() -> Result<()> {
-        let truthy = Arc::new(Int32Array::from(vec![1, 10, 11, 100]));
-        let mask = BooleanArray::from(vec![true, true, false, false, true]);
-
-        // the output array is expected to be the same length as the mask array
-        let expected =
-            Int32Array::from_iter(vec![Some(1), Some(10), None, None, Some(11)]);
-        let result = scatter(&mask, truthy.as_ref())?;
-        let result = as_int32_array(&result)?;
-
-        assert_eq!(&expected, result);
-        Ok(())
-    }
-
-    #[test]
-    fn scatter_int_end_with_false() -> Result<()> {
-        let truthy = Arc::new(Int32Array::from(vec![1, 10, 11, 100]));
-        let mask = BooleanArray::from(vec![true, false, true, false, false, false]);
-
-        // output should be same length as mask
-        let expected =
-            Int32Array::from_iter(vec![Some(1), None, Some(10), None, None, None]);
-        let result = scatter(&mask, truthy.as_ref())?;
-        let result = as_int32_array(&result)?;
-
-        assert_eq!(&expected, result);
-        Ok(())
-    }
-
-    #[test]
-    fn scatter_with_null_mask() -> Result<()> {
-        let truthy = Arc::new(Int32Array::from(vec![1, 10, 11]));
-        let mask: BooleanArray = vec![Some(false), None, Some(true), Some(true), None]
-            .into_iter()
-            .collect();
-
-        // output should treat nulls as though they are false
-        let expected = Int32Array::from_iter(vec![None, None, Some(1), Some(10), None]);
-        let result = scatter(&mask, truthy.as_ref())?;
-        let result = as_int32_array(&result)?;
-
-        assert_eq!(&expected, result);
-        Ok(())
-    }
-
-    #[test]
-    fn scatter_boolean() -> Result<()> {
-        let truthy = Arc::new(BooleanArray::from(vec![false, false, false, true]));
-        let mask = BooleanArray::from(vec![true, true, false, false, true]);
-
-        // the output array is expected to be the same length as the mask array
-        let expected = BooleanArray::from_iter(vec![
-            Some(false),
-            Some(false),
-            None,
-            None,
-            Some(false),
-        ]);
-        let result = scatter(&mask, truthy.as_ref())?;
-        let result = as_boolean_array(&result)?;
-
-        assert_eq!(&expected, result);
-        Ok(())
-    }
 }

Reply via email to