This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new e8ba45c75 Basic support for `IN` and `NOT IN` Subqueries by rewriting
them to `SEMI` / `ANTI` (#2421)
e8ba45c75 is described below
commit e8ba45c758576268f25fb7a357e72dff83e5413f
Author: Eduard Karacharov <[email protected]>
AuthorDate: Wed May 4 16:52:16 2022 +0300
Basic support for `IN` and `NOT IN` Subqueries by rewriting them to `SEMI`
/ `ANTI` (#2421)
* naive in subquery implementation
* 16 and 18 tpch queries enabled in benchmark
* rollback rewriting instead of fail
* try_fold used for input plan rewriting
* test readability & negative test cases
---
benchmarks/src/bin/tpch.rs | 10 +
datafusion/core/src/execution/context.rs | 2 +
datafusion/core/src/optimizer/filter_push_down.rs | 66 +---
datafusion/core/src/optimizer/mod.rs | 1 +
.../core/src/optimizer/subquery_filter_to_join.rs | 389 +++++++++++++++++++++
datafusion/core/src/optimizer/utils.rs | 37 +-
6 files changed, 454 insertions(+), 51 deletions(-)
diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs
index ef8abb01c..15b7f987a 100644
--- a/benchmarks/src/bin/tpch.rs
+++ b/benchmarks/src/bin/tpch.rs
@@ -1074,6 +1074,16 @@ mod tests {
run_query(14).await
}
+ #[tokio::test]
+ async fn run_q16() -> Result<()> {
+ run_query(16).await
+ }
+
+ #[tokio::test]
+ async fn run_q18() -> Result<()> {
+ run_query(18).await
+ }
+
#[tokio::test]
async fn run_q19() -> Result<()> {
run_query(19).await
diff --git a/datafusion/core/src/execution/context.rs
b/datafusion/core/src/execution/context.rs
index 01a5eefa9..895e5bc1e 100644
--- a/datafusion/core/src/execution/context.rs
+++ b/datafusion/core/src/execution/context.rs
@@ -72,6 +72,7 @@ use crate::optimizer::optimizer::OptimizerRule;
use crate::optimizer::projection_push_down::ProjectionPushDown;
use crate::optimizer::simplify_expressions::SimplifyExpressions;
use crate::optimizer::single_distinct_to_groupby::SingleDistinctToGroupBy;
+use crate::optimizer::subquery_filter_to_join::SubqueryFilterToJoin;
use crate::physical_optimizer::coalesce_batches::CoalesceBatches;
use crate::physical_optimizer::merge_exec::AddCoalescePartitionsExec;
@@ -1199,6 +1200,7 @@ impl SessionState {
// Simplify expressions first to maximize the chance
// of applying other optimizations
Arc::new(SimplifyExpressions::new()),
+ Arc::new(SubqueryFilterToJoin::new()),
Arc::new(EliminateFilter::new()),
Arc::new(CommonSubexprEliminate::new()),
Arc::new(EliminateLimit::new()),
diff --git a/datafusion/core/src/optimizer/filter_push_down.rs
b/datafusion/core/src/optimizer/filter_push_down.rs
index 19535de86..0fd107b40 100644
--- a/datafusion/core/src/optimizer/filter_push_down.rs
+++ b/datafusion/core/src/optimizer/filter_push_down.rs
@@ -14,20 +14,17 @@
//! Filter Push Down optimizer rule ensures that filters are applied as early
as possible in the plan
+use crate::error::Result;
use crate::execution::context::ExecutionProps;
use crate::logical_expr::TableProviderFilterPushDown;
use crate::logical_plan::plan::{Aggregate, Filter, Join, Projection, Union};
use crate::logical_plan::{
- and, col, replace_col, Column, CrossJoin, JoinType, Limit, LogicalPlan,
TableScan,
+ col, replace_col, Column, CrossJoin, JoinType, Limit, LogicalPlan,
TableScan,
};
use crate::logical_plan::{DFSchema, Expr};
use crate::optimizer::optimizer::OptimizerRule;
use crate::optimizer::utils;
-use crate::{error::Result, logical_plan::Operator};
-use std::{
- collections::{HashMap, HashSet},
- sync::Arc,
-};
+use std::collections::{HashMap, HashSet};
/// Filter Push Down optimizer rule pushes filter clauses down the plan
/// # Introduction
@@ -95,23 +92,6 @@ fn push_down(state: &State, plan: &LogicalPlan) ->
Result<LogicalPlan> {
utils::from_plan(plan, &expr, &new_inputs)
}
-/// returns a new [LogicalPlan] that wraps `plan` in a [LogicalPlan::Filter]
with
-/// its predicate be all `predicates` ANDed.
-fn add_filter(plan: LogicalPlan, predicates: &[&Expr]) -> LogicalPlan {
- // reduce filters to a single filter with an AND
- let predicate = predicates
- .iter()
- .skip(1)
- .fold(predicates[0].clone(), |acc, predicate| {
- and(acc, (*predicate).to_owned())
- });
-
- LogicalPlan::Filter(Filter {
- predicate,
- input: Arc::new(plan),
- })
-}
-
// remove all filters from `filters` that are in `predicate_columns`
fn remove_filters(
filters: &[(Expr, HashSet<Column>)],
@@ -150,7 +130,7 @@ fn issue_filters(
return push_down(&state, plan);
}
- let plan = add_filter(plan.clone(), &predicates);
+ let plan = utils::add_filter(plan.clone(), &predicates);
state.filters = remove_filters(&state.filters, &predicate_columns);
@@ -158,24 +138,6 @@ fn issue_filters(
push_down(&state, &plan)
}
-/// converts "A AND B AND C" => [A, B, C]
-fn split_members<'a>(predicate: &'a Expr, predicates: &mut Vec<&'a Expr>) {
- match predicate {
- Expr::BinaryExpr {
- right,
- op: Operator::And,
- left,
- } => {
- split_members(left, predicates);
- split_members(right, predicates);
- }
- Expr::Alias(expr, _) => {
- split_members(expr, predicates);
- }
- other => predicates.push(other),
- }
-}
-
// For a given JOIN logical plan, determine whether each side of the join is
preserved.
// We say a join side is preserved if the join returns all or a subset of the
rows from
// the relevant side, such that each row of the output table directly maps to
a row of
@@ -289,7 +251,7 @@ fn optimize_join(
Ok(plan)
} else {
// wrap the join on the filter whose predicates must be kept
- let plan = add_filter(plan, &to_keep.0);
+ let plan = utils::add_filter(plan, &to_keep.0);
state.filters = remove_filters(&state.filters, &to_keep.1);
Ok(plan)
@@ -305,7 +267,7 @@ fn optimize(plan: &LogicalPlan, mut state: State) ->
Result<LogicalPlan> {
LogicalPlan::Analyze { .. } => push_down(&state, plan),
LogicalPlan::Filter(Filter { input, predicate }) => {
let mut predicates = vec![];
- split_members(predicate, &mut predicates);
+ utils::split_conjunction(predicate, &mut predicates);
// Predicates without referencing columns (WHERE FALSE, WHERE 1=1,
etc.)
let mut no_col_predicates = vec![];
@@ -328,7 +290,10 @@ fn optimize(plan: &LogicalPlan, mut state: State) ->
Result<LogicalPlan> {
// As those contain only literals, they could be optimized using
constant folding
// and removal of WHERE TRUE / WHERE FALSE
if !no_col_predicates.is_empty() {
- Ok(add_filter(optimize(input, state)?, &no_col_predicates))
+ Ok(utils::add_filter(
+ optimize(input, state)?,
+ &no_col_predicates,
+ ))
} else {
optimize(input, state)
}
@@ -592,17 +557,18 @@ fn rewrite(expr: &Expr, projection: &HashMap<String,
Expr>) -> Result<Expr> {
#[cfg(test)]
mod tests {
+ use std::sync::Arc;
+
use super::*;
use crate::datasource::TableProvider;
+ use crate::logical_plan::plan::provider_as_source;
use crate::logical_plan::{
- lit, sum, union_with_alias, DFSchema, Expr, LogicalPlanBuilder,
Operator,
+ and, col, lit, sum, union_with_alias, DFSchema, Expr,
LogicalPlanBuilder,
+ Operator,
};
use crate::physical_plan::ExecutionPlan;
+ use crate::prelude::JoinType;
use crate::test::*;
- use crate::{
- logical_plan::{col, plan::provider_as_source},
- prelude::JoinType,
- };
use arrow::datatypes::SchemaRef;
use async_trait::async_trait;
diff --git a/datafusion/core/src/optimizer/mod.rs
b/datafusion/core/src/optimizer/mod.rs
index 9f12ecea8..b274ab645 100644
--- a/datafusion/core/src/optimizer/mod.rs
+++ b/datafusion/core/src/optimizer/mod.rs
@@ -28,4 +28,5 @@ pub mod optimizer;
pub mod projection_push_down;
pub mod simplify_expressions;
pub mod single_distinct_to_groupby;
+pub mod subquery_filter_to_join;
pub mod utils;
diff --git a/datafusion/core/src/optimizer/subquery_filter_to_join.rs
b/datafusion/core/src/optimizer/subquery_filter_to_join.rs
new file mode 100644
index 000000000..5f4583c28
--- /dev/null
+++ b/datafusion/core/src/optimizer/subquery_filter_to_join.rs
@@ -0,0 +1,389 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Optimizer rule for rewriting subquery filters to joins
+//!
+//! It handles standalone parts of logical conjunction expressions, i.e.
+//! ```text
+//! WHERE t1.f IN (SELECT f FROM t2) AND t2.f = 'x'
+//! ```
+//! will be rewritten, but
+//! ```text
+//! WHERE t1.f IN (SELECT f FROM t2) OR t2.f = 'x'
+//! ```
+//! won't
+use std::sync::Arc;
+
+use crate::error::{DataFusionError, Result};
+use crate::execution::context::ExecutionProps;
+use crate::logical_plan::plan::{Filter, Join};
+use crate::logical_plan::{
+ build_join_schema, Expr, JoinConstraint, JoinType, LogicalPlan,
+};
+use crate::optimizer::optimizer::OptimizerRule;
+use crate::optimizer::utils;
+
+/// Optimizer rule for rewriting subquery filters to joins
+#[derive(Default)]
+pub struct SubqueryFilterToJoin {}
+
+impl SubqueryFilterToJoin {
+ #[allow(missing_docs)]
+ pub fn new() -> Self {
+ Self {}
+ }
+}
+
+impl OptimizerRule for SubqueryFilterToJoin {
+ fn optimize(
+ &self,
+ plan: &LogicalPlan,
+ execution_props: &ExecutionProps,
+ ) -> Result<LogicalPlan> {
+ match plan {
+ LogicalPlan::Filter(Filter { predicate, input }) => {
+ // Apply optimizer rule to current input
+ let optimized_input = self.optimize(input, execution_props)?;
+
+ // Splitting filter expression into components by AND
+ let mut filters = vec![];
+ utils::split_conjunction(predicate, &mut filters);
+
+ // Searching for subquery-based filters
+ let (subquery_filters, regular_filters): (Vec<&Expr>,
Vec<&Expr>) =
+ filters
+ .into_iter()
+ .partition(|&e| matches!(e, Expr::InSubquery { .. }));
+
+ // Check all subquery filters could be rewritten
+ //
+ // In case of expressions which could not be rewritten
+ // return original filter with optimized input
+ let mut subqueries_in_regular = vec![];
+ regular_filters.iter().try_for_each(|&e| {
+ extract_subquery_filters(e, &mut subqueries_in_regular)
+ })?;
+
+ if !subqueries_in_regular.is_empty() {
+ return Ok(LogicalPlan::Filter(Filter {
+ predicate: predicate.clone(),
+ input: Arc::new(optimized_input),
+ }));
+ };
+
+ // Add subquery joins to new_input
+ // optimized_input value should retain for possible
optimization rollback
+ let opt_result = subquery_filters.iter().try_fold(
+ optimized_input.clone(),
+ |input, &e| match e {
+ Expr::InSubquery {
+ expr,
+ subquery,
+ negated,
+ } => {
+ let right_input = self.optimize(
+ &*subquery.subquery,
+ execution_props
+ )?;
+ let right_schema = right_input.schema();
+ if right_schema.fields().len() != 1 {
+ return Err(DataFusionError::Plan(
+ "Only single column allowed in InSubquery"
+ .to_string(),
+ ));
+ };
+
+ let right_key =
right_schema.field(0).qualified_column();
+ let left_key = match *expr.clone() {
+ Expr::Column(col) => col,
+ _ => return
Err(DataFusionError::NotImplemented(
+ "Filtering by expression not implemented
for InSubquery"
+ .to_string(),
+ )),
+ };
+
+ let join_type = if *negated {
+ JoinType::Anti
+ } else {
+ JoinType::Semi
+ };
+
+ let schema = build_join_schema(
+ optimized_input.schema(),
+ right_schema,
+ &join_type,
+ )?;
+
+ Ok(LogicalPlan::Join(Join {
+ left: Arc::new(input),
+ right: Arc::new(right_input),
+ on: vec![(left_key, right_key)],
+ join_type,
+ join_constraint: JoinConstraint::On,
+ schema: Arc::new(schema),
+ null_equals_null: false,
+ }))
+ }
+ _ => Err(DataFusionError::Plan(
+ "Unknown expression while rewriting subquery to
joins"
+ .to_string(),
+ )),
+ }
+ );
+
+ // In case of expressions which could not be rewritten
+ // return original filter with optimized input
+ let new_input = match opt_result {
+ Ok(plan) => plan,
+ Err(_) => {
+ return Ok(LogicalPlan::Filter(Filter {
+ predicate: predicate.clone(),
+ input: Arc::new(optimized_input),
+ }))
+ }
+ };
+
+ // Apply regular filters to join output if some or just return
join
+ if regular_filters.is_empty() {
+ Ok(new_input)
+ } else {
+ Ok(utils::add_filter(new_input, &regular_filters))
+ }
+ }
+ _ => {
+ // Apply the optimization to all inputs of the plan
+ utils::optimize_children(self, plan, execution_props)
+ }
+ }
+ }
+
+ fn name(&self) -> &str {
+ "subquery_filter_to_join"
+ }
+}
+
+fn extract_subquery_filters(expression: &Expr, extracted: &mut Vec<Expr>) ->
Result<()> {
+ utils::expr_sub_expressions(expression)?
+ .into_iter()
+ .try_for_each(|se| match se {
+ Expr::InSubquery { .. } => {
+ extracted.push(se);
+ Ok(())
+ }
+ _ => extract_subquery_filters(&se, extracted),
+ })
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use crate::logical_plan::{
+ and, binary_expr, col, in_subquery, lit, not_in_subquery, or,
LogicalPlanBuilder,
+ Operator,
+ };
+ use crate::test::*;
+
+ fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) {
+ let rule = SubqueryFilterToJoin::new();
+ let optimized_plan = rule
+ .optimize(plan, &ExecutionProps::new())
+ .expect("failed to optimize plan");
+ let formatted_plan = format!("{}",
optimized_plan.display_indent_schema());
+ assert_eq!(formatted_plan, expected);
+ }
+
+ fn test_subquery_with_name(name: &str) -> Result<Arc<LogicalPlan>> {
+ let table_scan = test_table_scan_with_name(name)?;
+ Ok(Arc::new(
+ LogicalPlanBuilder::from(table_scan)
+ .project(vec![col("c")])?
+ .build()?,
+ ))
+ }
+
+ /// Test for single IN subquery filter
+ #[test]
+ fn in_subquery_simple() -> Result<()> {
+ let table_scan = test_table_scan()?;
+ let plan = LogicalPlanBuilder::from(table_scan)
+ .filter(in_subquery(col("c"), test_subquery_with_name("sq")?))?
+ .project(vec![col("test.b")])?
+ .build()?;
+
+ let expected = "Projection: #test.b [b:UInt32]\
+ \n Semi Join: #test.c = #sq.c [a:UInt32, b:UInt32, c:UInt32]\
+ \n TableScan: test projection=None [a:UInt32, b:UInt32, c:UInt32]\
+ \n Projection: #sq.c [c:UInt32]\
+ \n TableScan: sq projection=None [a:UInt32, b:UInt32, c:UInt32]";
+
+ assert_optimized_plan_eq(&plan, expected);
+ Ok(())
+ }
+
+ /// Test for single NOT IN subquery filter
+ #[test]
+ fn not_in_subquery_simple() -> Result<()> {
+ let table_scan = test_table_scan()?;
+ let plan = LogicalPlanBuilder::from(table_scan)
+ .filter(not_in_subquery(col("c"), test_subquery_with_name("sq")?))?
+ .project(vec![col("test.b")])?
+ .build()?;
+
+ let expected = "Projection: #test.b [b:UInt32]\
+ \n Anti Join: #test.c = #sq.c [a:UInt32, b:UInt32, c:UInt32]\
+ \n TableScan: test projection=None [a:UInt32, b:UInt32, c:UInt32]\
+ \n Projection: #sq.c [c:UInt32]\
+ \n TableScan: sq projection=None [a:UInt32, b:UInt32, c:UInt32]";
+
+ assert_optimized_plan_eq(&plan, expected);
+ Ok(())
+ }
+
+ /// Test for several IN subquery expressions
+ #[test]
+ fn in_subquery_multiple() -> Result<()> {
+ let table_scan = test_table_scan()?;
+ let plan = LogicalPlanBuilder::from(table_scan)
+ .filter(and(
+ in_subquery(col("c"), test_subquery_with_name("sq_1")?),
+ in_subquery(col("b"), test_subquery_with_name("sq_2")?),
+ ))?
+ .project(vec![col("test.b")])?
+ .build()?;
+
+ let expected = "Projection: #test.b [b:UInt32]\
+ \n Semi Join: #test.b = #sq_2.c [a:UInt32, b:UInt32, c:UInt32]\
+ \n Semi Join: #test.c = #sq_1.c [a:UInt32, b:UInt32, c:UInt32]\
+ \n TableScan: test projection=None [a:UInt32, b:UInt32, c:UInt32]\
+ \n Projection: #sq_1.c [c:UInt32]\
+ \n TableScan: sq_1 projection=None [a:UInt32, b:UInt32,
c:UInt32]\
+ \n Projection: #sq_2.c [c:UInt32]\
+ \n TableScan: sq_2 projection=None [a:UInt32, b:UInt32,
c:UInt32]";
+
+ assert_optimized_plan_eq(&plan, expected);
+ Ok(())
+ }
+
+ /// Test for IN subquery with additional AND filter
+ #[test]
+ fn in_subquery_with_and_filters() -> Result<()> {
+ let table_scan = test_table_scan()?;
+ let plan = LogicalPlanBuilder::from(table_scan)
+ .filter(and(
+ in_subquery(col("c"), test_subquery_with_name("sq")?),
+ and(
+ binary_expr(col("a"), Operator::Eq, lit(1_u32)),
+ binary_expr(col("b"), Operator::Lt, lit(30_u32)),
+ ),
+ ))?
+ .project(vec![col("test.b")])?
+ .build()?;
+
+ let expected = "Projection: #test.b [b:UInt32]\
+ \n Filter: #test.a = UInt32(1) AND #test.b < UInt32(30) [a:UInt32,
b:UInt32, c:UInt32]\
+ \n Semi Join: #test.c = #sq.c [a:UInt32, b:UInt32, c:UInt32]\
+ \n TableScan: test projection=None [a:UInt32, b:UInt32, c:UInt32]\
+ \n Projection: #sq.c [c:UInt32]\
+ \n TableScan: sq projection=None [a:UInt32, b:UInt32,
c:UInt32]";
+
+ assert_optimized_plan_eq(&plan, expected);
+ Ok(())
+ }
+
+ /// Test for IN subquery with additional OR filter
+ /// filter expression not modified
+ #[test]
+ fn in_subquery_with_or_filters() -> Result<()> {
+ let table_scan = test_table_scan()?;
+ let plan = LogicalPlanBuilder::from(table_scan)
+ .filter(or(
+ and(
+ binary_expr(col("a"), Operator::Eq, lit(1_u32)),
+ binary_expr(col("b"), Operator::Lt, lit(30_u32)),
+ ),
+ in_subquery(col("c"), test_subquery_with_name("sq")?),
+ ))?
+ .project(vec![col("test.b")])?
+ .build()?;
+
+ let expected = "Projection: #test.b [b:UInt32]\
+ \n Filter: #test.a = UInt32(1) AND #test.b < UInt32(30) OR #test.c IN
(\
+ Subquery: Projection: #sq.c\
+ \n TableScan: sq projection=None) [a:UInt32, b:UInt32, c:UInt32]\
+ \n TableScan: test projection=None [a:UInt32, b:UInt32, c:UInt32]";
+
+ assert_optimized_plan_eq(&plan, expected);
+ Ok(())
+ }
+
+ /// Test for nested IN subqueries
+ #[test]
+ fn in_subquery_nested() -> Result<()> {
+ let table_scan = test_table_scan()?;
+
+ let subquery =
LogicalPlanBuilder::from(test_table_scan_with_name("sq")?)
+ .filter(in_subquery(col("a"),
test_subquery_with_name("sq_nested")?))?
+ .project(vec![col("a")])?
+ .build()?;
+
+ let plan = LogicalPlanBuilder::from(table_scan)
+ .filter(in_subquery(col("b"), Arc::new(subquery)))?
+ .project(vec![col("test.b")])?
+ .build()?;
+
+ let expected = "Projection: #test.b [b:UInt32]\
+ \n Semi Join: #test.b = #sq.a [a:UInt32, b:UInt32, c:UInt32]\
+ \n TableScan: test projection=None [a:UInt32, b:UInt32, c:UInt32]\
+ \n Projection: #sq.a [a:UInt32]\
+ \n Semi Join: #sq.a = #sq_nested.c [a:UInt32, b:UInt32, c:UInt32]\
+ \n TableScan: sq projection=None [a:UInt32, b:UInt32, c:UInt32]\
+ \n Projection: #sq_nested.c [c:UInt32]\
+ \n TableScan: sq_nested projection=None [a:UInt32, b:UInt32,
c:UInt32]";
+
+ assert_optimized_plan_eq(&plan, expected);
+ Ok(())
+ }
+
+ /// Test for filter input modification in case filter not supported
+ /// Outer filter expression not modified while inner converted to join
+ #[test]
+ fn in_subquery_input_modified() -> Result<()> {
+ let table_scan = test_table_scan()?;
+ let plan = LogicalPlanBuilder::from(table_scan)
+ .filter(in_subquery(col("c"),
test_subquery_with_name("sq_inner")?))?
+ .project_with_alias(vec![col("b"), col("c")],
Some("wrapped".to_string()))?
+ .filter(or(
+ binary_expr(col("b"), Operator::Lt, lit(30_u32)),
+ in_subquery(col("c"), test_subquery_with_name("sq_outer")?),
+ ))?
+ .project(vec![col("b")])?
+ .build()?;
+
+ let expected = "Projection: #wrapped.b [b:UInt32]\
+ \n Filter: #wrapped.b < UInt32(30) OR #wrapped.c IN (\
+ Subquery: Projection: #sq_outer.c\
+ \n TableScan: sq_outer projection=None) [b:UInt32, c:UInt32]\
+ \n Projection: #test.b, #test.c, alias=wrapped [b:UInt32, c:UInt32]\
+ \n Semi Join: #test.c = #sq_inner.c [a:UInt32, b:UInt32,
c:UInt32]\
+ \n TableScan: test projection=None [a:UInt32, b:UInt32,
c:UInt32]\
+ \n Projection: #sq_inner.c [c:UInt32]\
+ \n TableScan: sq_inner projection=None [a:UInt32, b:UInt32,
c:UInt32]";
+
+ assert_optimized_plan_eq(&plan, expected);
+ Ok(())
+ }
+}
diff --git a/datafusion/core/src/optimizer/utils.rs
b/datafusion/core/src/optimizer/utils.rs
index df36761fe..48855df9f 100644
--- a/datafusion/core/src/optimizer/utils.rs
+++ b/datafusion/core/src/optimizer/utils.rs
@@ -25,7 +25,7 @@ use datafusion_expr::logical_plan::{
};
use crate::logical_plan::{
- build_join_schema, Column, CreateMemoryTable, DFSchemaRef, Expr,
ExprVisitable,
+ and, build_join_schema, Column, CreateMemoryTable, DFSchemaRef, Expr,
ExprVisitable,
Limit, LogicalPlan, LogicalPlanBuilder, Operator, Partitioning, Recursion,
Repartition, Union, Values,
};
@@ -556,6 +556,41 @@ pub fn rewrite_expression(expr: &Expr, expressions:
&[Expr]) -> Result<Expr> {
}
}
+/// converts "A AND B AND C" => [A, B, C]
+pub fn split_conjunction<'a>(predicate: &'a Expr, predicates: &mut Vec<&'a
Expr>) {
+ match predicate {
+ Expr::BinaryExpr {
+ right,
+ op: Operator::And,
+ left,
+ } => {
+ split_conjunction(left, predicates);
+ split_conjunction(right, predicates);
+ }
+ Expr::Alias(expr, _) => {
+ split_conjunction(expr, predicates);
+ }
+ other => predicates.push(other),
+ }
+}
+
+/// returns a new [LogicalPlan] that wraps `plan` in a [LogicalPlan::Filter]
with
+/// its predicate be all `predicates` ANDed.
+pub fn add_filter(plan: LogicalPlan, predicates: &[&Expr]) -> LogicalPlan {
+ // reduce filters to a single filter with an AND
+ let predicate = predicates
+ .iter()
+ .skip(1)
+ .fold(predicates[0].clone(), |acc, predicate| {
+ and(acc, (*predicate).to_owned())
+ });
+
+ LogicalPlan::Filter(Filter {
+ predicate,
+ input: Arc::new(plan),
+ })
+}
+
#[cfg(test)]
mod tests {
use super::*;