This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 11f164c5f3 Move some datafusion-optimizer::utils down to 
datafusion-expr::utils (#8354)
11f164c5f3 is described below

commit 11f164c5f3568c3d4fa796bb63a7db36a7b1e821
Author: Jesse <[email protected]>
AuthorDate: Wed Nov 29 19:07:26 2023 +0100

    Move some datafusion-optimizer::utils down to datafusion-expr::utils (#8354)
    
    These utils manipulate `LogicalPlan`s and `Expr`s and may be useful in
    projects that only depend on `datafusion-expr`
---
 benchmarks/src/parquet_filter.rs                   |   2 +-
 datafusion/core/src/datasource/listing/table.rs    |   4 +-
 datafusion/core/tests/parquet/filter_pushdown.rs   |   2 +-
 datafusion/expr/src/utils.rs                       | 381 +++++++++++++++++++-
 datafusion/optimizer/src/analyzer/subquery.rs      |   3 +-
 datafusion/optimizer/src/analyzer/type_coercion.rs |   7 +-
 datafusion/optimizer/src/decorrelate.rs            |   5 +-
 .../src/decorrelate_predicate_subquery.rs          |   3 +-
 .../optimizer/src/extract_equijoin_predicate.rs    |   3 +-
 datafusion/optimizer/src/optimizer.rs              |   2 +-
 datafusion/optimizer/src/push_down_filter.rs       |  16 +-
 .../optimizer/src/scalar_subquery_to_join.rs       |   3 +-
 .../src/simplify_expressions/simplify_exprs.rs     |   2 +-
 .../optimizer/src/unwrap_cast_in_comparison.rs     |   2 +-
 datafusion/optimizer/src/utils.rs                  | 399 +++++----------------
 datafusion/substrait/src/logical_plan/consumer.rs  |   2 +-
 16 files changed, 499 insertions(+), 337 deletions(-)

diff --git a/benchmarks/src/parquet_filter.rs b/benchmarks/src/parquet_filter.rs
index e19596b80f..1d816908e2 100644
--- a/benchmarks/src/parquet_filter.rs
+++ b/benchmarks/src/parquet_filter.rs
@@ -19,8 +19,8 @@ use crate::AccessLogOpt;
 use crate::{BenchmarkRun, CommonOpt};
 use arrow::util::pretty;
 use datafusion::common::Result;
+use datafusion::logical_expr::utils::disjunction;
 use datafusion::logical_expr::{lit, or, Expr};
-use datafusion::optimizer::utils::disjunction;
 use datafusion::physical_plan::collect;
 use datafusion::prelude::{col, SessionContext};
 use datafusion::test_util::parquet::{ParquetScanOptions, TestParquetFile};
diff --git a/datafusion/core/src/datasource/listing/table.rs 
b/datafusion/core/src/datasource/listing/table.rs
index 515bc8a9e6..a3be57db3a 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -40,11 +40,10 @@ use crate::datasource::{
     physical_plan::{is_plan_streaming, FileScanConfig, FileSinkConfig},
     TableProvider, TableType,
 };
-use crate::logical_expr::TableProviderFilterPushDown;
 use crate::{
     error::{DataFusionError, Result},
     execution::context::SessionState,
-    logical_expr::Expr,
+    logical_expr::{utils::conjunction, Expr, TableProviderFilterPushDown},
     physical_plan::{empty::EmptyExec, ExecutionPlan, Statistics},
 };
 
@@ -56,7 +55,6 @@ use datafusion_common::{
 };
 use datafusion_execution::cache::cache_manager::FileStatisticsCache;
 use datafusion_execution::cache::cache_unit::DefaultFileStatisticsCache;
-use datafusion_optimizer::utils::conjunction;
 use datafusion_physical_expr::{
     create_physical_expr, LexOrdering, PhysicalSortRequirement,
 };
diff --git a/datafusion/core/tests/parquet/filter_pushdown.rs 
b/datafusion/core/tests/parquet/filter_pushdown.rs
index 61a8f87b9e..f214e8903a 100644
--- a/datafusion/core/tests/parquet/filter_pushdown.rs
+++ b/datafusion/core/tests/parquet/filter_pushdown.rs
@@ -34,7 +34,7 @@ use datafusion::physical_plan::collect;
 use datafusion::physical_plan::metrics::MetricsSet;
 use datafusion::prelude::{col, lit, lit_timestamp_nano, Expr, SessionContext};
 use datafusion::test_util::parquet::{ParquetScanOptions, TestParquetFile};
-use datafusion_optimizer::utils::{conjunction, disjunction, split_conjunction};
+use datafusion_expr::utils::{conjunction, disjunction, split_conjunction};
 use itertools::Itertools;
 use parquet::file::properties::WriterProperties;
 use tempfile::TempDir;
diff --git a/datafusion/expr/src/utils.rs b/datafusion/expr/src/utils.rs
index d8668fba8e..7deb13c89b 100644
--- a/datafusion/expr/src/utils.rs
+++ b/datafusion/expr/src/utils.rs
@@ -18,9 +18,13 @@
 //! Expression utilities
 
 use crate::expr::{Alias, Sort, WindowFunction};
+use crate::expr_rewriter::strip_outer_reference;
 use crate::logical_plan::Aggregate;
 use crate::signature::{Signature, TypeSignature};
-use crate::{Cast, Expr, ExprSchemable, GroupingSet, LogicalPlan, TryCast};
+use crate::{
+    and, BinaryExpr, Cast, Expr, ExprSchemable, Filter, GroupingSet, 
LogicalPlan,
+    Operator, TryCast,
+};
 use arrow::datatypes::{DataType, TimeUnit};
 use datafusion_common::tree_node::{TreeNode, VisitRecursion};
 use datafusion_common::{
@@ -30,6 +34,7 @@ use datafusion_common::{
 use sqlparser::ast::{ExceptSelectItem, ExcludeSelectItem, 
WildcardAdditionalOptions};
 use std::cmp::Ordering;
 use std::collections::HashSet;
+use std::sync::Arc;
 
 ///  The value to which `COUNT(*)` is expanded to in
 ///  `COUNT(<constant>)` expressions
@@ -1004,12 +1009,245 @@ pub fn generate_signature_error_msg(
         )
 }
 
+/// Splits a conjunctive [`Expr`] such as `A AND B AND C` => `[A, B, C]`
+///
+/// See [`split_conjunction_owned`] for more details and an example.
+pub fn split_conjunction(expr: &Expr) -> Vec<&Expr> {
+    split_conjunction_impl(expr, vec![])
+}
+
+fn split_conjunction_impl<'a>(expr: &'a Expr, mut exprs: Vec<&'a Expr>) -> 
Vec<&'a Expr> {
+    match expr {
+        Expr::BinaryExpr(BinaryExpr {
+            right,
+            op: Operator::And,
+            left,
+        }) => {
+            let exprs = split_conjunction_impl(left, exprs);
+            split_conjunction_impl(right, exprs)
+        }
+        Expr::Alias(Alias { expr, .. }) => split_conjunction_impl(expr, exprs),
+        other => {
+            exprs.push(other);
+            exprs
+        }
+    }
+}
+
+/// Splits an owned conjunctive [`Expr`] such as `A AND B AND C` => `[A, B, C]`
+///
+/// This is often used to "split" filter expressions such as `col1 = 5
+/// AND col2 = 10` into [`col1 = 5`, `col2 = 10`];
+///
+/// # Example
+/// ```
+/// # use datafusion_expr::{col, lit};
+/// # use datafusion_expr::utils::split_conjunction_owned;
+/// // a=1 AND b=2
+/// let expr = col("a").eq(lit(1)).and(col("b").eq(lit(2)));
+///
+/// // [a=1, b=2]
+/// let split = vec![
+///   col("a").eq(lit(1)),
+///   col("b").eq(lit(2)),
+/// ];
+///
+/// // use split_conjunction_owned to split them
+/// assert_eq!(split_conjunction_owned(expr), split);
+/// ```
+pub fn split_conjunction_owned(expr: Expr) -> Vec<Expr> {
+    split_binary_owned(expr, Operator::And)
+}
+
+/// Splits an owned binary operator tree [`Expr`] such as `A <OP> B <OP> C` => 
`[A, B, C]`
+///
+/// This is often used to "split" expressions such as `col1 = 5
+/// AND col2 = 10` into [`col1 = 5`, `col2 = 10`];
+///
+/// # Example
+/// ```
+/// # use datafusion_expr::{col, lit, Operator};
+/// # use datafusion_expr::utils::split_binary_owned;
+/// # use std::ops::Add;
+/// // a=1 + b=2
+/// let expr = col("a").eq(lit(1)).add(col("b").eq(lit(2)));
+///
+/// // [a=1, b=2]
+/// let split = vec![
+///   col("a").eq(lit(1)),
+///   col("b").eq(lit(2)),
+/// ];
+///
+/// // use split_binary_owned to split them
+/// assert_eq!(split_binary_owned(expr, Operator::Plus), split);
+/// ```
+pub fn split_binary_owned(expr: Expr, op: Operator) -> Vec<Expr> {
+    split_binary_owned_impl(expr, op, vec![])
+}
+
+fn split_binary_owned_impl(
+    expr: Expr,
+    operator: Operator,
+    mut exprs: Vec<Expr>,
+) -> Vec<Expr> {
+    match expr {
+        Expr::BinaryExpr(BinaryExpr { right, op, left }) if op == operator => {
+            let exprs = split_binary_owned_impl(*left, operator, exprs);
+            split_binary_owned_impl(*right, operator, exprs)
+        }
+        Expr::Alias(Alias { expr, .. }) => {
+            split_binary_owned_impl(*expr, operator, exprs)
+        }
+        other => {
+            exprs.push(other);
+            exprs
+        }
+    }
+}
+
+/// Splits an binary operator tree [`Expr`] such as `A <OP> B <OP> C` => `[A, 
B, C]`
+///
+/// See [`split_binary_owned`] for more details and an example.
+pub fn split_binary(expr: &Expr, op: Operator) -> Vec<&Expr> {
+    split_binary_impl(expr, op, vec![])
+}
+
+fn split_binary_impl<'a>(
+    expr: &'a Expr,
+    operator: Operator,
+    mut exprs: Vec<&'a Expr>,
+) -> Vec<&'a Expr> {
+    match expr {
+        Expr::BinaryExpr(BinaryExpr { right, op, left }) if *op == operator => 
{
+            let exprs = split_binary_impl(left, operator, exprs);
+            split_binary_impl(right, operator, exprs)
+        }
+        Expr::Alias(Alias { expr, .. }) => split_binary_impl(expr, operator, 
exprs),
+        other => {
+            exprs.push(other);
+            exprs
+        }
+    }
+}
+
+/// Combines an array of filter expressions into a single filter
+/// expression consisting of the input filter expressions joined with
+/// logical AND.
+///
+/// Returns None if the filters array is empty.
+///
+/// # Example
+/// ```
+/// # use datafusion_expr::{col, lit};
+/// # use datafusion_expr::utils::conjunction;
+/// // a=1 AND b=2
+/// let expr = col("a").eq(lit(1)).and(col("b").eq(lit(2)));
+///
+/// // [a=1, b=2]
+/// let split = vec![
+///   col("a").eq(lit(1)),
+///   col("b").eq(lit(2)),
+/// ];
+///
+/// // use conjunction to join them together with `AND`
+/// assert_eq!(conjunction(split), Some(expr));
+/// ```
+pub fn conjunction(filters: impl IntoIterator<Item = Expr>) -> Option<Expr> {
+    filters.into_iter().reduce(|accum, expr| accum.and(expr))
+}
+
+/// Combines an array of filter expressions into a single filter
+/// expression consisting of the input filter expressions joined with
+/// logical OR.
+///
+/// Returns None if the filters array is empty.
+pub fn disjunction(filters: impl IntoIterator<Item = Expr>) -> Option<Expr> {
+    filters.into_iter().reduce(|accum, expr| accum.or(expr))
+}
+
+/// returns a new [LogicalPlan] that wraps `plan` in a [LogicalPlan::Filter] 
with
+/// its predicate be all `predicates` ANDed.
+pub fn add_filter(plan: LogicalPlan, predicates: &[&Expr]) -> 
Result<LogicalPlan> {
+    // reduce filters to a single filter with an AND
+    let predicate = predicates
+        .iter()
+        .skip(1)
+        .fold(predicates[0].clone(), |acc, predicate| {
+            and(acc, (*predicate).to_owned())
+        });
+
+    Ok(LogicalPlan::Filter(Filter::try_new(
+        predicate,
+        Arc::new(plan),
+    )?))
+}
+
+/// Looks for correlating expressions: for example, a binary expression with 
one field from the subquery, and
+/// one not in the subquery (closed upon from outer scope)
+///
+/// # Arguments
+///
+/// * `exprs` - List of expressions that may or may not be joins
+///
+/// # Return value
+///
+/// Tuple of (expressions containing joins, remaining non-join expressions)
+pub fn find_join_exprs(exprs: Vec<&Expr>) -> Result<(Vec<Expr>, Vec<Expr>)> {
+    let mut joins = vec![];
+    let mut others = vec![];
+    for filter in exprs.into_iter() {
+        // If the expression contains correlated predicates, add it to join 
filters
+        if filter.contains_outer() {
+            if !matches!(filter, Expr::BinaryExpr(BinaryExpr{ left, op: 
Operator::Eq, right }) if left.eq(right))
+            {
+                joins.push(strip_outer_reference((*filter).clone()));
+            }
+        } else {
+            others.push((*filter).clone());
+        }
+    }
+
+    Ok((joins, others))
+}
+
+/// Returns the first (and only) element in a slice, or an error
+///
+/// # Arguments
+///
+/// * `slice` - The slice to extract from
+///
+/// # Return value
+///
+/// The first element, or an error
+pub fn only_or_err<T>(slice: &[T]) -> Result<&T> {
+    match slice {
+        [it] => Ok(it),
+        [] => plan_err!("No items found!"),
+        _ => plan_err!("More than one item found!"),
+    }
+}
+
+/// merge inputs schema into a single schema.
+pub fn merge_schema(inputs: Vec<&LogicalPlan>) -> DFSchema {
+    if inputs.len() == 1 {
+        inputs[0].schema().clone().as_ref().clone()
+    } else {
+        inputs.iter().map(|input| input.schema()).fold(
+            DFSchema::empty(),
+            |mut lhs, rhs| {
+                lhs.merge(rhs);
+                lhs
+            },
+        )
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
     use crate::expr_vec_fmt;
     use crate::{
-        col, cube, expr, grouping_set, rollup, AggregateFunction, WindowFrame,
+        col, cube, expr, grouping_set, lit, rollup, AggregateFunction, 
WindowFrame,
         WindowFunction,
     };
 
@@ -1322,4 +1560,143 @@ mod tests {
 
         Ok(())
     }
+    #[test]
+    fn test_split_conjunction() {
+        let expr = col("a");
+        let result = split_conjunction(&expr);
+        assert_eq!(result, vec![&expr]);
+    }
+
+    #[test]
+    fn test_split_conjunction_two() {
+        let expr = col("a").eq(lit(5)).and(col("b"));
+        let expr1 = col("a").eq(lit(5));
+        let expr2 = col("b");
+
+        let result = split_conjunction(&expr);
+        assert_eq!(result, vec![&expr1, &expr2]);
+    }
+
+    #[test]
+    fn test_split_conjunction_alias() {
+        let expr = col("a").eq(lit(5)).and(col("b").alias("the_alias"));
+        let expr1 = col("a").eq(lit(5));
+        let expr2 = col("b"); // has no alias
+
+        let result = split_conjunction(&expr);
+        assert_eq!(result, vec![&expr1, &expr2]);
+    }
+
+    #[test]
+    fn test_split_conjunction_or() {
+        let expr = col("a").eq(lit(5)).or(col("b"));
+        let result = split_conjunction(&expr);
+        assert_eq!(result, vec![&expr]);
+    }
+
+    #[test]
+    fn test_split_binary_owned() {
+        let expr = col("a");
+        assert_eq!(split_binary_owned(expr.clone(), Operator::And), 
vec![expr]);
+    }
+
+    #[test]
+    fn test_split_binary_owned_two() {
+        assert_eq!(
+            split_binary_owned(col("a").eq(lit(5)).and(col("b")), 
Operator::And),
+            vec![col("a").eq(lit(5)), col("b")]
+        );
+    }
+
+    #[test]
+    fn test_split_binary_owned_different_op() {
+        let expr = col("a").eq(lit(5)).or(col("b"));
+        assert_eq!(
+            // expr is connected by OR, but pass in AND
+            split_binary_owned(expr.clone(), Operator::And),
+            vec![expr]
+        );
+    }
+
+    #[test]
+    fn test_split_conjunction_owned() {
+        let expr = col("a");
+        assert_eq!(split_conjunction_owned(expr.clone()), vec![expr]);
+    }
+
+    #[test]
+    fn test_split_conjunction_owned_two() {
+        assert_eq!(
+            split_conjunction_owned(col("a").eq(lit(5)).and(col("b"))),
+            vec![col("a").eq(lit(5)), col("b")]
+        );
+    }
+
+    #[test]
+    fn test_split_conjunction_owned_alias() {
+        assert_eq!(
+            
split_conjunction_owned(col("a").eq(lit(5)).and(col("b").alias("the_alias"))),
+            vec![
+                col("a").eq(lit(5)),
+                // no alias on b
+                col("b"),
+            ]
+        );
+    }
+
+    #[test]
+    fn test_conjunction_empty() {
+        assert_eq!(conjunction(vec![]), None);
+    }
+
+    #[test]
+    fn test_conjunction() {
+        // `[A, B, C]`
+        let expr = conjunction(vec![col("a"), col("b"), col("c")]);
+
+        // --> `(A AND B) AND C`
+        assert_eq!(expr, Some(col("a").and(col("b")).and(col("c"))));
+
+        // which is different than `A AND (B AND C)`
+        assert_ne!(expr, Some(col("a").and(col("b").and(col("c")))));
+    }
+
+    #[test]
+    fn test_disjunction_empty() {
+        assert_eq!(disjunction(vec![]), None);
+    }
+
+    #[test]
+    fn test_disjunction() {
+        // `[A, B, C]`
+        let expr = disjunction(vec![col("a"), col("b"), col("c")]);
+
+        // --> `(A OR B) OR C`
+        assert_eq!(expr, Some(col("a").or(col("b")).or(col("c"))));
+
+        // which is different than `A OR (B OR C)`
+        assert_ne!(expr, Some(col("a").or(col("b").or(col("c")))));
+    }
+
+    #[test]
+    fn test_split_conjunction_owned_or() {
+        let expr = col("a").eq(lit(5)).or(col("b"));
+        assert_eq!(split_conjunction_owned(expr.clone()), vec![expr]);
+    }
+
+    #[test]
+    fn test_collect_expr() -> Result<()> {
+        let mut accum: HashSet<Column> = HashSet::new();
+        expr_to_columns(
+            &Expr::Cast(Cast::new(Box::new(col("a")), DataType::Float64)),
+            &mut accum,
+        )?;
+        expr_to_columns(
+            &Expr::Cast(Cast::new(Box::new(col("a")), DataType::Float64)),
+            &mut accum,
+        )?;
+        assert_eq!(1, accum.len());
+        assert!(accum.contains(&Column::from_name("a")));
+        Ok(())
+    }
 }
diff --git a/datafusion/optimizer/src/analyzer/subquery.rs 
b/datafusion/optimizer/src/analyzer/subquery.rs
index 6b8b1020cd..7c5b70b19a 100644
--- a/datafusion/optimizer/src/analyzer/subquery.rs
+++ b/datafusion/optimizer/src/analyzer/subquery.rs
@@ -16,10 +16,11 @@
 // under the License.
 
 use crate::analyzer::check_plan;
-use crate::utils::{collect_subquery_cols, split_conjunction};
+use crate::utils::collect_subquery_cols;
 use datafusion_common::tree_node::{TreeNode, VisitRecursion};
 use datafusion_common::{plan_err, DataFusionError, Result};
 use datafusion_expr::expr_rewriter::strip_outer_reference;
+use datafusion_expr::utils::split_conjunction;
 use datafusion_expr::{
     Aggregate, BinaryExpr, Cast, Expr, Filter, Join, JoinType, LogicalPlan, 
Operator,
     Window,
diff --git a/datafusion/optimizer/src/analyzer/type_coercion.rs 
b/datafusion/optimizer/src/analyzer/type_coercion.rs
index 6628e8961e..eb5d8c53a5 100644
--- a/datafusion/optimizer/src/analyzer/type_coercion.rs
+++ b/datafusion/optimizer/src/analyzer/type_coercion.rs
@@ -42,16 +42,15 @@ use datafusion_expr::type_coercion::other::{
     get_coerce_type_for_case_expression, get_coerce_type_for_list,
 };
 use datafusion_expr::type_coercion::{is_datetime, is_utf8_or_large_utf8};
+use datafusion_expr::utils::merge_schema;
 use datafusion_expr::{
     is_false, is_not_false, is_not_true, is_not_unknown, is_true, is_unknown,
     type_coercion, window_function, AggregateFunction, BuiltinScalarFunction, 
Expr,
-    LogicalPlan, Operator, Projection, ScalarFunctionDefinition, WindowFrame,
-    WindowFrameBound, WindowFrameUnits,
+    ExprSchemable, LogicalPlan, Operator, Projection, ScalarFunctionDefinition,
+    Signature, WindowFrame, WindowFrameBound, WindowFrameUnits,
 };
-use datafusion_expr::{ExprSchemable, Signature};
 
 use crate::analyzer::AnalyzerRule;
-use crate::utils::merge_schema;
 
 #[derive(Default)]
 pub struct TypeCoercion {}
diff --git a/datafusion/optimizer/src/decorrelate.rs 
b/datafusion/optimizer/src/decorrelate.rs
index c8162683f3..ed6f472186 100644
--- a/datafusion/optimizer/src/decorrelate.rs
+++ b/datafusion/optimizer/src/decorrelate.rs
@@ -16,15 +16,14 @@
 // under the License.
 
 use crate::simplify_expressions::{ExprSimplifier, SimplifyContext};
-use crate::utils::{
-    collect_subquery_cols, conjunction, find_join_exprs, split_conjunction,
-};
+use crate::utils::collect_subquery_cols;
 use datafusion_common::tree_node::{
     RewriteRecursion, Transformed, TreeNode, TreeNodeRewriter,
 };
 use datafusion_common::{plan_err, Result};
 use datafusion_common::{Column, DFSchemaRef, DataFusionError, ScalarValue};
 use datafusion_expr::expr::Alias;
+use datafusion_expr::utils::{conjunction, find_join_exprs, split_conjunction};
 use datafusion_expr::{expr, EmptyRelation, Expr, LogicalPlan, 
LogicalPlanBuilder};
 use datafusion_physical_expr::execution_props::ExecutionProps;
 use std::collections::{BTreeSet, HashMap};
diff --git a/datafusion/optimizer/src/decorrelate_predicate_subquery.rs 
b/datafusion/optimizer/src/decorrelate_predicate_subquery.rs
index 96b46663d8..450336376a 100644
--- a/datafusion/optimizer/src/decorrelate_predicate_subquery.rs
+++ b/datafusion/optimizer/src/decorrelate_predicate_subquery.rs
@@ -17,7 +17,7 @@
 
 use crate::decorrelate::PullUpCorrelatedExpr;
 use crate::optimizer::ApplyOrder;
-use crate::utils::{conjunction, replace_qualified_name, split_conjunction};
+use crate::utils::replace_qualified_name;
 use crate::{OptimizerConfig, OptimizerRule};
 use datafusion_common::alias::AliasGenerator;
 use datafusion_common::tree_node::TreeNode;
@@ -25,6 +25,7 @@ use datafusion_common::{plan_err, DataFusionError, Result};
 use datafusion_expr::expr::{Exists, InSubquery};
 use datafusion_expr::expr_rewriter::create_col_from_scalar_expr;
 use datafusion_expr::logical_plan::{JoinType, Subquery};
+use datafusion_expr::utils::{conjunction, split_conjunction};
 use datafusion_expr::{
     exists, in_subquery, not_exists, not_in_subquery, BinaryExpr, Expr, Filter,
     LogicalPlan, LogicalPlanBuilder, Operator,
diff --git a/datafusion/optimizer/src/extract_equijoin_predicate.rs 
b/datafusion/optimizer/src/extract_equijoin_predicate.rs
index 575969fbf7..24664d57c3 100644
--- a/datafusion/optimizer/src/extract_equijoin_predicate.rs
+++ b/datafusion/optimizer/src/extract_equijoin_predicate.rs
@@ -17,11 +17,10 @@
 
 //! [`ExtractEquijoinPredicate`] rule that extracts equijoin predicates
 use crate::optimizer::ApplyOrder;
-use crate::utils::split_conjunction;
 use crate::{OptimizerConfig, OptimizerRule};
 use datafusion_common::DFSchema;
 use datafusion_common::Result;
-use datafusion_expr::utils::{can_hash, find_valid_equijoin_key_pair};
+use datafusion_expr::utils::{can_hash, find_valid_equijoin_key_pair, 
split_conjunction};
 use datafusion_expr::{BinaryExpr, Expr, ExprSchemable, Join, LogicalPlan, 
Operator};
 use std::sync::Arc;
 
diff --git a/datafusion/optimizer/src/optimizer.rs 
b/datafusion/optimizer/src/optimizer.rs
index c7ad31f39b..7af46ed70a 100644
--- a/datafusion/optimizer/src/optimizer.rs
+++ b/datafusion/optimizer/src/optimizer.rs
@@ -45,7 +45,7 @@ use chrono::{DateTime, Utc};
 use datafusion_common::alias::AliasGenerator;
 use datafusion_common::config::ConfigOptions;
 use datafusion_common::{DataFusionError, Result};
-use datafusion_expr::logical_plan::LogicalPlan;
+use datafusion_expr::LogicalPlan;
 use log::{debug, warn};
 use std::collections::HashSet;
 use std::sync::Arc;
diff --git a/datafusion/optimizer/src/push_down_filter.rs 
b/datafusion/optimizer/src/push_down_filter.rs
index 7a2c6a8d8c..95eeee931b 100644
--- a/datafusion/optimizer/src/push_down_filter.rs
+++ b/datafusion/optimizer/src/push_down_filter.rs
@@ -16,13 +16,13 @@
 //! the plan.
 
 use crate::optimizer::ApplyOrder;
-use crate::utils::{conjunction, split_conjunction, split_conjunction_owned};
-use crate::{utils, OptimizerConfig, OptimizerRule};
+use crate::{OptimizerConfig, OptimizerRule};
 use datafusion_common::tree_node::{Transformed, TreeNode, VisitRecursion};
 use datafusion_common::{
     internal_err, plan_datafusion_err, Column, DFSchema, DataFusionError, 
Result,
 };
 use datafusion_expr::expr::Alias;
+use datafusion_expr::utils::{conjunction, split_conjunction, 
split_conjunction_owned};
 use datafusion_expr::Volatility;
 use datafusion_expr::{
     and,
@@ -546,9 +546,7 @@ fn push_down_join(
     parent_predicate: Option<&Expr>,
 ) -> Result<Option<LogicalPlan>> {
     let predicates = match parent_predicate {
-        Some(parent_predicate) => {
-            utils::split_conjunction_owned(parent_predicate.clone())
-        }
+        Some(parent_predicate) => 
split_conjunction_owned(parent_predicate.clone()),
         None => vec![],
     };
 
@@ -556,7 +554,7 @@ fn push_down_join(
     let on_filters = join
         .filter
         .as_ref()
-        .map(|e| utils::split_conjunction_owned(e.clone()))
+        .map(|e| split_conjunction_owned(e.clone()))
         .unwrap_or_default();
 
     let mut is_inner_join = false;
@@ -805,7 +803,7 @@ impl OptimizerRule for PushDownFilter {
                     .map(|e| 
Ok(Column::from_qualified_name(e.display_name()?)))
                     .collect::<Result<HashSet<_>>>()?;
 
-                let predicates = 
utils::split_conjunction_owned(filter.predicate.clone());
+                let predicates = 
split_conjunction_owned(filter.predicate.clone());
 
                 let mut keep_predicates = vec![];
                 let mut push_predicates = vec![];
@@ -853,7 +851,7 @@ impl OptimizerRule for PushDownFilter {
                 }
             }
             LogicalPlan::CrossJoin(CrossJoin { left, right, .. }) => {
-                let predicates = 
utils::split_conjunction_owned(filter.predicate.clone());
+                let predicates = 
split_conjunction_owned(filter.predicate.clone());
                 push_down_all_join(
                     predicates,
                     vec![],
@@ -908,7 +906,7 @@ impl OptimizerRule for PushDownFilter {
                 let prevent_cols =
                     extension_plan.node.prevent_predicate_push_down_columns();
 
-                let predicates = 
utils::split_conjunction_owned(filter.predicate.clone());
+                let predicates = 
split_conjunction_owned(filter.predicate.clone());
 
                 let mut keep_predicates = vec![];
                 let mut push_predicates = vec![];
diff --git a/datafusion/optimizer/src/scalar_subquery_to_join.rs 
b/datafusion/optimizer/src/scalar_subquery_to_join.rs
index 7ac0c25119..34ed4a9475 100644
--- a/datafusion/optimizer/src/scalar_subquery_to_join.rs
+++ b/datafusion/optimizer/src/scalar_subquery_to_join.rs
@@ -17,7 +17,7 @@
 
 use crate::decorrelate::{PullUpCorrelatedExpr, UN_MATCHED_ROW_INDICATOR};
 use crate::optimizer::ApplyOrder;
-use crate::utils::{conjunction, replace_qualified_name};
+use crate::utils::replace_qualified_name;
 use crate::{OptimizerConfig, OptimizerRule};
 use datafusion_common::alias::AliasGenerator;
 use datafusion_common::tree_node::{
@@ -26,6 +26,7 @@ use datafusion_common::tree_node::{
 use datafusion_common::{plan_err, Column, DataFusionError, Result, 
ScalarValue};
 use datafusion_expr::expr_rewriter::create_col_from_scalar_expr;
 use datafusion_expr::logical_plan::{JoinType, Subquery};
+use datafusion_expr::utils::conjunction;
 use datafusion_expr::{expr, EmptyRelation, Expr, LogicalPlan, 
LogicalPlanBuilder};
 use std::collections::{BTreeSet, HashMap};
 use std::sync::Arc;
diff --git a/datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs 
b/datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs
index 9dc83e0fad..43a41b1185 100644
--- a/datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs
+++ b/datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs
@@ -20,10 +20,10 @@
 use std::sync::Arc;
 
 use super::{ExprSimplifier, SimplifyContext};
-use crate::utils::merge_schema;
 use crate::{OptimizerConfig, OptimizerRule};
 use datafusion_common::{DFSchema, DFSchemaRef, Result};
 use datafusion_expr::logical_plan::LogicalPlan;
+use datafusion_expr::utils::merge_schema;
 use datafusion_physical_expr::execution_props::ExecutionProps;
 
 /// Optimizer Pass that simplifies [`LogicalPlan`]s by rewriting
diff --git a/datafusion/optimizer/src/unwrap_cast_in_comparison.rs 
b/datafusion/optimizer/src/unwrap_cast_in_comparison.rs
index 907c12b7af..91603e82a5 100644
--- a/datafusion/optimizer/src/unwrap_cast_in_comparison.rs
+++ b/datafusion/optimizer/src/unwrap_cast_in_comparison.rs
@@ -19,7 +19,6 @@
 //! of expr can be added if needed.
 //! This rule can reduce adding the `Expr::Cast` the expr instead of adding 
the `Expr::Cast` to literal expr.
 use crate::optimizer::ApplyOrder;
-use crate::utils::merge_schema;
 use crate::{OptimizerConfig, OptimizerRule};
 use arrow::datatypes::{
     DataType, TimeUnit, MAX_DECIMAL_FOR_EACH_PRECISION, 
MIN_DECIMAL_FOR_EACH_PRECISION,
@@ -31,6 +30,7 @@ use datafusion_common::{
 };
 use datafusion_expr::expr::{BinaryExpr, Cast, InList, TryCast};
 use datafusion_expr::expr_rewriter::rewrite_preserving_name;
+use datafusion_expr::utils::merge_schema;
 use datafusion_expr::{
     binary_expr, in_list, lit, Expr, ExprSchemable, LogicalPlan, Operator,
 };
diff --git a/datafusion/optimizer/src/utils.rs 
b/datafusion/optimizer/src/utils.rs
index a3e7e42875..48f72ee7a0 100644
--- a/datafusion/optimizer/src/utils.rs
+++ b/datafusion/optimizer/src/utils.rs
@@ -18,19 +18,13 @@
 //! Collection of utility functions that are leveraged by the query optimizer 
rules
 
 use crate::{OptimizerConfig, OptimizerRule};
-use datafusion_common::DataFusionError;
-use datafusion_common::{plan_err, Column, DFSchemaRef};
+use datafusion_common::{Column, DFSchemaRef};
 use datafusion_common::{DFSchema, Result};
-use datafusion_expr::expr::{Alias, BinaryExpr};
-use datafusion_expr::expr_rewriter::{replace_col, strip_outer_reference};
-use datafusion_expr::{
-    and,
-    logical_plan::{Filter, LogicalPlan},
-    Expr, Operator,
-};
+use datafusion_expr::expr_rewriter::replace_col;
+use datafusion_expr::utils as expr_utils;
+use datafusion_expr::{logical_plan::LogicalPlan, Expr, Operator};
 use log::{debug, trace};
 use std::collections::{BTreeSet, HashMap};
-use std::sync::Arc;
 
 /// Convenience rule for writing optimizers: recursively invoke
 /// optimize on plan's children and then return a node of the same
@@ -58,29 +52,55 @@ pub fn optimize_children(
     }
 }
 
+pub(crate) fn collect_subquery_cols(
+    exprs: &[Expr],
+    subquery_schema: DFSchemaRef,
+) -> Result<BTreeSet<Column>> {
+    exprs.iter().try_fold(BTreeSet::new(), |mut cols, expr| {
+        let mut using_cols: Vec<Column> = vec![];
+        for col in expr.to_columns()?.into_iter() {
+            if subquery_schema.has_column(&col) {
+                using_cols.push(col);
+            }
+        }
+
+        cols.extend(using_cols);
+        Result::<_>::Ok(cols)
+    })
+}
+
+pub(crate) fn replace_qualified_name(
+    expr: Expr,
+    cols: &BTreeSet<Column>,
+    subquery_alias: &str,
+) -> Result<Expr> {
+    let alias_cols: Vec<Column> = cols
+        .iter()
+        .map(|col| {
+            Column::from_qualified_name(format!("{}.{}", subquery_alias, 
col.name))
+        })
+        .collect();
+    let replace_map: HashMap<&Column, &Column> =
+        cols.iter().zip(alias_cols.iter()).collect();
+
+    replace_col(expr, &replace_map)
+}
+
+/// Log the plan in debug/tracing mode after some part of the optimizer runs
+pub fn log_plan(description: &str, plan: &LogicalPlan) {
+    debug!("{description}:\n{}\n", plan.display_indent());
+    trace!("{description}::\n{}\n", plan.display_indent_schema());
+}
+
 /// Splits a conjunctive [`Expr`] such as `A AND B AND C` => `[A, B, C]`
 ///
 /// See [`split_conjunction_owned`] for more details and an example.
+#[deprecated(
+    since = "34.0.0",
+    note = "use `datafusion_expr::utils::split_conjunction` instead"
+)]
 pub fn split_conjunction(expr: &Expr) -> Vec<&Expr> {
-    split_conjunction_impl(expr, vec![])
-}
-
-fn split_conjunction_impl<'a>(expr: &'a Expr, mut exprs: Vec<&'a Expr>) -> 
Vec<&'a Expr> {
-    match expr {
-        Expr::BinaryExpr(BinaryExpr {
-            right,
-            op: Operator::And,
-            left,
-        }) => {
-            let exprs = split_conjunction_impl(left, exprs);
-            split_conjunction_impl(right, exprs)
-        }
-        Expr::Alias(Alias { expr, .. }) => split_conjunction_impl(expr, exprs),
-        other => {
-            exprs.push(other);
-            exprs
-        }
-    }
+    expr_utils::split_conjunction(expr)
 }
 
 /// Splits an owned conjunctive [`Expr`] such as `A AND B AND C` => `[A, B, C]`
@@ -104,8 +124,12 @@ fn split_conjunction_impl<'a>(expr: &'a Expr, mut exprs: 
Vec<&'a Expr>) -> Vec<&
 /// // use split_conjunction_owned to split them
 /// assert_eq!(split_conjunction_owned(expr), split);
 /// ```
+#[deprecated(
+    since = "34.0.0",
+    note = "use `datafusion_expr::utils::split_conjunction_owned` instead"
+)]
 pub fn split_conjunction_owned(expr: Expr) -> Vec<Expr> {
-    split_binary_owned(expr, Operator::And)
+    expr_utils::split_conjunction_owned(expr)
 }
 
 /// Splits an owned binary operator tree [`Expr`] such as `A <OP> B <OP> C` => 
`[A, B, C]`
@@ -130,53 +154,23 @@ pub fn split_conjunction_owned(expr: Expr) -> Vec<Expr> {
 /// // use split_binary_owned to split them
 /// assert_eq!(split_binary_owned(expr, Operator::Plus), split);
 /// ```
+#[deprecated(
+    since = "34.0.0",
+    note = "use `datafusion_expr::utils::split_binary_owned` instead"
+)]
 pub fn split_binary_owned(expr: Expr, op: Operator) -> Vec<Expr> {
-    split_binary_owned_impl(expr, op, vec![])
-}
-
-fn split_binary_owned_impl(
-    expr: Expr,
-    operator: Operator,
-    mut exprs: Vec<Expr>,
-) -> Vec<Expr> {
-    match expr {
-        Expr::BinaryExpr(BinaryExpr { right, op, left }) if op == operator => {
-            let exprs = split_binary_owned_impl(*left, operator, exprs);
-            split_binary_owned_impl(*right, operator, exprs)
-        }
-        Expr::Alias(Alias { expr, .. }) => {
-            split_binary_owned_impl(*expr, operator, exprs)
-        }
-        other => {
-            exprs.push(other);
-            exprs
-        }
-    }
+    expr_utils::split_binary_owned(expr, op)
 }
 
 /// Splits an binary operator tree [`Expr`] such as `A <OP> B <OP> C` => `[A, 
B, C]`
 ///
 /// See [`split_binary_owned`] for more details and an example.
+#[deprecated(
+    since = "34.0.0",
+    note = "use `datafusion_expr::utils::split_binary` instead"
+)]
 pub fn split_binary(expr: &Expr, op: Operator) -> Vec<&Expr> {
-    split_binary_impl(expr, op, vec![])
-}
-
-fn split_binary_impl<'a>(
-    expr: &'a Expr,
-    operator: Operator,
-    mut exprs: Vec<&'a Expr>,
-) -> Vec<&'a Expr> {
-    match expr {
-        Expr::BinaryExpr(BinaryExpr { right, op, left }) if *op == operator => 
{
-            let exprs = split_binary_impl(left, operator, exprs);
-            split_binary_impl(right, operator, exprs)
-        }
-        Expr::Alias(Alias { expr, .. }) => split_binary_impl(expr, operator, 
exprs),
-        other => {
-            exprs.push(other);
-            exprs
-        }
-    }
+    expr_utils::split_binary(expr, op)
 }
 
 /// Combines an array of filter expressions into a single filter
@@ -201,8 +195,12 @@ fn split_binary_impl<'a>(
 /// // use conjunction to join them together with `AND`
 /// assert_eq!(conjunction(split), Some(expr));
 /// ```
+#[deprecated(
+    since = "34.0.0",
+    note = "use `datafusion_expr::utils::conjunction` instead"
+)]
 pub fn conjunction(filters: impl IntoIterator<Item = Expr>) -> Option<Expr> {
-    filters.into_iter().reduce(|accum, expr| accum.and(expr))
+    expr_utils::conjunction(filters)
 }
 
 /// Combines an array of filter expressions into a single filter
@@ -210,25 +208,22 @@ pub fn conjunction(filters: impl IntoIterator<Item = 
Expr>) -> Option<Expr> {
 /// logical OR.
 ///
 /// Returns None if the filters array is empty.
+#[deprecated(
+    since = "34.0.0",
+    note = "use `datafusion_expr::utils::disjunction` instead"
+)]
 pub fn disjunction(filters: impl IntoIterator<Item = Expr>) -> Option<Expr> {
-    filters.into_iter().reduce(|accum, expr| accum.or(expr))
+    expr_utils::disjunction(filters)
 }
 
 /// returns a new [LogicalPlan] that wraps `plan` in a [LogicalPlan::Filter] 
with
 /// its predicate be all `predicates` ANDed.
+#[deprecated(
+    since = "34.0.0",
+    note = "use `datafusion_expr::utils::add_filter` instead"
+)]
 pub fn add_filter(plan: LogicalPlan, predicates: &[&Expr]) -> 
