This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 11f164c5f3 Move some datafusion-optimizer::utils down to
datafusion-expr::utils (#8354)
11f164c5f3 is described below
commit 11f164c5f3568c3d4fa796bb63a7db36a7b1e821
Author: Jesse <[email protected]>
AuthorDate: Wed Nov 29 19:07:26 2023 +0100
Move some datafusion-optimizer::utils down to datafusion-expr::utils (#8354)
These utils manipulate `LogicalPlan`s and `Expr`s and may be useful in
projects that only depend on `datafusion-expr`
---
benchmarks/src/parquet_filter.rs | 2 +-
datafusion/core/src/datasource/listing/table.rs | 4 +-
datafusion/core/tests/parquet/filter_pushdown.rs | 2 +-
datafusion/expr/src/utils.rs | 381 +++++++++++++++++++-
datafusion/optimizer/src/analyzer/subquery.rs | 3 +-
datafusion/optimizer/src/analyzer/type_coercion.rs | 7 +-
datafusion/optimizer/src/decorrelate.rs | 5 +-
.../src/decorrelate_predicate_subquery.rs | 3 +-
.../optimizer/src/extract_equijoin_predicate.rs | 3 +-
datafusion/optimizer/src/optimizer.rs | 2 +-
datafusion/optimizer/src/push_down_filter.rs | 16 +-
.../optimizer/src/scalar_subquery_to_join.rs | 3 +-
.../src/simplify_expressions/simplify_exprs.rs | 2 +-
.../optimizer/src/unwrap_cast_in_comparison.rs | 2 +-
datafusion/optimizer/src/utils.rs | 399 +++++----------------
datafusion/substrait/src/logical_plan/consumer.rs | 2 +-
16 files changed, 499 insertions(+), 337 deletions(-)
diff --git a/benchmarks/src/parquet_filter.rs b/benchmarks/src/parquet_filter.rs
index e19596b80f..1d816908e2 100644
--- a/benchmarks/src/parquet_filter.rs
+++ b/benchmarks/src/parquet_filter.rs
@@ -19,8 +19,8 @@ use crate::AccessLogOpt;
use crate::{BenchmarkRun, CommonOpt};
use arrow::util::pretty;
use datafusion::common::Result;
+use datafusion::logical_expr::utils::disjunction;
use datafusion::logical_expr::{lit, or, Expr};
-use datafusion::optimizer::utils::disjunction;
use datafusion::physical_plan::collect;
use datafusion::prelude::{col, SessionContext};
use datafusion::test_util::parquet::{ParquetScanOptions, TestParquetFile};
diff --git a/datafusion/core/src/datasource/listing/table.rs
b/datafusion/core/src/datasource/listing/table.rs
index 515bc8a9e6..a3be57db3a 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -40,11 +40,10 @@ use crate::datasource::{
physical_plan::{is_plan_streaming, FileScanConfig, FileSinkConfig},
TableProvider, TableType,
};
-use crate::logical_expr::TableProviderFilterPushDown;
use crate::{
error::{DataFusionError, Result},
execution::context::SessionState,
- logical_expr::Expr,
+ logical_expr::{utils::conjunction, Expr, TableProviderFilterPushDown},
physical_plan::{empty::EmptyExec, ExecutionPlan, Statistics},
};
@@ -56,7 +55,6 @@ use datafusion_common::{
};
use datafusion_execution::cache::cache_manager::FileStatisticsCache;
use datafusion_execution::cache::cache_unit::DefaultFileStatisticsCache;
-use datafusion_optimizer::utils::conjunction;
use datafusion_physical_expr::{
create_physical_expr, LexOrdering, PhysicalSortRequirement,
};
diff --git a/datafusion/core/tests/parquet/filter_pushdown.rs
b/datafusion/core/tests/parquet/filter_pushdown.rs
index 61a8f87b9e..f214e8903a 100644
--- a/datafusion/core/tests/parquet/filter_pushdown.rs
+++ b/datafusion/core/tests/parquet/filter_pushdown.rs
@@ -34,7 +34,7 @@ use datafusion::physical_plan::collect;
use datafusion::physical_plan::metrics::MetricsSet;
use datafusion::prelude::{col, lit, lit_timestamp_nano, Expr, SessionContext};
use datafusion::test_util::parquet::{ParquetScanOptions, TestParquetFile};
-use datafusion_optimizer::utils::{conjunction, disjunction, split_conjunction};
+use datafusion_expr::utils::{conjunction, disjunction, split_conjunction};
use itertools::Itertools;
use parquet::file::properties::WriterProperties;
use tempfile::TempDir;
diff --git a/datafusion/expr/src/utils.rs b/datafusion/expr/src/utils.rs
index d8668fba8e..7deb13c89b 100644
--- a/datafusion/expr/src/utils.rs
+++ b/datafusion/expr/src/utils.rs
@@ -18,9 +18,13 @@
//! Expression utilities
use crate::expr::{Alias, Sort, WindowFunction};
+use crate::expr_rewriter::strip_outer_reference;
use crate::logical_plan::Aggregate;
use crate::signature::{Signature, TypeSignature};
-use crate::{Cast, Expr, ExprSchemable, GroupingSet, LogicalPlan, TryCast};
+use crate::{
+ and, BinaryExpr, Cast, Expr, ExprSchemable, Filter, GroupingSet,
LogicalPlan,
+ Operator, TryCast,
+};
use arrow::datatypes::{DataType, TimeUnit};
use datafusion_common::tree_node::{TreeNode, VisitRecursion};
use datafusion_common::{
@@ -30,6 +34,7 @@ use datafusion_common::{
use sqlparser::ast::{ExceptSelectItem, ExcludeSelectItem,
WildcardAdditionalOptions};
use std::cmp::Ordering;
use std::collections::HashSet;
+use std::sync::Arc;
/// The value to which `COUNT(*)` is expanded to in
/// `COUNT(<constant>)` expressions
@@ -1004,12 +1009,245 @@ pub fn generate_signature_error_msg(
)
}
+/// Splits a conjunctive [`Expr`] such as `A AND B AND C` => `[A, B, C]`
+///
+/// See [`split_conjunction_owned`] for more details and an example.
+pub fn split_conjunction(expr: &Expr) -> Vec<&Expr> {
+ split_conjunction_impl(expr, vec![])
+}
+
+fn split_conjunction_impl<'a>(expr: &'a Expr, mut exprs: Vec<&'a Expr>) ->
Vec<&'a Expr> {
+ match expr {
+ Expr::BinaryExpr(BinaryExpr {
+ right,
+ op: Operator::And,
+ left,
+ }) => {
+ let exprs = split_conjunction_impl(left, exprs);
+ split_conjunction_impl(right, exprs)
+ }
+ Expr::Alias(Alias { expr, .. }) => split_conjunction_impl(expr, exprs),
+ other => {
+ exprs.push(other);
+ exprs
+ }
+ }
+}
+
+/// Splits an owned conjunctive [`Expr`] such as `A AND B AND C` => `[A, B, C]`
+///
+/// This is often used to "split" filter expressions such as `col1 = 5
+/// AND col2 = 10` into [`col1 = 5`, `col2 = 10`];
+///
+/// # Example
+/// ```
+/// # use datafusion_expr::{col, lit};
+/// # use datafusion_expr::utils::split_conjunction_owned;
+/// // a=1 AND b=2
+/// let expr = col("a").eq(lit(1)).and(col("b").eq(lit(2)));
+///
+/// // [a=1, b=2]
+/// let split = vec![
+/// col("a").eq(lit(1)),
+/// col("b").eq(lit(2)),
+/// ];
+///
+/// // use split_conjunction_owned to split them
+/// assert_eq!(split_conjunction_owned(expr), split);
+/// ```
+pub fn split_conjunction_owned(expr: Expr) -> Vec<Expr> {
+ split_binary_owned(expr, Operator::And)
+}
+
+/// Splits an owned binary operator tree [`Expr`] such as `A <OP> B <OP> C` =>
`[A, B, C]`
+///
+/// This is often used to "split" expressions such as `col1 = 5
+/// AND col2 = 10` into [`col1 = 5`, `col2 = 10`];
+///
+/// # Example
+/// ```
+/// # use datafusion_expr::{col, lit, Operator};
+/// # use datafusion_expr::utils::split_binary_owned;
+/// # use std::ops::Add;
+/// // a=1 + b=2
+/// let expr = col("a").eq(lit(1)).add(col("b").eq(lit(2)));
+///
+/// // [a=1, b=2]
+/// let split = vec![
+/// col("a").eq(lit(1)),
+/// col("b").eq(lit(2)),
+/// ];
+///
+/// // use split_binary_owned to split them
+/// assert_eq!(split_binary_owned(expr, Operator::Plus), split);
+/// ```
+pub fn split_binary_owned(expr: Expr, op: Operator) -> Vec<Expr> {
+ split_binary_owned_impl(expr, op, vec![])
+}
+
+fn split_binary_owned_impl(
+ expr: Expr,
+ operator: Operator,
+ mut exprs: Vec<Expr>,
+) -> Vec<Expr> {
+ match expr {
+ Expr::BinaryExpr(BinaryExpr { right, op, left }) if op == operator => {
+ let exprs = split_binary_owned_impl(*left, operator, exprs);
+ split_binary_owned_impl(*right, operator, exprs)
+ }
+ Expr::Alias(Alias { expr, .. }) => {
+ split_binary_owned_impl(*expr, operator, exprs)
+ }
+ other => {
+ exprs.push(other);
+ exprs
+ }
+ }
+}
+
+/// Splits a binary operator tree [`Expr`] such as `A <OP> B <OP> C` => `[A,
B, C]`
+///
+/// See [`split_binary_owned`] for more details and an example.
+pub fn split_binary(expr: &Expr, op: Operator) -> Vec<&Expr> {
+ split_binary_impl(expr, op, vec![])
+}
+
+fn split_binary_impl<'a>(
+ expr: &'a Expr,
+ operator: Operator,
+ mut exprs: Vec<&'a Expr>,
+) -> Vec<&'a Expr> {
+ match expr {
+ Expr::BinaryExpr(BinaryExpr { right, op, left }) if *op == operator =>
{
+ let exprs = split_binary_impl(left, operator, exprs);
+ split_binary_impl(right, operator, exprs)
+ }
+ Expr::Alias(Alias { expr, .. }) => split_binary_impl(expr, operator,
exprs),
+ other => {
+ exprs.push(other);
+ exprs
+ }
+ }
+}
+
+/// Combines an array of filter expressions into a single filter
+/// expression consisting of the input filter expressions joined with
+/// logical AND.
+///
+/// Returns None if the filters array is empty.
+///
+/// # Example
+/// ```
+/// # use datafusion_expr::{col, lit};
+/// # use datafusion_expr::utils::conjunction;
+/// // a=1 AND b=2
+/// let expr = col("a").eq(lit(1)).and(col("b").eq(lit(2)));
+///
+/// // [a=1, b=2]
+/// let split = vec![
+/// col("a").eq(lit(1)),
+/// col("b").eq(lit(2)),
+/// ];
+///
+/// // use conjunction to join them together with `AND`
+/// assert_eq!(conjunction(split), Some(expr));
+/// ```
+pub fn conjunction(filters: impl IntoIterator<Item = Expr>) -> Option<Expr> {
+ filters.into_iter().reduce(|accum, expr| accum.and(expr))
+}
+
+/// Combines an array of filter expressions into a single filter
+/// expression consisting of the input filter expressions joined with
+/// logical OR.
+///
+/// Returns None if the filters array is empty.
+pub fn disjunction(filters: impl IntoIterator<Item = Expr>) -> Option<Expr> {
+ filters.into_iter().reduce(|accum, expr| accum.or(expr))
+}
+
+/// Returns a new [LogicalPlan] that wraps `plan` in a [LogicalPlan::Filter]
whose
+/// predicate is all of `predicates` ANDed together.
+pub fn add_filter(plan: LogicalPlan, predicates: &[&Expr]) ->
Result<LogicalPlan> {
+ // reduce filters to a single filter with an AND
+ let predicate = predicates
+ .iter()
+ .skip(1)
+ .fold(predicates[0].clone(), |acc, predicate| {
+ and(acc, (*predicate).to_owned())
+ });
+
+ Ok(LogicalPlan::Filter(Filter::try_new(
+ predicate,
+ Arc::new(plan),
+ )?))
+}
+
+/// Looks for correlating expressions: for example, a binary expression with
one field from the subquery, and
+/// one not in the subquery (closed over from the outer scope)
+///
+/// # Arguments
+///
+/// * `exprs` - List of expressions that may or may not be joins
+///
+/// # Return value
+///
+/// Tuple of (expressions containing joins, remaining non-join expressions)
+pub fn find_join_exprs(exprs: Vec<&Expr>) -> Result<(Vec<Expr>, Vec<Expr>)> {
+ let mut joins = vec![];
+ let mut others = vec![];
+ for filter in exprs.into_iter() {
+ // If the expression contains correlated predicates, add it to join
filters
+ if filter.contains_outer() {
+ if !matches!(filter, Expr::BinaryExpr(BinaryExpr{ left, op:
Operator::Eq, right }) if left.eq(right))
+ {
+ joins.push(strip_outer_reference((*filter).clone()));
+ }
+ } else {
+ others.push((*filter).clone());
+ }
+ }
+
+ Ok((joins, others))
+}
+
+/// Returns the first (and only) element in a slice, or an error
+///
+/// # Arguments
+///
+/// * `slice` - The slice to extract from
+///
+/// # Return value
+///
+/// The first element, or an error
+pub fn only_or_err<T>(slice: &[T]) -> Result<&T> {
+ match slice {
+ [it] => Ok(it),
+ [] => plan_err!("No items found!"),
+ _ => plan_err!("More than one item found!"),
+ }
+}
+
+/// Merges the schemas of all `inputs` into a single schema.
+pub fn merge_schema(inputs: Vec<&LogicalPlan>) -> DFSchema {
+ if inputs.len() == 1 {
+ inputs[0].schema().clone().as_ref().clone()
+ } else {
+ inputs.iter().map(|input| input.schema()).fold(
+ DFSchema::empty(),
+ |mut lhs, rhs| {
+ lhs.merge(rhs);
+ lhs
+ },
+ )
+ }
+}
+
#[cfg(test)]
mod tests {
use super::*;
use crate::expr_vec_fmt;
use crate::{
- col, cube, expr, grouping_set, rollup, AggregateFunction, WindowFrame,
+ col, cube, expr, grouping_set, lit, rollup, AggregateFunction,
WindowFrame,
WindowFunction,
};
@@ -1322,4 +1560,143 @@ mod tests {
Ok(())
}
+ #[test]
+ fn test_split_conjunction() {
+ let expr = col("a");
+ let result = split_conjunction(&expr);
+ assert_eq!(result, vec![&expr]);
+ }
+
+ #[test]
+ fn test_split_conjunction_two() {
+ let expr = col("a").eq(lit(5)).and(col("b"));
+ let expr1 = col("a").eq(lit(5));
+ let expr2 = col("b");
+
+ let result = split_conjunction(&expr);
+ assert_eq!(result, vec![&expr1, &expr2]);
+ }
+
+ #[test]
+ fn test_split_conjunction_alias() {
+ let expr = col("a").eq(lit(5)).and(col("b").alias("the_alias"));
+ let expr1 = col("a").eq(lit(5));
+ let expr2 = col("b"); // has no alias
+
+ let result = split_conjunction(&expr);
+ assert_eq!(result, vec![&expr1, &expr2]);
+ }
+
+ #[test]
+ fn test_split_conjunction_or() {
+ let expr = col("a").eq(lit(5)).or(col("b"));
+ let result = split_conjunction(&expr);
+ assert_eq!(result, vec![&expr]);
+ }
+
+ #[test]
+ fn test_split_binary_owned() {
+ let expr = col("a");
+ assert_eq!(split_binary_owned(expr.clone(), Operator::And),
vec![expr]);
+ }
+
+ #[test]
+ fn test_split_binary_owned_two() {
+ assert_eq!(
+ split_binary_owned(col("a").eq(lit(5)).and(col("b")),
Operator::And),
+ vec![col("a").eq(lit(5)), col("b")]
+ );
+ }
+
+ #[test]
+ fn test_split_binary_owned_different_op() {
+ let expr = col("a").eq(lit(5)).or(col("b"));
+ assert_eq!(
+ // expr is connected by OR, but pass in AND
+ split_binary_owned(expr.clone(), Operator::And),
+ vec![expr]
+ );
+ }
+
+ #[test]
+ fn test_split_conjunction_owned() {
+ let expr = col("a");
+ assert_eq!(split_conjunction_owned(expr.clone()), vec![expr]);
+ }
+
+ #[test]
+ fn test_split_conjunction_owned_two() {
+ assert_eq!(
+ split_conjunction_owned(col("a").eq(lit(5)).and(col("b"))),
+ vec![col("a").eq(lit(5)), col("b")]
+ );
+ }
+
+ #[test]
+ fn test_split_conjunction_owned_alias() {
+ assert_eq!(
+
split_conjunction_owned(col("a").eq(lit(5)).and(col("b").alias("the_alias"))),
+ vec![
+ col("a").eq(lit(5)),
+ // no alias on b
+ col("b"),
+ ]
+ );
+ }
+
+ #[test]
+ fn test_conjunction_empty() {
+ assert_eq!(conjunction(vec![]), None);
+ }
+
+ #[test]
+ fn test_conjunction() {
+ // `[A, B, C]`
+ let expr = conjunction(vec![col("a"), col("b"), col("c")]);
+
+ // --> `(A AND B) AND C`
+ assert_eq!(expr, Some(col("a").and(col("b")).and(col("c"))));
+
+ // which is different than `A AND (B AND C)`
+ assert_ne!(expr, Some(col("a").and(col("b").and(col("c")))));
+ }
+
+ #[test]
+ fn test_disjunction_empty() {
+ assert_eq!(disjunction(vec![]), None);
+ }
+
+ #[test]
+ fn test_disjunction() {
+ // `[A, B, C]`
+ let expr = disjunction(vec![col("a"), col("b"), col("c")]);
+
+ // --> `(A OR B) OR C`
+ assert_eq!(expr, Some(col("a").or(col("b")).or(col("c"))));
+
+ // which is different than `A OR (B OR C)`
+ assert_ne!(expr, Some(col("a").or(col("b").or(col("c")))));
+ }
+
+ #[test]
+ fn test_split_conjunction_owned_or() {
+ let expr = col("a").eq(lit(5)).or(col("b"));
+ assert_eq!(split_conjunction_owned(expr.clone()), vec![expr]);
+ }
+
+ #[test]
+ fn test_collect_expr() -> Result<()> {
+ let mut accum: HashSet<Column> = HashSet::new();
+ expr_to_columns(
+ &Expr::Cast(Cast::new(Box::new(col("a")), DataType::Float64)),
+ &mut accum,
+ )?;
+ expr_to_columns(
+ &Expr::Cast(Cast::new(Box::new(col("a")), DataType::Float64)),
+ &mut accum,
+ )?;
+ assert_eq!(1, accum.len());
+ assert!(accum.contains(&Column::from_name("a")));
+ Ok(())
+ }
}
diff --git a/datafusion/optimizer/src/analyzer/subquery.rs
b/datafusion/optimizer/src/analyzer/subquery.rs
index 6b8b1020cd..7c5b70b19a 100644
--- a/datafusion/optimizer/src/analyzer/subquery.rs
+++ b/datafusion/optimizer/src/analyzer/subquery.rs
@@ -16,10 +16,11 @@
// under the License.
use crate::analyzer::check_plan;
-use crate::utils::{collect_subquery_cols, split_conjunction};
+use crate::utils::collect_subquery_cols;
use datafusion_common::tree_node::{TreeNode, VisitRecursion};
use datafusion_common::{plan_err, DataFusionError, Result};
use datafusion_expr::expr_rewriter::strip_outer_reference;
+use datafusion_expr::utils::split_conjunction;
use datafusion_expr::{
Aggregate, BinaryExpr, Cast, Expr, Filter, Join, JoinType, LogicalPlan,
Operator,
Window,
diff --git a/datafusion/optimizer/src/analyzer/type_coercion.rs
b/datafusion/optimizer/src/analyzer/type_coercion.rs
index 6628e8961e..eb5d8c53a5 100644
--- a/datafusion/optimizer/src/analyzer/type_coercion.rs
+++ b/datafusion/optimizer/src/analyzer/type_coercion.rs
@@ -42,16 +42,15 @@ use datafusion_expr::type_coercion::other::{
get_coerce_type_for_case_expression, get_coerce_type_for_list,
};
use datafusion_expr::type_coercion::{is_datetime, is_utf8_or_large_utf8};
+use datafusion_expr::utils::merge_schema;
use datafusion_expr::{
is_false, is_not_false, is_not_true, is_not_unknown, is_true, is_unknown,
type_coercion, window_function, AggregateFunction, BuiltinScalarFunction,
Expr,
- LogicalPlan, Operator, Projection, ScalarFunctionDefinition, WindowFrame,
- WindowFrameBound, WindowFrameUnits,
+ ExprSchemable, LogicalPlan, Operator, Projection, ScalarFunctionDefinition,
+ Signature, WindowFrame, WindowFrameBound, WindowFrameUnits,
};
-use datafusion_expr::{ExprSchemable, Signature};
use crate::analyzer::AnalyzerRule;
-use crate::utils::merge_schema;
#[derive(Default)]
pub struct TypeCoercion {}
diff --git a/datafusion/optimizer/src/decorrelate.rs
b/datafusion/optimizer/src/decorrelate.rs
index c8162683f3..ed6f472186 100644
--- a/datafusion/optimizer/src/decorrelate.rs
+++ b/datafusion/optimizer/src/decorrelate.rs
@@ -16,15 +16,14 @@
// under the License.
use crate::simplify_expressions::{ExprSimplifier, SimplifyContext};
-use crate::utils::{
- collect_subquery_cols, conjunction, find_join_exprs, split_conjunction,
-};
+use crate::utils::collect_subquery_cols;
use datafusion_common::tree_node::{
RewriteRecursion, Transformed, TreeNode, TreeNodeRewriter,
};
use datafusion_common::{plan_err, Result};
use datafusion_common::{Column, DFSchemaRef, DataFusionError, ScalarValue};
use datafusion_expr::expr::Alias;
+use datafusion_expr::utils::{conjunction, find_join_exprs, split_conjunction};
use datafusion_expr::{expr, EmptyRelation, Expr, LogicalPlan,
LogicalPlanBuilder};
use datafusion_physical_expr::execution_props::ExecutionProps;
use std::collections::{BTreeSet, HashMap};
diff --git a/datafusion/optimizer/src/decorrelate_predicate_subquery.rs
b/datafusion/optimizer/src/decorrelate_predicate_subquery.rs
index 96b46663d8..450336376a 100644
--- a/datafusion/optimizer/src/decorrelate_predicate_subquery.rs
+++ b/datafusion/optimizer/src/decorrelate_predicate_subquery.rs
@@ -17,7 +17,7 @@
use crate::decorrelate::PullUpCorrelatedExpr;
use crate::optimizer::ApplyOrder;
-use crate::utils::{conjunction, replace_qualified_name, split_conjunction};
+use crate::utils::replace_qualified_name;
use crate::{OptimizerConfig, OptimizerRule};
use datafusion_common::alias::AliasGenerator;
use datafusion_common::tree_node::TreeNode;
@@ -25,6 +25,7 @@ use datafusion_common::{plan_err, DataFusionError, Result};
use datafusion_expr::expr::{Exists, InSubquery};
use datafusion_expr::expr_rewriter::create_col_from_scalar_expr;
use datafusion_expr::logical_plan::{JoinType, Subquery};
+use datafusion_expr::utils::{conjunction, split_conjunction};
use datafusion_expr::{
exists, in_subquery, not_exists, not_in_subquery, BinaryExpr, Expr, Filter,
LogicalPlan, LogicalPlanBuilder, Operator,
diff --git a/datafusion/optimizer/src/extract_equijoin_predicate.rs
b/datafusion/optimizer/src/extract_equijoin_predicate.rs
index 575969fbf7..24664d57c3 100644
--- a/datafusion/optimizer/src/extract_equijoin_predicate.rs
+++ b/datafusion/optimizer/src/extract_equijoin_predicate.rs
@@ -17,11 +17,10 @@
//! [`ExtractEquijoinPredicate`] rule that extracts equijoin predicates
use crate::optimizer::ApplyOrder;
-use crate::utils::split_conjunction;
use crate::{OptimizerConfig, OptimizerRule};
use datafusion_common::DFSchema;
use datafusion_common::Result;
-use datafusion_expr::utils::{can_hash, find_valid_equijoin_key_pair};
+use datafusion_expr::utils::{can_hash, find_valid_equijoin_key_pair,
split_conjunction};
use datafusion_expr::{BinaryExpr, Expr, ExprSchemable, Join, LogicalPlan,
Operator};
use std::sync::Arc;
diff --git a/datafusion/optimizer/src/optimizer.rs
b/datafusion/optimizer/src/optimizer.rs
index c7ad31f39b..7af46ed70a 100644
--- a/datafusion/optimizer/src/optimizer.rs
+++ b/datafusion/optimizer/src/optimizer.rs
@@ -45,7 +45,7 @@ use chrono::{DateTime, Utc};
use datafusion_common::alias::AliasGenerator;
use datafusion_common::config::ConfigOptions;
use datafusion_common::{DataFusionError, Result};
-use datafusion_expr::logical_plan::LogicalPlan;
+use datafusion_expr::LogicalPlan;
use log::{debug, warn};
use std::collections::HashSet;
use std::sync::Arc;
diff --git a/datafusion/optimizer/src/push_down_filter.rs
b/datafusion/optimizer/src/push_down_filter.rs
index 7a2c6a8d8c..95eeee931b 100644
--- a/datafusion/optimizer/src/push_down_filter.rs
+++ b/datafusion/optimizer/src/push_down_filter.rs
@@ -16,13 +16,13 @@
//! the plan.
use crate::optimizer::ApplyOrder;
-use crate::utils::{conjunction, split_conjunction, split_conjunction_owned};
-use crate::{utils, OptimizerConfig, OptimizerRule};
+use crate::{OptimizerConfig, OptimizerRule};
use datafusion_common::tree_node::{Transformed, TreeNode, VisitRecursion};
use datafusion_common::{
internal_err, plan_datafusion_err, Column, DFSchema, DataFusionError,
Result,
};
use datafusion_expr::expr::Alias;
+use datafusion_expr::utils::{conjunction, split_conjunction,
split_conjunction_owned};
use datafusion_expr::Volatility;
use datafusion_expr::{
and,
@@ -546,9 +546,7 @@ fn push_down_join(
parent_predicate: Option<&Expr>,
) -> Result<Option<LogicalPlan>> {
let predicates = match parent_predicate {
- Some(parent_predicate) => {
- utils::split_conjunction_owned(parent_predicate.clone())
- }
+ Some(parent_predicate) =>
split_conjunction_owned(parent_predicate.clone()),
None => vec![],
};
@@ -556,7 +554,7 @@ fn push_down_join(
let on_filters = join
.filter
.as_ref()
- .map(|e| utils::split_conjunction_owned(e.clone()))
+ .map(|e| split_conjunction_owned(e.clone()))
.unwrap_or_default();
let mut is_inner_join = false;
@@ -805,7 +803,7 @@ impl OptimizerRule for PushDownFilter {
.map(|e|
Ok(Column::from_qualified_name(e.display_name()?)))
.collect::<Result<HashSet<_>>>()?;
- let predicates =
utils::split_conjunction_owned(filter.predicate.clone());
+ let predicates =
split_conjunction_owned(filter.predicate.clone());
let mut keep_predicates = vec![];
let mut push_predicates = vec![];
@@ -853,7 +851,7 @@ impl OptimizerRule for PushDownFilter {
}
}
LogicalPlan::CrossJoin(CrossJoin { left, right, .. }) => {
- let predicates =
utils::split_conjunction_owned(filter.predicate.clone());
+ let predicates =
split_conjunction_owned(filter.predicate.clone());
push_down_all_join(
predicates,
vec![],
@@ -908,7 +906,7 @@ impl OptimizerRule for PushDownFilter {
let prevent_cols =
extension_plan.node.prevent_predicate_push_down_columns();
- let predicates =
utils::split_conjunction_owned(filter.predicate.clone());
+ let predicates =
split_conjunction_owned(filter.predicate.clone());
let mut keep_predicates = vec![];
let mut push_predicates = vec![];
diff --git a/datafusion/optimizer/src/scalar_subquery_to_join.rs
b/datafusion/optimizer/src/scalar_subquery_to_join.rs
index 7ac0c25119..34ed4a9475 100644
--- a/datafusion/optimizer/src/scalar_subquery_to_join.rs
+++ b/datafusion/optimizer/src/scalar_subquery_to_join.rs
@@ -17,7 +17,7 @@
use crate::decorrelate::{PullUpCorrelatedExpr, UN_MATCHED_ROW_INDICATOR};
use crate::optimizer::ApplyOrder;
-use crate::utils::{conjunction, replace_qualified_name};
+use crate::utils::replace_qualified_name;
use crate::{OptimizerConfig, OptimizerRule};
use datafusion_common::alias::AliasGenerator;
use datafusion_common::tree_node::{
@@ -26,6 +26,7 @@ use datafusion_common::tree_node::{
use datafusion_common::{plan_err, Column, DataFusionError, Result,
ScalarValue};
use datafusion_expr::expr_rewriter::create_col_from_scalar_expr;
use datafusion_expr::logical_plan::{JoinType, Subquery};
+use datafusion_expr::utils::conjunction;
use datafusion_expr::{expr, EmptyRelation, Expr, LogicalPlan,
LogicalPlanBuilder};
use std::collections::{BTreeSet, HashMap};
use std::sync::Arc;
diff --git a/datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs
b/datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs
index 9dc83e0fad..43a41b1185 100644
--- a/datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs
+++ b/datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs
@@ -20,10 +20,10 @@
use std::sync::Arc;
use super::{ExprSimplifier, SimplifyContext};
-use crate::utils::merge_schema;
use crate::{OptimizerConfig, OptimizerRule};
use datafusion_common::{DFSchema, DFSchemaRef, Result};
use datafusion_expr::logical_plan::LogicalPlan;
+use datafusion_expr::utils::merge_schema;
use datafusion_physical_expr::execution_props::ExecutionProps;
/// Optimizer Pass that simplifies [`LogicalPlan`]s by rewriting
diff --git a/datafusion/optimizer/src/unwrap_cast_in_comparison.rs
b/datafusion/optimizer/src/unwrap_cast_in_comparison.rs
index 907c12b7af..91603e82a5 100644
--- a/datafusion/optimizer/src/unwrap_cast_in_comparison.rs
+++ b/datafusion/optimizer/src/unwrap_cast_in_comparison.rs
@@ -19,7 +19,6 @@
//! of expr can be added if needed.
//! This rule can reduce adding the `Expr::Cast` the expr instead of adding
the `Expr::Cast` to literal expr.
use crate::optimizer::ApplyOrder;
-use crate::utils::merge_schema;
use crate::{OptimizerConfig, OptimizerRule};
use arrow::datatypes::{
DataType, TimeUnit, MAX_DECIMAL_FOR_EACH_PRECISION,
MIN_DECIMAL_FOR_EACH_PRECISION,
@@ -31,6 +30,7 @@ use datafusion_common::{
};
use datafusion_expr::expr::{BinaryExpr, Cast, InList, TryCast};
use datafusion_expr::expr_rewriter::rewrite_preserving_name;
+use datafusion_expr::utils::merge_schema;
use datafusion_expr::{
binary_expr, in_list, lit, Expr, ExprSchemable, LogicalPlan, Operator,
};
diff --git a/datafusion/optimizer/src/utils.rs
b/datafusion/optimizer/src/utils.rs
index a3e7e42875..48f72ee7a0 100644
--- a/datafusion/optimizer/src/utils.rs
+++ b/datafusion/optimizer/src/utils.rs
@@ -18,19 +18,13 @@
//! Collection of utility functions that are leveraged by the query optimizer
rules
use crate::{OptimizerConfig, OptimizerRule};
-use datafusion_common::DataFusionError;
-use datafusion_common::{plan_err, Column, DFSchemaRef};
+use datafusion_common::{Column, DFSchemaRef};
use datafusion_common::{DFSchema, Result};
-use datafusion_expr::expr::{Alias, BinaryExpr};
-use datafusion_expr::expr_rewriter::{replace_col, strip_outer_reference};
-use datafusion_expr::{
- and,
- logical_plan::{Filter, LogicalPlan},
- Expr, Operator,
-};
+use datafusion_expr::expr_rewriter::replace_col;
+use datafusion_expr::utils as expr_utils;
+use datafusion_expr::{logical_plan::LogicalPlan, Expr, Operator};
use log::{debug, trace};
use std::collections::{BTreeSet, HashMap};
-use std::sync::Arc;
/// Convenience rule for writing optimizers: recursively invoke
/// optimize on plan's children and then return a node of the same
@@ -58,29 +52,55 @@ pub fn optimize_children(
}
}
+pub(crate) fn collect_subquery_cols(
+ exprs: &[Expr],
+ subquery_schema: DFSchemaRef,
+) -> Result<BTreeSet<Column>> {
+ exprs.iter().try_fold(BTreeSet::new(), |mut cols, expr| {
+ let mut using_cols: Vec<Column> = vec![];
+ for col in expr.to_columns()?.into_iter() {
+ if subquery_schema.has_column(&col) {
+ using_cols.push(col);
+ }
+ }
+
+ cols.extend(using_cols);
+ Result::<_>::Ok(cols)
+ })
+}
+
+pub(crate) fn replace_qualified_name(
+ expr: Expr,
+ cols: &BTreeSet<Column>,
+ subquery_alias: &str,
+) -> Result<Expr> {
+ let alias_cols: Vec<Column> = cols
+ .iter()
+ .map(|col| {
+ Column::from_qualified_name(format!("{}.{}", subquery_alias,
col.name))
+ })
+ .collect();
+ let replace_map: HashMap<&Column, &Column> =
+ cols.iter().zip(alias_cols.iter()).collect();
+
+ replace_col(expr, &replace_map)
+}
+
+/// Log the plan in debug/tracing mode after some part of the optimizer runs
+pub fn log_plan(description: &str, plan: &LogicalPlan) {
+ debug!("{description}:\n{}\n", plan.display_indent());
+ trace!("{description}::\n{}\n", plan.display_indent_schema());
+}
+
/// Splits a conjunctive [`Expr`] such as `A AND B AND C` => `[A, B, C]`
///
/// See [`split_conjunction_owned`] for more details and an example.
+#[deprecated(
+ since = "34.0.0",
+ note = "use `datafusion_expr::utils::split_conjunction` instead"
+)]
pub fn split_conjunction(expr: &Expr) -> Vec<&Expr> {
- split_conjunction_impl(expr, vec![])
-}
-
-fn split_conjunction_impl<'a>(expr: &'a Expr, mut exprs: Vec<&'a Expr>) ->
Vec<&'a Expr> {
- match expr {
- Expr::BinaryExpr(BinaryExpr {
- right,
- op: Operator::And,
- left,
- }) => {
- let exprs = split_conjunction_impl(left, exprs);
- split_conjunction_impl(right, exprs)
- }
- Expr::Alias(Alias { expr, .. }) => split_conjunction_impl(expr, exprs),
- other => {
- exprs.push(other);
- exprs
- }
- }
+ expr_utils::split_conjunction(expr)
}
/// Splits an owned conjunctive [`Expr`] such as `A AND B AND C` => `[A, B, C]`
@@ -104,8 +124,12 @@ fn split_conjunction_impl<'a>(expr: &'a Expr, mut exprs:
Vec<&'a Expr>) -> Vec<&
/// // use split_conjunction_owned to split them
/// assert_eq!(split_conjunction_owned(expr), split);
/// ```
+#[deprecated(
+ since = "34.0.0",
+ note = "use `datafusion_expr::utils::split_conjunction_owned` instead"
+)]
pub fn split_conjunction_owned(expr: Expr) -> Vec<Expr> {
- split_binary_owned(expr, Operator::And)
+ expr_utils::split_conjunction_owned(expr)
}
/// Splits an owned binary operator tree [`Expr`] such as `A <OP> B <OP> C` =>
`[A, B, C]`
@@ -130,53 +154,23 @@ pub fn split_conjunction_owned(expr: Expr) -> Vec<Expr> {
/// // use split_binary_owned to split them
/// assert_eq!(split_binary_owned(expr, Operator::Plus), split);
/// ```
+#[deprecated(
+ since = "34.0.0",
+ note = "use `datafusion_expr::utils::split_binary_owned` instead"
+)]
pub fn split_binary_owned(expr: Expr, op: Operator) -> Vec<Expr> {
- split_binary_owned_impl(expr, op, vec![])
-}
-
-fn split_binary_owned_impl(
- expr: Expr,
- operator: Operator,
- mut exprs: Vec<Expr>,
-) -> Vec<Expr> {
- match expr {
- Expr::BinaryExpr(BinaryExpr { right, op, left }) if op == operator => {
- let exprs = split_binary_owned_impl(*left, operator, exprs);
- split_binary_owned_impl(*right, operator, exprs)
- }
- Expr::Alias(Alias { expr, .. }) => {
- split_binary_owned_impl(*expr, operator, exprs)
- }
- other => {
- exprs.push(other);
- exprs
- }
- }
+ expr_utils::split_binary_owned(expr, op)
}
/// Splits an binary operator tree [`Expr`] such as `A <OP> B <OP> C` => `[A,
B, C]`
///
/// See [`split_binary_owned`] for more details and an example.
+#[deprecated(
+ since = "34.0.0",
+ note = "use `datafusion_expr::utils::split_binary` instead"
+)]
pub fn split_binary(expr: &Expr, op: Operator) -> Vec<&Expr> {
- split_binary_impl(expr, op, vec![])
-}
-
-fn split_binary_impl<'a>(
- expr: &'a Expr,
- operator: Operator,
- mut exprs: Vec<&'a Expr>,
-) -> Vec<&'a Expr> {
- match expr {
- Expr::BinaryExpr(BinaryExpr { right, op, left }) if *op == operator =>
{
- let exprs = split_binary_impl(left, operator, exprs);
- split_binary_impl(right, operator, exprs)
- }
- Expr::Alias(Alias { expr, .. }) => split_binary_impl(expr, operator,
exprs),
- other => {
- exprs.push(other);
- exprs
- }
- }
+ expr_utils::split_binary(expr, op)
}
/// Combines an array of filter expressions into a single filter
@@ -201,8 +195,12 @@ fn split_binary_impl<'a>(
/// // use conjunction to join them together with `AND`
/// assert_eq!(conjunction(split), Some(expr));
/// ```
+#[deprecated(
+ since = "34.0.0",
+ note = "use `datafusion_expr::utils::conjunction` instead"
+)]
pub fn conjunction(filters: impl IntoIterator<Item = Expr>) -> Option<Expr> {
- filters.into_iter().reduce(|accum, expr| accum.and(expr))
+ expr_utils::conjunction(filters)
}
/// Combines an array of filter expressions into a single filter
@@ -210,25 +208,22 @@ pub fn conjunction(filters: impl IntoIterator<Item =
Expr>) -> Option<Expr> {
/// logical OR.
///
/// Returns None if the filters array is empty.
+#[deprecated(
+ since = "34.0.0",
+ note = "use `datafusion_expr::utils::disjunction` instead"
+)]
pub fn disjunction(filters: impl IntoIterator<Item = Expr>) -> Option<Expr> {
- filters.into_iter().reduce(|accum, expr| accum.or(expr))
+ expr_utils::disjunction(filters)
}
/// Returns a new [LogicalPlan] that wraps `plan` in a [LogicalPlan::Filter]
/// whose predicate is all of `predicates` ANDed together.
+#[deprecated(
+ since = "34.0.0",
+ note = "use `datafusion_expr::utils::add_filter` instead"
+)]
pub fn add_filter(plan: LogicalPlan, predicates: &[&Expr]) ->
Result<LogicalPlan> {
- // reduce filters to a single filter with an AND
- let predicate = predicates
- .iter()
- .skip(1)
- .fold(predicates[0].clone(), |acc, predicate| {
- and(acc, (*predicate).to_owned())
- });
-
- Ok(LogicalPlan::Filter(Filter::try_new(
- predicate,
- Arc::new(plan),
- )?))
+ expr_utils::add_filter(plan, predicates)
}
/// Looks for correlating expressions: for example, a binary expression with
one field from the subquery, and
@@ -241,22 +236,12 @@ pub fn add_filter(plan: LogicalPlan, predicates:
&[&Expr]) -> Result<LogicalPlan
/// # Return value
///
/// Tuple of (expressions containing joins, remaining non-join expressions)
+#[deprecated(
+ since = "34.0.0",
+ note = "use `datafusion_expr::utils::find_join_exprs` instead"
+)]
pub fn find_join_exprs(exprs: Vec<&Expr>) -> Result<(Vec<Expr>, Vec<Expr>)> {
- let mut joins = vec![];
- let mut others = vec![];
- for filter in exprs.into_iter() {
- // If the expression contains correlated predicates, add it to join
filters
- if filter.contains_outer() {
- if !matches!(filter, Expr::BinaryExpr(BinaryExpr{ left, op:
Operator::Eq, right }) if left.eq(right))
- {
- joins.push(strip_outer_reference((*filter).clone()));
- }
- } else {
- others.push((*filter).clone());
- }
- }
-
- Ok((joins, others))
+ expr_utils::find_join_exprs(exprs)
}
/// Returns the first (and only) element in a slice, or an error
@@ -268,215 +253,19 @@ pub fn find_join_exprs(exprs: Vec<&Expr>) ->
Result<(Vec<Expr>, Vec<Expr>)> {
/// # Return value
///
/// The first element, or an error
+#[deprecated(
+ since = "34.0.0",
+ note = "use `datafusion_expr::utils::only_or_err` instead"
+)]
pub fn only_or_err<T>(slice: &[T]) -> Result<&T> {
- match slice {
- [it] => Ok(it),
- [] => plan_err!("No items found!"),
- _ => plan_err!("More than one item found!"),
- }
+ expr_utils::only_or_err(slice)
}
/// Merges the schemas of all inputs into a single schema.
+#[deprecated(
+ since = "34.0.0",
+ note = "use `datafusion_expr::utils::merge_schema` instead"
+)]
pub fn merge_schema(inputs: Vec<&LogicalPlan>) -> DFSchema {
- if inputs.len() == 1 {
- inputs[0].schema().clone().as_ref().clone()
- } else {
- inputs.iter().map(|input| input.schema()).fold(
- DFSchema::empty(),
- |mut lhs, rhs| {
- lhs.merge(rhs);
- lhs
- },
- )
- }
-}
-
-pub(crate) fn collect_subquery_cols(
- exprs: &[Expr],
- subquery_schema: DFSchemaRef,
-) -> Result<BTreeSet<Column>> {
- exprs.iter().try_fold(BTreeSet::new(), |mut cols, expr| {
- let mut using_cols: Vec<Column> = vec![];
- for col in expr.to_columns()?.into_iter() {
- if subquery_schema.has_column(&col) {
- using_cols.push(col);
- }
- }
-
- cols.extend(using_cols);
- Result::<_>::Ok(cols)
- })
-}
-
-pub(crate) fn replace_qualified_name(
- expr: Expr,
- cols: &BTreeSet<Column>,
- subquery_alias: &str,
-) -> Result<Expr> {
- let alias_cols: Vec<Column> = cols
- .iter()
- .map(|col| {
- Column::from_qualified_name(format!("{}.{}", subquery_alias,
col.name))
- })
- .collect();
- let replace_map: HashMap<&Column, &Column> =
- cols.iter().zip(alias_cols.iter()).collect();
-
- replace_col(expr, &replace_map)
-}
-
-/// Log the plan in debug/tracing mode after some part of the optimizer runs
-pub fn log_plan(description: &str, plan: &LogicalPlan) {
- debug!("{description}:\n{}\n", plan.display_indent());
- trace!("{description}::\n{}\n", plan.display_indent_schema());
-}
-
-#[cfg(test)]
-mod tests {
- use super::*;
- use arrow::datatypes::DataType;
- use datafusion_common::Column;
- use datafusion_expr::expr::Cast;
- use datafusion_expr::{col, lit, utils::expr_to_columns};
- use std::collections::HashSet;
-
- #[test]
- fn test_split_conjunction() {
- let expr = col("a");
- let result = split_conjunction(&expr);
- assert_eq!(result, vec![&expr]);
- }
-
- #[test]
- fn test_split_conjunction_two() {
- let expr = col("a").eq(lit(5)).and(col("b"));
- let expr1 = col("a").eq(lit(5));
- let expr2 = col("b");
-
- let result = split_conjunction(&expr);
- assert_eq!(result, vec![&expr1, &expr2]);
- }
-
- #[test]
- fn test_split_conjunction_alias() {
- let expr = col("a").eq(lit(5)).and(col("b").alias("the_alias"));
- let expr1 = col("a").eq(lit(5));
- let expr2 = col("b"); // has no alias
-
- let result = split_conjunction(&expr);
- assert_eq!(result, vec![&expr1, &expr2]);
- }
-
- #[test]
- fn test_split_conjunction_or() {
- let expr = col("a").eq(lit(5)).or(col("b"));
- let result = split_conjunction(&expr);
- assert_eq!(result, vec![&expr]);
- }
-
- #[test]
- fn test_split_binary_owned() {
- let expr = col("a");
- assert_eq!(split_binary_owned(expr.clone(), Operator::And),
vec![expr]);
- }
-
- #[test]
- fn test_split_binary_owned_two() {
- assert_eq!(
- split_binary_owned(col("a").eq(lit(5)).and(col("b")),
Operator::And),
- vec![col("a").eq(lit(5)), col("b")]
- );
- }
-
- #[test]
- fn test_split_binary_owned_different_op() {
- let expr = col("a").eq(lit(5)).or(col("b"));
- assert_eq!(
- // expr is connected by OR, but pass in AND
- split_binary_owned(expr.clone(), Operator::And),
- vec![expr]
- );
- }
-
- #[test]
- fn test_split_conjunction_owned() {
- let expr = col("a");
- assert_eq!(split_conjunction_owned(expr.clone()), vec![expr]);
- }
-
- #[test]
- fn test_split_conjunction_owned_two() {
- assert_eq!(
- split_conjunction_owned(col("a").eq(lit(5)).and(col("b"))),
- vec![col("a").eq(lit(5)), col("b")]
- );
- }
-
- #[test]
- fn test_split_conjunction_owned_alias() {
- assert_eq!(
-
split_conjunction_owned(col("a").eq(lit(5)).and(col("b").alias("the_alias"))),
- vec![
- col("a").eq(lit(5)),
- // no alias on b
- col("b"),
- ]
- );
- }
-
- #[test]
- fn test_conjunction_empty() {
- assert_eq!(conjunction(vec![]), None);
- }
-
- #[test]
- fn test_conjunction() {
- // `[A, B, C]`
- let expr = conjunction(vec![col("a"), col("b"), col("c")]);
-
- // --> `(A AND B) AND C`
- assert_eq!(expr, Some(col("a").and(col("b")).and(col("c"))));
-
- // which is different than `A AND (B AND C)`
- assert_ne!(expr, Some(col("a").and(col("b").and(col("c")))));
- }
-
- #[test]
- fn test_disjunction_empty() {
- assert_eq!(disjunction(vec![]), None);
- }
-
- #[test]
- fn test_disjunction() {
- // `[A, B, C]`
- let expr = disjunction(vec![col("a"), col("b"), col("c")]);
-
- // --> `(A OR B) OR C`
- assert_eq!(expr, Some(col("a").or(col("b")).or(col("c"))));
-
- // which is different than `A OR (B OR C)`
- assert_ne!(expr, Some(col("a").or(col("b").or(col("c")))));
- }
-
- #[test]
- fn test_split_conjunction_owned_or() {
- let expr = col("a").eq(lit(5)).or(col("b"));
- assert_eq!(split_conjunction_owned(expr.clone()), vec![expr]);
- }
-
- #[test]
- fn test_collect_expr() -> Result<()> {
- let mut accum: HashSet<Column> = HashSet::new();
- expr_to_columns(
- &Expr::Cast(Cast::new(Box::new(col("a")), DataType::Float64)),
- &mut accum,
- )?;
- expr_to_columns(
- &Expr::Cast(Cast::new(Box::new(col("a")), DataType::Float64)),
- &mut accum,
- )?;
- assert_eq!(1, accum.len());
- assert!(accum.contains(&Column::from_name("a")));
- Ok(())
- }
+ expr_utils::merge_schema(inputs)
}
diff --git a/datafusion/substrait/src/logical_plan/consumer.rs
b/datafusion/substrait/src/logical_plan/consumer.rs
index 5cb72adaca..b7a51032dc 100644
--- a/datafusion/substrait/src/logical_plan/consumer.rs
+++ b/datafusion/substrait/src/logical_plan/consumer.rs
@@ -32,7 +32,7 @@ use datafusion::prelude::JoinType;
use datafusion::sql::TableReference;
use datafusion::{
error::{DataFusionError, Result},
- optimizer::utils::split_conjunction,
+ logical_expr::utils::split_conjunction,
prelude::{Column, SessionContext},
scalar::ScalarValue,
};