This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new e8ba45c75 Basic support for `IN` and `NOT IN` Subqueries by rewriting them to `SEMI` / `ANTI` (#2421)
e8ba45c75 is described below

commit e8ba45c758576268f25fb7a357e72dff83e5413f
Author: Eduard Karacharov <[email protected]>
AuthorDate: Wed May 4 16:52:16 2022 +0300

    Basic support for `IN` and `NOT IN` Subqueries by rewriting them to `SEMI` / `ANTI` (#2421)
    
    * naive in subquery implementation
    
    * 16 and 18 tpch queries enabled in benchmark
    
    * rollback rewriting instead of fail
    
    * try_fold used for input plan rewriting
    
    * test readability & negative test cases
---
 benchmarks/src/bin/tpch.rs                         |  10 +
 datafusion/core/src/execution/context.rs           |   2 +
 datafusion/core/src/optimizer/filter_push_down.rs  |  66 +---
 datafusion/core/src/optimizer/mod.rs               |   1 +
 .../core/src/optimizer/subquery_filter_to_join.rs  | 389 +++++++++++++++++++++
 datafusion/core/src/optimizer/utils.rs             |  37 +-
 6 files changed, 454 insertions(+), 51 deletions(-)

diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs
index ef8abb01c..15b7f987a 100644
--- a/benchmarks/src/bin/tpch.rs
+++ b/benchmarks/src/bin/tpch.rs
@@ -1074,6 +1074,16 @@ mod tests {
         run_query(14).await
     }
 
+    #[tokio::test]
+    async fn run_q16() -> Result<()> {
+        run_query(16).await
+    }
+
+    #[tokio::test]
+    async fn run_q18() -> Result<()> {
+        run_query(18).await
+    }
+
     #[tokio::test]
     async fn run_q19() -> Result<()> {
         run_query(19).await
diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs
index 01a5eefa9..895e5bc1e 100644
--- a/datafusion/core/src/execution/context.rs
+++ b/datafusion/core/src/execution/context.rs
@@ -72,6 +72,7 @@ use crate::optimizer::optimizer::OptimizerRule;
 use crate::optimizer::projection_push_down::ProjectionPushDown;
 use crate::optimizer::simplify_expressions::SimplifyExpressions;
 use crate::optimizer::single_distinct_to_groupby::SingleDistinctToGroupBy;
+use crate::optimizer::subquery_filter_to_join::SubqueryFilterToJoin;
 
 use crate::physical_optimizer::coalesce_batches::CoalesceBatches;
 use crate::physical_optimizer::merge_exec::AddCoalescePartitionsExec;
@@ -1199,6 +1200,7 @@ impl SessionState {
                 // Simplify expressions first to maximize the chance
                 // of applying other optimizations
                 Arc::new(SimplifyExpressions::new()),
+                Arc::new(SubqueryFilterToJoin::new()),
                 Arc::new(EliminateFilter::new()),
                 Arc::new(CommonSubexprEliminate::new()),
                 Arc::new(EliminateLimit::new()),
diff --git a/datafusion/core/src/optimizer/filter_push_down.rs b/datafusion/core/src/optimizer/filter_push_down.rs
index 19535de86..0fd107b40 100644
--- a/datafusion/core/src/optimizer/filter_push_down.rs
+++ b/datafusion/core/src/optimizer/filter_push_down.rs
@@ -14,20 +14,17 @@
 
 //! Filter Push Down optimizer rule ensures that filters are applied as early as possible in the plan
 
+use crate::error::Result;
 use crate::execution::context::ExecutionProps;
 use crate::logical_expr::TableProviderFilterPushDown;
 use crate::logical_plan::plan::{Aggregate, Filter, Join, Projection, Union};
 use crate::logical_plan::{
-    and, col, replace_col, Column, CrossJoin, JoinType, Limit, LogicalPlan, 
TableScan,
+    col, replace_col, Column, CrossJoin, JoinType, Limit, LogicalPlan, 
TableScan,
 };
 use crate::logical_plan::{DFSchema, Expr};
 use crate::optimizer::optimizer::OptimizerRule;
 use crate::optimizer::utils;
-use crate::{error::Result, logical_plan::Operator};
-use std::{
-    collections::{HashMap, HashSet},
-    sync::Arc,
-};
+use std::collections::{HashMap, HashSet};
 
 /// Filter Push Down optimizer rule pushes filter clauses down the plan
 /// # Introduction
@@ -95,23 +92,6 @@ fn push_down(state: &State, plan: &LogicalPlan) -> Result<LogicalPlan> {
     utils::from_plan(plan, &expr, &new_inputs)
 }
 
-/// returns a new [LogicalPlan] that wraps `plan` in a [LogicalPlan::Filter] with
-/// its predicate be all `predicates` ANDed.
-fn add_filter(plan: LogicalPlan, predicates: &[&Expr]) -> LogicalPlan {
-    // reduce filters to a single filter with an AND
-    let predicate = predicates
-        .iter()
-        .skip(1)
-        .fold(predicates[0].clone(), |acc, predicate| {
-            and(acc, (*predicate).to_owned())
-        });
-
-    LogicalPlan::Filter(Filter {
-        predicate,
-        input: Arc::new(plan),
-    })
-}
-
 // remove all filters from `filters` that are in `predicate_columns`
 fn remove_filters(
     filters: &[(Expr, HashSet<Column>)],
@@ -150,7 +130,7 @@ fn issue_filters(
         return push_down(&state, plan);
     }
 
-    let plan = add_filter(plan.clone(), &predicates);
+    let plan = utils::add_filter(plan.clone(), &predicates);
 
     state.filters = remove_filters(&state.filters, &predicate_columns);
 
@@ -158,24 +138,6 @@ fn issue_filters(
     push_down(&state, &plan)
 }
 
-/// converts "A AND B AND C" => [A, B, C]
-fn split_members<'a>(predicate: &'a Expr, predicates: &mut Vec<&'a Expr>) {
-    match predicate {
-        Expr::BinaryExpr {
-            right,
-            op: Operator::And,
-            left,
-        } => {
-            split_members(left, predicates);
-            split_members(right, predicates);
-        }
-        Expr::Alias(expr, _) => {
-            split_members(expr, predicates);
-        }
-        other => predicates.push(other),
-    }
-}
-
 // For a given JOIN logical plan, determine whether each side of the join is preserved.
 // We say a join side is preserved if the join returns all or a subset of the rows from
 // the relevant side, such that each row of the output table directly maps to a row of
@@ -289,7 +251,7 @@ fn optimize_join(
         Ok(plan)
     } else {
         // wrap the join on the filter whose predicates must be kept
-        let plan = add_filter(plan, &to_keep.0);
+        let plan = utils::add_filter(plan, &to_keep.0);
         state.filters = remove_filters(&state.filters, &to_keep.1);
 
         Ok(plan)
@@ -305,7 +267,7 @@ fn optimize(plan: &LogicalPlan, mut state: State) -> Result<LogicalPlan> {
         LogicalPlan::Analyze { .. } => push_down(&state, plan),
         LogicalPlan::Filter(Filter { input, predicate }) => {
             let mut predicates = vec![];
-            split_members(predicate, &mut predicates);
+            utils::split_conjunction(predicate, &mut predicates);
 
             // Predicates without referencing columns (WHERE FALSE, WHERE 1=1, etc.)
             let mut no_col_predicates = vec![];
@@ -328,7 +290,10 @@ fn optimize(plan: &LogicalPlan, mut state: State) -> Result<LogicalPlan> {
             // As those contain only literals, they could be optimized using constant folding
             // and removal of WHERE TRUE / WHERE FALSE
             if !no_col_predicates.is_empty() {
-                Ok(add_filter(optimize(input, state)?, &no_col_predicates))
+                Ok(utils::add_filter(
+                    optimize(input, state)?,
+                    &no_col_predicates,
+                ))
             } else {
                 optimize(input, state)
             }
@@ -592,17 +557,18 @@ fn rewrite(expr: &Expr, projection: &HashMap<String, Expr>) -> Result<Expr> {
 
 #[cfg(test)]
 mod tests {
+    use std::sync::Arc;
+
     use super::*;
     use crate::datasource::TableProvider;
+    use crate::logical_plan::plan::provider_as_source;
     use crate::logical_plan::{
-        lit, sum, union_with_alias, DFSchema, Expr, LogicalPlanBuilder, 
Operator,
+        and, col, lit, sum, union_with_alias, DFSchema, Expr, 
LogicalPlanBuilder,
+        Operator,
     };
     use crate::physical_plan::ExecutionPlan;
+    use crate::prelude::JoinType;
     use crate::test::*;
-    use crate::{
-        logical_plan::{col, plan::provider_as_source},
-        prelude::JoinType,
-    };
 
     use arrow::datatypes::SchemaRef;
     use async_trait::async_trait;
diff --git a/datafusion/core/src/optimizer/mod.rs b/datafusion/core/src/optimizer/mod.rs
index 9f12ecea8..b274ab645 100644
--- a/datafusion/core/src/optimizer/mod.rs
+++ b/datafusion/core/src/optimizer/mod.rs
@@ -28,4 +28,5 @@ pub mod optimizer;
 pub mod projection_push_down;
 pub mod simplify_expressions;
 pub mod single_distinct_to_groupby;
+pub mod subquery_filter_to_join;
 pub mod utils;
diff --git a/datafusion/core/src/optimizer/subquery_filter_to_join.rs b/datafusion/core/src/optimizer/subquery_filter_to_join.rs
new file mode 100644
index 000000000..5f4583c28
--- /dev/null
+++ b/datafusion/core/src/optimizer/subquery_filter_to_join.rs
@@ -0,0 +1,389 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Optimizer rule for rewriting subquery filters to joins
+//!
+//! It handles standalone parts of logical conjunction expressions, i.e.
+//! ```text
+//!   WHERE t1.f IN (SELECT f FROM t2) AND t2.f = 'x'
+//! ```
+//! will be rewritten, but
+//! ```text
+//!   WHERE t1.f IN (SELECT f FROM t2) OR t2.f = 'x'
+//! ```
+//! won't
+use std::sync::Arc;
+
+use crate::error::{DataFusionError, Result};
+use crate::execution::context::ExecutionProps;
+use crate::logical_plan::plan::{Filter, Join};
+use crate::logical_plan::{
+    build_join_schema, Expr, JoinConstraint, JoinType, LogicalPlan,
+};
+use crate::optimizer::optimizer::OptimizerRule;
+use crate::optimizer::utils;
+
+/// Optimizer rule for rewriting subquery filters to joins
+#[derive(Default)]
+pub struct SubqueryFilterToJoin {}
+
+impl SubqueryFilterToJoin {
+    #[allow(missing_docs)]
+    pub fn new() -> Self {
+        Self {}
+    }
+}
+
+impl OptimizerRule for SubqueryFilterToJoin {
+    fn optimize(
+        &self,
+        plan: &LogicalPlan,
+        execution_props: &ExecutionProps,
+    ) -> Result<LogicalPlan> {
+        match plan {
+            LogicalPlan::Filter(Filter { predicate, input }) => {
+                // Apply optimizer rule to current input
+                let optimized_input = self.optimize(input, execution_props)?;
+
+                // Splitting filter expression into components by AND
+                let mut filters = vec![];
+                utils::split_conjunction(predicate, &mut filters);
+
+                // Searching for subquery-based filters
+                let (subquery_filters, regular_filters): (Vec<&Expr>, 
Vec<&Expr>) =
+                    filters
+                        .into_iter()
+                        .partition(|&e| matches!(e, Expr::InSubquery { .. }));
+
+                // Check all subquery filters could be rewritten
+                //
+                // In case of expressions which could not be rewritten
+                // return original filter with optimized input
+                let mut subqueries_in_regular = vec![];
+                regular_filters.iter().try_for_each(|&e| {
+                    extract_subquery_filters(e, &mut subqueries_in_regular)
+                })?;
+
+                if !subqueries_in_regular.is_empty() {
+                    return Ok(LogicalPlan::Filter(Filter {
+                        predicate: predicate.clone(),
+                        input: Arc::new(optimized_input),
+                    }));
+                };
+
+                // Add subquery joins to new_input
+                // optimized_input value should retain for possible optimization rollback
+                let opt_result = subquery_filters.iter().try_fold(
+                    optimized_input.clone(),
+                    |input, &e| match e {
+                        Expr::InSubquery {
+                            expr,
+                            subquery,
+                            negated,
+                        } => {
+                            let right_input = self.optimize(
+                                &*subquery.subquery,
+                                execution_props
+                            )?;
+                            let right_schema = right_input.schema();
+                            if right_schema.fields().len() != 1 {
+                                return Err(DataFusionError::Plan(
+                                    "Only single column allowed in InSubquery"
+                                        .to_string(),
+                                ));
+                            };
+
+                            let right_key = 
right_schema.field(0).qualified_column();
+                            let left_key = match *expr.clone() {
+                                Expr::Column(col) => col,
+                                _ => return 
Err(DataFusionError::NotImplemented(
+                                    "Filtering by expression not implemented 
for InSubquery"
+                                        .to_string(),
+                                )),
+                            };
+
+                            let join_type = if *negated {
+                                JoinType::Anti
+                            } else {
+                                JoinType::Semi
+                            };
+
+                            let schema = build_join_schema(
+                                optimized_input.schema(),
+                                right_schema,
+                                &join_type,
+                            )?;
+
+                            Ok(LogicalPlan::Join(Join {
+                                left: Arc::new(input),
+                                right: Arc::new(right_input),
+                                on: vec![(left_key, right_key)],
+                                join_type,
+                                join_constraint: JoinConstraint::On,
+                                schema: Arc::new(schema),
+                                null_equals_null: false,
+                            }))
+                        }
+                        _ => Err(DataFusionError::Plan(
+                            "Unknown expression while rewriting subquery to 
joins"
+                                .to_string(),
+                        )),
+                    }
+                );
+
+                // In case of expressions which could not be rewritten
+                // return original filter with optimized input
+                let new_input = match opt_result {
+                    Ok(plan) => plan,
+                    Err(_) => {
+                        return Ok(LogicalPlan::Filter(Filter {
+                            predicate: predicate.clone(),
+                            input: Arc::new(optimized_input),
+                        }))
+                    }
+                };
+
+                // Apply regular filters to join output if some or just return join
+                if regular_filters.is_empty() {
+                    Ok(new_input)
+                } else {
+                    Ok(utils::add_filter(new_input, &regular_filters))
+                }
+            }
+            _ => {
+                // Apply the optimization to all inputs of the plan
+                utils::optimize_children(self, plan, execution_props)
+            }
+        }
+    }
+
+    fn name(&self) -> &str {
+        "subquery_filter_to_join"
+    }
+}
+
+fn extract_subquery_filters(expression: &Expr, extracted: &mut Vec<Expr>) -> 
Result<()> {
+    utils::expr_sub_expressions(expression)?
+        .into_iter()
+        .try_for_each(|se| match se {
+            Expr::InSubquery { .. } => {
+                extracted.push(se);
+                Ok(())
+            }
+            _ => extract_subquery_filters(&se, extracted),
+        })
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::logical_plan::{
+        and, binary_expr, col, in_subquery, lit, not_in_subquery, or, 
LogicalPlanBuilder,
+        Operator,
+    };
+    use crate::test::*;
+
+    fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) {
+        let rule = SubqueryFilterToJoin::new();
+        let optimized_plan = rule
+            .optimize(plan, &ExecutionProps::new())
+            .expect("failed to optimize plan");
+        let formatted_plan = format!("{}", 
optimized_plan.display_indent_schema());
+        assert_eq!(formatted_plan, expected);
+    }
+
+    fn test_subquery_with_name(name: &str) -> Result<Arc<LogicalPlan>> {
+        let table_scan = test_table_scan_with_name(name)?;
+        Ok(Arc::new(
+            LogicalPlanBuilder::from(table_scan)
+                .project(vec![col("c")])?
+                .build()?,
+        ))
+    }
+
+    /// Test for single IN subquery filter
+    #[test]
+    fn in_subquery_simple() -> Result<()> {
+        let table_scan = test_table_scan()?;
+        let plan = LogicalPlanBuilder::from(table_scan)
+            .filter(in_subquery(col("c"), test_subquery_with_name("sq")?))?
+            .project(vec![col("test.b")])?
+            .build()?;
+
+        let expected = "Projection: #test.b [b:UInt32]\
+        \n  Semi Join: #test.c = #sq.c [a:UInt32, b:UInt32, c:UInt32]\
+        \n    TableScan: test projection=None [a:UInt32, b:UInt32, c:UInt32]\
+        \n    Projection: #sq.c [c:UInt32]\
+        \n      TableScan: sq projection=None [a:UInt32, b:UInt32, c:UInt32]";
+
+        assert_optimized_plan_eq(&plan, expected);
+        Ok(())
+    }
+
+    /// Test for single NOT IN subquery filter
+    #[test]
+    fn not_in_subquery_simple() -> Result<()> {
+        let table_scan = test_table_scan()?;
+        let plan = LogicalPlanBuilder::from(table_scan)
+            .filter(not_in_subquery(col("c"), test_subquery_with_name("sq")?))?
+            .project(vec![col("test.b")])?
+            .build()?;
+
+        let expected = "Projection: #test.b [b:UInt32]\
+        \n  Anti Join: #test.c = #sq.c [a:UInt32, b:UInt32, c:UInt32]\
+        \n    TableScan: test projection=None [a:UInt32, b:UInt32, c:UInt32]\
+        \n    Projection: #sq.c [c:UInt32]\
+        \n      TableScan: sq projection=None [a:UInt32, b:UInt32, c:UInt32]";
+
+        assert_optimized_plan_eq(&plan, expected);
+        Ok(())
+    }
+
+    /// Test for several IN subquery expressions
+    #[test]
+    fn in_subquery_multiple() -> Result<()> {
+        let table_scan = test_table_scan()?;
+        let plan = LogicalPlanBuilder::from(table_scan)
+            .filter(and(
+                in_subquery(col("c"), test_subquery_with_name("sq_1")?),
+                in_subquery(col("b"), test_subquery_with_name("sq_2")?),
+            ))?
+            .project(vec![col("test.b")])?
+            .build()?;
+
+        let expected = "Projection: #test.b [b:UInt32]\
+        \n  Semi Join: #test.b = #sq_2.c [a:UInt32, b:UInt32, c:UInt32]\
+        \n    Semi Join: #test.c = #sq_1.c [a:UInt32, b:UInt32, c:UInt32]\
+        \n      TableScan: test projection=None [a:UInt32, b:UInt32, c:UInt32]\
+        \n      Projection: #sq_1.c [c:UInt32]\
+        \n        TableScan: sq_1 projection=None [a:UInt32, b:UInt32, 
c:UInt32]\
+        \n    Projection: #sq_2.c [c:UInt32]\
+        \n      TableScan: sq_2 projection=None [a:UInt32, b:UInt32, 
c:UInt32]";
+
+        assert_optimized_plan_eq(&plan, expected);
+        Ok(())
+    }
+
+    /// Test for IN subquery with additional AND filter
+    #[test]
+    fn in_subquery_with_and_filters() -> Result<()> {
+        let table_scan = test_table_scan()?;
+        let plan = LogicalPlanBuilder::from(table_scan)
+            .filter(and(
+                in_subquery(col("c"), test_subquery_with_name("sq")?),
+                and(
+                    binary_expr(col("a"), Operator::Eq, lit(1_u32)),
+                    binary_expr(col("b"), Operator::Lt, lit(30_u32)),
+                ),
+            ))?
+            .project(vec![col("test.b")])?
+            .build()?;
+
+        let expected = "Projection: #test.b [b:UInt32]\
+        \n  Filter: #test.a = UInt32(1) AND #test.b < UInt32(30) [a:UInt32, 
b:UInt32, c:UInt32]\
+        \n    Semi Join: #test.c = #sq.c [a:UInt32, b:UInt32, c:UInt32]\
+        \n      TableScan: test projection=None [a:UInt32, b:UInt32, c:UInt32]\
+        \n      Projection: #sq.c [c:UInt32]\
+        \n        TableScan: sq projection=None [a:UInt32, b:UInt32, 
c:UInt32]";
+
+        assert_optimized_plan_eq(&plan, expected);
+        Ok(())
+    }
+
+    /// Test for IN subquery with additional OR filter
+    /// filter expression not modified
+    #[test]
+    fn in_subquery_with_or_filters() -> Result<()> {
+        let table_scan = test_table_scan()?;
+        let plan = LogicalPlanBuilder::from(table_scan)
+            .filter(or(
+                and(
+                    binary_expr(col("a"), Operator::Eq, lit(1_u32)),
+                    binary_expr(col("b"), Operator::Lt, lit(30_u32)),
+                ),
+                in_subquery(col("c"), test_subquery_with_name("sq")?),
+            ))?
+            .project(vec![col("test.b")])?
+            .build()?;
+
+        let expected = "Projection: #test.b [b:UInt32]\
+        \n  Filter: #test.a = UInt32(1) AND #test.b < UInt32(30) OR #test.c IN 
(\
+        Subquery: Projection: #sq.c\
+        \n  TableScan: sq projection=None) [a:UInt32, b:UInt32, c:UInt32]\
+        \n    TableScan: test projection=None [a:UInt32, b:UInt32, c:UInt32]";
+
+        assert_optimized_plan_eq(&plan, expected);
+        Ok(())
+    }
+
+    /// Test for nested IN subqueries
+    #[test]
+    fn in_subquery_nested() -> Result<()> {
+        let table_scan = test_table_scan()?;
+
+        let subquery = 
LogicalPlanBuilder::from(test_table_scan_with_name("sq")?)
+            .filter(in_subquery(col("a"), 
test_subquery_with_name("sq_nested")?))?
+            .project(vec![col("a")])?
+            .build()?;
+
+        let plan = LogicalPlanBuilder::from(table_scan)
+            .filter(in_subquery(col("b"), Arc::new(subquery)))?
+            .project(vec![col("test.b")])?
+            .build()?;
+
+        let expected = "Projection: #test.b [b:UInt32]\
+        \n  Semi Join: #test.b = #sq.a [a:UInt32, b:UInt32, c:UInt32]\
+        \n    TableScan: test projection=None [a:UInt32, b:UInt32, c:UInt32]\
+        \n    Projection: #sq.a [a:UInt32]\
+        \n      Semi Join: #sq.a = #sq_nested.c [a:UInt32, b:UInt32, c:UInt32]\
+        \n        TableScan: sq projection=None [a:UInt32, b:UInt32, c:UInt32]\
+        \n        Projection: #sq_nested.c [c:UInt32]\
+        \n          TableScan: sq_nested projection=None [a:UInt32, b:UInt32, 
c:UInt32]";
+
+        assert_optimized_plan_eq(&plan, expected);
+        Ok(())
+    }
+
+    /// Test for filter input modification in case filter not supported
+    /// Outer filter expression not modified while inner converted to join
+    #[test]
+    fn in_subquery_input_modified() -> Result<()> {
+        let table_scan = test_table_scan()?;
+        let plan = LogicalPlanBuilder::from(table_scan)
+            .filter(in_subquery(col("c"), 
test_subquery_with_name("sq_inner")?))?
+            .project_with_alias(vec![col("b"), col("c")], 
Some("wrapped".to_string()))?
+            .filter(or(
+                binary_expr(col("b"), Operator::Lt, lit(30_u32)),
+                in_subquery(col("c"), test_subquery_with_name("sq_outer")?),
+            ))?
+            .project(vec![col("b")])?
+            .build()?;
+
+        let expected = "Projection: #wrapped.b [b:UInt32]\
+        \n  Filter: #wrapped.b < UInt32(30) OR #wrapped.c IN (\
+        Subquery: Projection: #sq_outer.c\
+        \n  TableScan: sq_outer projection=None) [b:UInt32, c:UInt32]\
+        \n    Projection: #test.b, #test.c, alias=wrapped [b:UInt32, c:UInt32]\
+        \n      Semi Join: #test.c = #sq_inner.c [a:UInt32, b:UInt32, 
c:UInt32]\
+        \n        TableScan: test projection=None [a:UInt32, b:UInt32, 
c:UInt32]\
+        \n        Projection: #sq_inner.c [c:UInt32]\
+        \n          TableScan: sq_inner projection=None [a:UInt32, b:UInt32, 
c:UInt32]";
+
+        assert_optimized_plan_eq(&plan, expected);
+        Ok(())
+    }
+}
diff --git a/datafusion/core/src/optimizer/utils.rs b/datafusion/core/src/optimizer/utils.rs
index df36761fe..48855df9f 100644
--- a/datafusion/core/src/optimizer/utils.rs
+++ b/datafusion/core/src/optimizer/utils.rs
@@ -25,7 +25,7 @@ use datafusion_expr::logical_plan::{
 };
 
 use crate::logical_plan::{
-    build_join_schema, Column, CreateMemoryTable, DFSchemaRef, Expr, 
ExprVisitable,
+    and, build_join_schema, Column, CreateMemoryTable, DFSchemaRef, Expr, 
ExprVisitable,
     Limit, LogicalPlan, LogicalPlanBuilder, Operator, Partitioning, Recursion,
     Repartition, Union, Values,
 };
@@ -556,6 +556,41 @@ pub fn rewrite_expression(expr: &Expr, expressions: &[Expr]) -> Result<Expr> {
     }
 }
 
+/// converts "A AND B AND C" => [A, B, C]
+pub fn split_conjunction<'a>(predicate: &'a Expr, predicates: &mut Vec<&'a 
Expr>) {
+    match predicate {
+        Expr::BinaryExpr {
+            right,
+            op: Operator::And,
+            left,
+        } => {
+            split_conjunction(left, predicates);
+            split_conjunction(right, predicates);
+        }
+        Expr::Alias(expr, _) => {
+            split_conjunction(expr, predicates);
+        }
+        other => predicates.push(other),
+    }
+}
+
+/// returns a new [LogicalPlan] that wraps `plan` in a [LogicalPlan::Filter] 
with
+/// its predicate be all `predicates` ANDed.
+pub fn add_filter(plan: LogicalPlan, predicates: &[&Expr]) -> LogicalPlan {
+    // reduce filters to a single filter with an AND
+    let predicate = predicates
+        .iter()
+        .skip(1)
+        .fold(predicates[0].clone(), |acc, predicate| {
+            and(acc, (*predicate).to_owned())
+        });
+
+    LogicalPlan::Filter(Filter {
+        predicate,
+        input: Arc::new(plan),
+    })
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;

Reply via email to