Result<LogicalPlan> {
-    // reduce filters to a single filter with an AND
-    let predicate = predicates
-        .iter()
-        .skip(1)
-        .fold(predicates[0].clone(), |acc, predicate| {
-            and(acc, (*predicate).to_owned())
-        });
-
-    Ok(LogicalPlan::Filter(Filter::try_new(
-        predicate,
-        Arc::new(plan),
-    )?))
+    expr_utils::add_filter(plan, predicates)
 }
 
 /// Looks for correlating expressions: for example, a binary expression with 
one field from the subquery, and
@@ -241,22 +236,12 @@ pub fn add_filter(plan: LogicalPlan, predicates: 
&[&Expr]) -> Result<LogicalPlan
 /// # Return value
 ///
 /// Tuple of (expressions containing joins, remaining non-join expressions)
+#[deprecated(
+    since = "34.0.0",
+    note = "use `datafusion_expr::utils::find_join_exprs` instead"
+)]
 pub fn find_join_exprs(exprs: Vec<&Expr>) -> Result<(Vec<Expr>, Vec<Expr>)> {
-    let mut joins = vec![];
-    let mut others = vec![];
-    for filter in exprs.into_iter() {
-        // If the expression contains correlated predicates, add it to join 
filters
-        if filter.contains_outer() {
-            if !matches!(filter, Expr::BinaryExpr(BinaryExpr{ left, op: 
Operator::Eq, right }) if left.eq(right))
-            {
-                joins.push(strip_outer_reference((*filter).clone()));
-            }
-        } else {
-            others.push((*filter).clone());
-        }
-    }
-
-    Ok((joins, others))
+    expr_utils::find_join_exprs(exprs)
 }
 
 /// Returns the first (and only) element in a slice, or an error
@@ -268,215 +253,19 @@ pub fn find_join_exprs(exprs: Vec<&Expr>) -> 
Result<(Vec<Expr>, Vec<Expr>)> {
 /// # Return value
 ///
 /// The first element, or an error
+#[deprecated(
+    since = "34.0.0",
+    note = "use `datafusion_expr::utils::only_or_err` instead"
+)]
 pub fn only_or_err<T>(slice: &[T]) -> Result<&T> {
-    match slice {
-        [it] => Ok(it),
-        [] => plan_err!("No items found!"),
-        _ => plan_err!("More than one item found!"),
-    }
+    expr_utils::only_or_err(slice)
 }
 
 /// merge inputs schema into a single schema.
+#[deprecated(
+    since = "34.0.0",
+    note = "use `datafusion_expr::utils::merge_schema` instead"
+)]
 pub fn merge_schema(inputs: Vec<&LogicalPlan>) -> DFSchema {
-    if inputs.len() == 1 {
-        inputs[0].schema().clone().as_ref().clone()
-    } else {
-        inputs.iter().map(|input| input.schema()).fold(
-            DFSchema::empty(),
-            |mut lhs, rhs| {
-                lhs.merge(rhs);
-                lhs
-            },
-        )
-    }
-}
-
-pub(crate) fn collect_subquery_cols(
-    exprs: &[Expr],
-    subquery_schema: DFSchemaRef,
-) -> Result<BTreeSet<Column>> {
-    exprs.iter().try_fold(BTreeSet::new(), |mut cols, expr| {
-        let mut using_cols: Vec<Column> = vec![];
-        for col in expr.to_columns()?.into_iter() {
-            if subquery_schema.has_column(&col) {
-                using_cols.push(col);
-            }
-        }
-
-        cols.extend(using_cols);
-        Result::<_>::Ok(cols)
-    })
-}
-
-pub(crate) fn replace_qualified_name(
-    expr: Expr,
-    cols: &BTreeSet<Column>,
-    subquery_alias: &str,
-) -> Result<Expr> {
-    let alias_cols: Vec<Column> = cols
-        .iter()
-        .map(|col| {
-            Column::from_qualified_name(format!("{}.{}", subquery_alias, 
col.name))
-        })
-        .collect();
-    let replace_map: HashMap<&Column, &Column> =
-        cols.iter().zip(alias_cols.iter()).collect();
-
-    replace_col(expr, &replace_map)
-}
-
-/// Log the plan in debug/tracing mode after some part of the optimizer runs
-pub fn log_plan(description: &str, plan: &LogicalPlan) {
-    debug!("{description}:\n{}\n", plan.display_indent());
-    trace!("{description}::\n{}\n", plan.display_indent_schema());
-}
-
-#[cfg(test)]
-mod tests {
-    use super::*;
-    use arrow::datatypes::DataType;
-    use datafusion_common::Column;
-    use datafusion_expr::expr::Cast;
-    use datafusion_expr::{col, lit, utils::expr_to_columns};
-    use std::collections::HashSet;
-
-    #[test]
-    fn test_split_conjunction() {
-        let expr = col("a");
-        let result = split_conjunction(&expr);
-        assert_eq!(result, vec![&expr]);
-    }
-
-    #[test]
-    fn test_split_conjunction_two() {
-        let expr = col("a").eq(lit(5)).and(col("b"));
-        let expr1 = col("a").eq(lit(5));
-        let expr2 = col("b");
-
-        let result = split_conjunction(&expr);
-        assert_eq!(result, vec![&expr1, &expr2]);
-    }
-
-    #[test]
-    fn test_split_conjunction_alias() {
-        let expr = col("a").eq(lit(5)).and(col("b").alias("the_alias"));
-        let expr1 = col("a").eq(lit(5));
-        let expr2 = col("b"); // has no alias
-
-        let result = split_conjunction(&expr);
-        assert_eq!(result, vec![&expr1, &expr2]);
-    }
-
-    #[test]
-    fn test_split_conjunction_or() {
-        let expr = col("a").eq(lit(5)).or(col("b"));
-        let result = split_conjunction(&expr);
-        assert_eq!(result, vec![&expr]);
-    }
-
-    #[test]
-    fn test_split_binary_owned() {
-        let expr = col("a");
-        assert_eq!(split_binary_owned(expr.clone(), Operator::And), 
vec![expr]);
-    }
-
-    #[test]
-    fn test_split_binary_owned_two() {
-        assert_eq!(
-            split_binary_owned(col("a").eq(lit(5)).and(col("b")), 
Operator::And),
-            vec![col("a").eq(lit(5)), col("b")]
-        );
-    }
-
-    #[test]
-    fn test_split_binary_owned_different_op() {
-        let expr = col("a").eq(lit(5)).or(col("b"));
-        assert_eq!(
-            // expr is connected by OR, but pass in AND
-            split_binary_owned(expr.clone(), Operator::And),
-            vec![expr]
-        );
-    }
-
-    #[test]
-    fn test_split_conjunction_owned() {
-        let expr = col("a");
-        assert_eq!(split_conjunction_owned(expr.clone()), vec![expr]);
-    }
-
-    #[test]
-    fn test_split_conjunction_owned_two() {
-        assert_eq!(
-            split_conjunction_owned(col("a").eq(lit(5)).and(col("b"))),
-            vec![col("a").eq(lit(5)), col("b")]
-        );
-    }
-
-    #[test]
-    fn test_split_conjunction_owned_alias() {
-        assert_eq!(
-            
split_conjunction_owned(col("a").eq(lit(5)).and(col("b").alias("the_alias"))),
-            vec![
-                col("a").eq(lit(5)),
-                // no alias on b
-                col("b"),
-            ]
-        );
-    }
-
-    #[test]
-    fn test_conjunction_empty() {
-        assert_eq!(conjunction(vec![]), None);
-    }
-
-    #[test]
-    fn test_conjunction() {
-        // `[A, B, C]`
-        let expr = conjunction(vec![col("a"), col("b"), col("c")]);
-
-        // --> `(A AND B) AND C`
-        assert_eq!(expr, Some(col("a").and(col("b")).and(col("c"))));
-
-        // which is different than `A AND (B AND C)`
-        assert_ne!(expr, Some(col("a").and(col("b").and(col("c")))));
-    }
-
-    #[test]
-    fn test_disjunction_empty() {
-        assert_eq!(disjunction(vec![]), None);
-    }
-
-    #[test]
-    fn test_disjunction() {
-        // `[A, B, C]`
-        let expr = disjunction(vec![col("a"), col("b"), col("c")]);
-
-        // --> `(A OR B) OR C`
-        assert_eq!(expr, Some(col("a").or(col("b")).or(col("c"))));
-
-        // which is different than `A OR (B OR C)`
-        assert_ne!(expr, Some(col("a").or(col("b").or(col("c")))));
-    }
-
-    #[test]
-    fn test_split_conjunction_owned_or() {
-        let expr = col("a").eq(lit(5)).or(col("b"));
-        assert_eq!(split_conjunction_owned(expr.clone()), vec![expr]);
-    }
-
-    #[test]
-    fn test_collect_expr() -> Result<()> {
-        let mut accum: HashSet<Column> = HashSet::new();
-        expr_to_columns(
-            &Expr::Cast(Cast::new(Box::new(col("a")), DataType::Float64)),
-            &mut accum,
-        )?;
-        expr_to_columns(
-            &Expr::Cast(Cast::new(Box::new(col("a")), DataType::Float64)),
-            &mut accum,
-        )?;
-        assert_eq!(1, accum.len());
-        assert!(accum.contains(&Column::from_name("a")));
-        Ok(())
-    }
+    expr_utils::merge_schema(inputs)
 }
diff --git a/datafusion/substrait/src/logical_plan/consumer.rs 
b/datafusion/substrait/src/logical_plan/consumer.rs
index 5cb72adaca..b7a51032dc 100644
--- a/datafusion/substrait/src/logical_plan/consumer.rs
+++ b/datafusion/substrait/src/logical_plan/consumer.rs
@@ -32,7 +32,7 @@ use datafusion::prelude::JoinType;
 use datafusion::sql::TableReference;
 use datafusion::{
     error::{DataFusionError, Result},
-    optimizer::utils::split_conjunction,
+    logical_expr::utils::split_conjunction,
     prelude::{Column, SessionContext},
     scalar::ScalarValue,
 };


Reply via email to