alamb commented on code in PR #4826:
URL: https://github.com/apache/arrow-datafusion/pull/4826#discussion_r1067459739


##########
datafusion/core/tests/sql/joins.rs:
##########
@@ -2810,3 +2810,137 @@ async fn type_coercion_join_with_filter_and_equi_expr() 
-> Result<()> {
 
     Ok(())
 }
+
+#[tokio::test]
+async fn subquery_to_join_with_both_side_expr() -> Result<()> {
+    let ctx = create_join_context("t1_id", "t2_id", false)?;
+
+    let sql = "select t1.t1_id, t1.t1_name, t1.t1_int from t1 where t1.t1_id + 
12 in (select t2.t2_id + 1 from t2)";
+
+    // assert logical plan
+    let msg = format!("Creating logical plan for '{sql}'");
+    let dataframe = ctx.sql(&("explain ".to_owned() + sql)).await.expect(&msg);
+    let plan = dataframe.into_optimized_plan().unwrap();
+
+    let expected = vec![
+        "Explain [plan_type:Utf8, plan:Utf8]",
+        "  Projection: t1.t1_id, t1.t1_name, t1.t1_int [t1_id:UInt32;N, 
t1_name:Utf8;N, t1_int:UInt32;N]",

Review Comment:
   👍  the plan looks good



##########
datafusion/core/tests/sql/subqueries.rs:
##########
@@ -94,12 +94,13 @@ where o_orderstatus in (
     let dataframe = ctx.sql(sql).await.unwrap();
     let plan = dataframe.into_optimized_plan().unwrap();
     let actual = format!("{}", plan.display_indent());
-    let expected = r#"Projection: orders.o_orderkey
-  LeftSemi Join: orders.o_orderstatus = __correlated_sq_1.l_linestatus, 
orders.o_orderkey = __correlated_sq_1.l_orderkey
-    TableScan: orders projection=[o_orderkey, o_orderstatus]
-    SubqueryAlias: __correlated_sq_1
-      Projection: lineitem.l_linestatus AS l_linestatus, lineitem.l_orderkey 
AS l_orderkey
-        TableScan: lineitem projection=[l_orderkey, l_linestatus]"#;
+

Review Comment:
   This change is just whitespace, right?



##########
datafusion/optimizer/src/decorrelate_where_in.rs:
##########
@@ -121,80 +121,95 @@ impl OptimizerRule for DecorrelateWhereIn {
     }
 }
 
+/// Optimize the where in subquery to left-anti/left-semi join.

Review Comment:
   ❤️ 



##########
datafusion/optimizer/src/decorrelate_where_in.rs:
##########
@@ -121,80 +121,95 @@ impl OptimizerRule for DecorrelateWhereIn {
     }
 }
 
+/// Optimize the where in subquery to left-anti/left-semi join.
+/// If the subquery is a correlated subquery, we need extract the join 
predicate from the subquery.
+///
+/// For example, given a query like:
+/// `select t1.a, t1.b from t1 where t1 in (select t2.a from t2 where t1.b = 
t2.b and t1.c > t2.c)`
+///
+/// The optimized plan will be:
+///
+/// Projection: t1.a, t1.b                                                     
                                                                                
                                                                                
                                                                  |
+///   LeftSemi Join:  Filter: t1.a = __correlated_sq_1.a AND t1.b = 
__correlated_sq_1.b AND t1.c > __correlated_sq_1.c                              
                                                                                
                                                                             |
+///     TableScan: t1                                                          
                                                                                
                                                                                
                                                                  |
+///     SubqueryAlias: __correlated_sq_1                                       
                                                                                
                                                                                
                                                                  |
+///       Projection: t2.a AS a, t2.b, t2.c                                    
                                                                                
                                                                                
                                                                  |
+///         TableScan: t2                                                      
                                                                                
                                                                                
                                                                  |
+///

Review Comment:
   ```suggestion
   /// The optimized plan will be:
   ///
   /// ```text
   /// Projection: t1.a, t1.b
   ///   LeftSemi Join:  Filter: t1.a = __correlated_sq_1.a AND t1.b = 
__correlated_sq_1.b AND t1.c > __correlated_sq_1.c
   ///     TableScan: t1
   ///     SubqueryAlias: __correlated_sq_1
   ///       Projection: t2.a AS a, t2.b, t2.c
   ///         TableScan: t2
   /// ```
   ```



##########
datafusion/optimizer/src/decorrelate_where_in.rs:
##########
@@ -121,80 +121,95 @@ impl OptimizerRule for DecorrelateWhereIn {
     }
 }
 
+/// Optimize the where in subquery to left-anti/left-semi join.
+/// If the subquery is a correlated subquery, we need extract the join 
predicate from the subquery.
+///
+/// For example, given a query like:
+/// `select t1.a, t1.b from t1 where t1 in (select t2.a from t2 where t1.b = 
t2.b and t1.c > t2.c)`
+///
+/// The optimized plan will be:
+///
+/// Projection: t1.a, t1.b                                                     
                                                                                
                                                                                
                                                                  |
+///   LeftSemi Join:  Filter: t1.a = __correlated_sq_1.a AND t1.b = 
__correlated_sq_1.b AND t1.c > __correlated_sq_1.c                              
                                                                                
                                                                             |
+///     TableScan: t1                                                          
                                                                                
                                                                                
                                                                  |
+///     SubqueryAlias: __correlated_sq_1                                       
                                                                                
                                                                                
                                                                  |
+///       Projection: t2.a AS a, t2.b, t2.c                                    
                                                                                
                                                                                
                                                                  |
+///         TableScan: t2                                                      
                                                                                
                                                                                
                                                                  |
+///
 fn optimize_where_in(
     query_info: &SubqueryInfo,
-    outer_input: &LogicalPlan,
+    left: &LogicalPlan,
     outer_other_exprs: &[Expr],
     alias: &AliasGenerator,
 ) -> Result<LogicalPlan> {
-    let proj = Projection::try_from_plan(&query_info.query.subquery)
+    let projection = Projection::try_from_plan(&query_info.query.subquery)
         .map_err(|e| context!("a projection is required", e))?;
-    let mut subqry_input = proj.input.clone();
-    let proj = only_or_err(proj.expr.as_slice())
+    let subquery_input = projection.input.clone();
+    let subquery_expr = only_or_err(projection.expr.as_slice())
         .map_err(|e| context!("single expression projection required", e))?;
-    let subquery_col = proj
-        .try_into_col()
-        .map_err(|e| context!("single column projection required", e))?;
-    let outer_col = query_info
-        .where_in_expr
-        .try_into_col()
-        .map_err(|e| context!("column comparison required", e))?;
-
-    // If subquery is correlated, grab necessary information
-    let mut subqry_cols = vec![];
-    let mut outer_cols = vec![];
-    let mut join_filters = None;
-    let mut other_subqry_exprs = vec![];
-    if let LogicalPlan::Filter(subqry_filter) = (*subqry_input).clone() {
-        // split into filters
-        let subqry_filter_exprs = split_conjunction(&subqry_filter.predicate);
-        verify_not_disjunction(&subqry_filter_exprs)?;
-
-        // Grab column names to join on
-        let (col_exprs, other_exprs) =
-            find_join_exprs(subqry_filter_exprs, subqry_filter.input.schema())
-                .map_err(|e| context!("column correlation not found", e))?;
-        if !col_exprs.is_empty() {
-            // it's correlated
-            subqry_input = subqry_filter.input.clone();
-            (outer_cols, subqry_cols, join_filters) =
-                exprs_to_join_cols(&col_exprs, subqry_filter.input.schema(), 
false)
-                    .map_err(|e| context!("column correlation not found", e))?;
-            other_subqry_exprs = other_exprs;
-        }
-    }
 
-    let (subqry_cols, outer_cols) =
-        merge_cols((&[subquery_col], &subqry_cols), (&[outer_col], 
&outer_cols));
-
-    // build subquery side of join - the thing the subquery was querying
-    let subqry_alias = alias.next("__correlated_sq");
-    let mut subqry_plan = LogicalPlanBuilder::from((*subqry_input).clone());
-    if let Some(expr) = conjunction(other_subqry_exprs) {
-        // if the subquery had additional expressions, restore them
-        subqry_plan = subqry_plan.filter(expr)?
+    // extract join filters
+    let (join_filters, subquery_input) = 
extract_join_filters(subquery_input.as_ref())?;
+
+    // in_predicate may be also include in the join filters, remove it from 
the join filters.
+    let in_predicate = Expr::eq(query_info.where_in_expr.clone(), 
subquery_expr.clone());
+    let join_filters = remove_duplicated_filter(join_filters, in_predicate);
+
+    // replace qualified name with subquery alias.
+    let subquery_alias = alias.next("__correlated_sq");
+    let input_schema = subquery_input.schema();
+    let mut subquery_cols: BTreeSet<Column> =
+        join_filters
+            .iter()
+            .try_fold(BTreeSet::<Column>::new(), |mut cols, expr| {

Review Comment:
   ```suggestion
               .try_fold(BTreeSet::new(), |mut cols, expr| {
   ```
   



##########
datafusion/optimizer/src/decorrelate_where_in.rs:
##########
@@ -205,6 +220,72 @@ fn optimize_where_in(
     Ok(new_plan)
 }
 
+fn extract_join_filters(maybe_filter: &LogicalPlan) -> Result<(Vec<Expr>, 
LogicalPlan)> {
+    if let LogicalPlan::Filter(plan_filter) = maybe_filter {
+        let input_schema = plan_filter.input.schema();
+        let subquery_filter_exprs = split_conjunction(&plan_filter.predicate);
+
+        let mut join_filters: Vec<Expr> = vec![];
+        let mut subquery_filters: Vec<Expr> = vec![];
+        for expr in subquery_filter_exprs {
+            let cols = expr.to_columns()?;
+            if check_all_column_from_schema(&cols, input_schema.clone()) {
+                subquery_filters.push(expr.clone());
+            } else {
+                join_filters.push(expr.clone())
+            }
+        }
+
+        // if the subquery still has filter expressions, restore them.
+        let mut plan = LogicalPlanBuilder::from((*plan_filter.input).clone());
+        if let Some(expr) = conjunction(subquery_filters) {
+            plan = plan.filter(expr)?
+        }
+
+        Ok((join_filters, plan.build()?))
+    } else {
+        Ok((vec![], maybe_filter.clone()))
+    }
+}
+
+fn remove_duplicated_filter(filters: Vec<Expr>, in_predicate: Expr) -> 
Vec<Expr> {
+    filters
+        .into_iter()
+        .filter(|filter| {
+            if filter == &in_predicate {
+                return false;
+            }
+
+            // ignore the binary order
+            !match (filter, &in_predicate) {
+                (Expr::BinaryExpr(a_expr), Expr::BinaryExpr(b_expr)) => {
+                    (a_expr.op == b_expr.op)
+                        && (a_expr.left == b_expr.left && a_expr.right == 
b_expr.right)
+                        || (a_expr.left == b_expr.right && a_expr.right == 
b_expr.left)
+                }
+                _ => false,
+            }
+        })
+        .collect::<Vec<_>>()
+}
+
+fn replace_qualify_name(

Review Comment:
   ```suggestion
   fn replace_qualified_name(
   ```



##########
datafusion/optimizer/src/decorrelate_where_in.rs:
##########
@@ -554,14 +636,12 @@ mod tests {
             .project(vec![col("customer.c_custkey")])?
             .build()?;
 
-        // Query will fail, but we can still transform the plan
         let expected = "Projection: customer.c_custkey [c_custkey:Int64]\
-        \n  LeftSemi Join: customer.c_custkey = __correlated_sq_1.o_custkey 
[c_custkey:Int64, c_name:Utf8]\
+        \n  LeftSemi Join:  Filter: customer.c_custkey = 
__correlated_sq_1.o_custkey AND customer.c_custkey = customer.c_custkey 
[c_custkey:Int64, c_name:Utf8]\

Review Comment:
   This looks like somewhat of a regression to me (the filter has been pulled 
out of the subquery and into the join). Is that intended?



##########
datafusion/optimizer/src/decorrelate_where_in.rs:
##########
@@ -121,80 +121,95 @@ impl OptimizerRule for DecorrelateWhereIn {
     }
 }
 
+/// Optimize the where in subquery to left-anti/left-semi join.
+/// If the subquery is a correlated subquery, we need extract the join 
predicate from the subquery.
+///
+/// For example, given a query like:
+/// `select t1.a, t1.b from t1 where t1 in (select t2.a from t2 where t1.b = 
t2.b and t1.c > t2.c)`
+///
+/// The optimized plan will be:
+///
+/// Projection: t1.a, t1.b                                                     
                                                                                
                                                                                
                                                                  |
+///   LeftSemi Join:  Filter: t1.a = __correlated_sq_1.a AND t1.b = 
__correlated_sq_1.b AND t1.c > __correlated_sq_1.c                              
                                                                                
                                                                             |
+///     TableScan: t1                                                          
                                                                                
                                                                                
                                                                  |
+///     SubqueryAlias: __correlated_sq_1                                       
                                                                                
                                                                                
                                                                  |
+///       Projection: t2.a AS a, t2.b, t2.c                                    
                                                                                
                                                                                
                                                                  |
+///         TableScan: t2                                                      
                                                                                
                                                                                
                                                                  |
+///
 fn optimize_where_in(
     query_info: &SubqueryInfo,
-    outer_input: &LogicalPlan,
+    left: &LogicalPlan,
     outer_other_exprs: &[Expr],
     alias: &AliasGenerator,
 ) -> Result<LogicalPlan> {
-    let proj = Projection::try_from_plan(&query_info.query.subquery)
+    let projection = Projection::try_from_plan(&query_info.query.subquery)
         .map_err(|e| context!("a projection is required", e))?;
-    let mut subqry_input = proj.input.clone();
-    let proj = only_or_err(proj.expr.as_slice())
+    let subquery_input = projection.input.clone();
+    let subquery_expr = only_or_err(projection.expr.as_slice())
         .map_err(|e| context!("single expression projection required", e))?;
-    let subquery_col = proj
-        .try_into_col()
-        .map_err(|e| context!("single column projection required", e))?;
-    let outer_col = query_info
-        .where_in_expr
-        .try_into_col()
-        .map_err(|e| context!("column comparison required", e))?;
-
-    // If subquery is correlated, grab necessary information
-    let mut subqry_cols = vec![];
-    let mut outer_cols = vec![];
-    let mut join_filters = None;
-    let mut other_subqry_exprs = vec![];
-    if let LogicalPlan::Filter(subqry_filter) = (*subqry_input).clone() {
-        // split into filters
-        let subqry_filter_exprs = split_conjunction(&subqry_filter.predicate);
-        verify_not_disjunction(&subqry_filter_exprs)?;
-
-        // Grab column names to join on
-        let (col_exprs, other_exprs) =
-            find_join_exprs(subqry_filter_exprs, subqry_filter.input.schema())
-                .map_err(|e| context!("column correlation not found", e))?;
-        if !col_exprs.is_empty() {
-            // it's correlated
-            subqry_input = subqry_filter.input.clone();
-            (outer_cols, subqry_cols, join_filters) =
-                exprs_to_join_cols(&col_exprs, subqry_filter.input.schema(), 
false)
-                    .map_err(|e| context!("column correlation not found", e))?;
-            other_subqry_exprs = other_exprs;
-        }
-    }
 
-    let (subqry_cols, outer_cols) =
-        merge_cols((&[subquery_col], &subqry_cols), (&[outer_col], 
&outer_cols));
-
-    // build subquery side of join - the thing the subquery was querying
-    let subqry_alias = alias.next("__correlated_sq");
-    let mut subqry_plan = LogicalPlanBuilder::from((*subqry_input).clone());
-    if let Some(expr) = conjunction(other_subqry_exprs) {
-        // if the subquery had additional expressions, restore them
-        subqry_plan = subqry_plan.filter(expr)?
+    // extract join filters
+    let (join_filters, subquery_input) = 
extract_join_filters(subquery_input.as_ref())?;
+
+    // in_predicate may be also include in the join filters, remove it from 
the join filters.
+    let in_predicate = Expr::eq(query_info.where_in_expr.clone(), 
subquery_expr.clone());
+    let join_filters = remove_duplicated_filter(join_filters, in_predicate);
+
+    // replace qualified name with subquery alias.
+    let subquery_alias = alias.next("__correlated_sq");
+    let input_schema = subquery_input.schema();
+    let mut subquery_cols: BTreeSet<Column> =
+        join_filters
+            .iter()
+            .try_fold(BTreeSet::<Column>::new(), |mut cols, expr| {
+                let using_cols: Vec<Column> = expr
+                    .to_columns()?
+                    .into_iter()
+                    .filter(|col| input_schema.field_from_column(col).is_ok())

Review Comment:
   Doesn't this ignore columns that are not found in the subquery? I wonder if 
the error should be propagated up rather than ignored. Perhaps test coverage of 
a query like `select ... from t where x in (select non_existent_col from foo)` 
would be good?
   
   Although perhaps if it was an error it signals something is wrong in the 
plan (as a column reference wouldn't be present in the input schema 🤔 )
   
   If this shouldn't happen, maybe we can collect the errors and return them?



##########
datafusion/core/tests/sql/joins.rs:
##########
@@ -2810,3 +2810,137 @@ async fn type_coercion_join_with_filter_and_equi_expr() 
-> Result<()> {
 
     Ok(())
 }
+
+#[tokio::test]
+async fn subquery_to_join_with_both_side_expr() -> Result<()> {
+    let ctx = create_join_context("t1_id", "t2_id", false)?;
+
+    let sql = "select t1.t1_id, t1.t1_name, t1.t1_int from t1 where t1.t1_id + 
12 in (select t2.t2_id + 1 from t2)";
+
+    // assert logical plan
+    let msg = format!("Creating logical plan for '{sql}'");
+    let dataframe = ctx.sql(&("explain ".to_owned() + sql)).await.expect(&msg);
+    let plan = dataframe.into_optimized_plan().unwrap();
+
+    let expected = vec![
+        "Explain [plan_type:Utf8, plan:Utf8]",
+        "  Projection: t1.t1_id, t1.t1_name, t1.t1_int [t1_id:UInt32;N, 
t1_name:Utf8;N, t1_int:UInt32;N]",
+        "    LeftSemi Join: CAST(t1.t1_id AS Int64) + Int64(12) = 
__correlated_sq_1.CAST(t2_id AS Int64) + Int64(1) [t1_id:UInt32;N, 
t1_name:Utf8;N, t1_int:UInt32;N]",
+        "      TableScan: t1 projection=[t1_id, t1_name, t1_int] 
[t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]",
+        "      SubqueryAlias: __correlated_sq_1 [CAST(t2_id AS Int64) + 
Int64(1):Int64;N]",
+        "        Projection: CAST(t2.t2_id AS Int64) + Int64(1) AS CAST(t2_id 
AS Int64) + Int64(1) [CAST(t2_id AS Int64) + Int64(1):Int64;N]",
+        "          TableScan: t2 projection=[t2_id] [t2_id:UInt32;N]",
+    ];
+
+    let formatted = plan.display_indent_schema().to_string();
+    let actual: Vec<&str> = formatted.trim().lines().collect();
+    assert_eq!(
+        expected, actual,
+        "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
+    );
+
+    let expected = vec![
+        "+-------+---------+--------+",
+        "| t1_id | t1_name | t1_int |",
+        "+-------+---------+--------+",
+        "| 11    | a       | 1      |",
+        "| 33    | c       | 3      |",
+        "| 44    | d       | 4      |",
+        "+-------+---------+--------+",
+    ];
+
+    let results = execute_to_batches(&ctx, sql).await;
+    assert_batches_sorted_eq!(expected, &results);
+
+    Ok(())
+}
+
+#[tokio::test]
+async fn subquery_to_join_with_muti_filter() -> Result<()> {
+    let ctx = create_join_context("t1_id", "t2_id", false)?;
+
+    let sql = "select t1.t1_id, t1.t1_name, t1.t1_int from t1 where t1.t1_id + 
12 in 
+                         (select t2.t2_id + 1 from t2 where t1.t1_int <= 
t2.t2_int and t2.t2_int > 0)";
+
+    // assert logical plan
+    let msg = format!("Creating logical plan for '{sql}'");
+    let dataframe = ctx.sql(&("explain ".to_owned() + sql)).await.expect(&msg);
+    let plan = dataframe.into_optimized_plan().unwrap();
+
+    let expected = vec![
+        "Explain [plan_type:Utf8, plan:Utf8]",
+        "  Projection: t1.t1_id, t1.t1_name, t1.t1_int [t1_id:UInt32;N, 
t1_name:Utf8;N, t1_int:UInt32;N]",
+        "    LeftSemi Join: CAST(t1.t1_id AS Int64) + Int64(12) = 
__correlated_sq_1.CAST(t2_id AS Int64) + Int64(1) Filter: t1.t1_int <= 
__correlated_sq_1.t2_int [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]",
+        "      TableScan: t1 projection=[t1_id, t1_name, t1_int] 
[t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]",
+        "      SubqueryAlias: __correlated_sq_1 [CAST(t2_id AS Int64) + 
Int64(1):Int64;N, t2_int:UInt32;N]",
+        "        Projection: CAST(t2.t2_id AS Int64) + Int64(1) AS CAST(t2_id 
AS Int64) + Int64(1), t2.t2_int [CAST(t2_id AS Int64) + Int64(1):Int64;N, 
t2_int:UInt32;N]",
+        "          Filter: t2.t2_int > UInt32(0) [t2_id:UInt32;N, 
t2_int:UInt32;N]",
+        "            TableScan: t2 projection=[t2_id, t2_int] [t2_id:UInt32;N, 
t2_int:UInt32;N]",
+    ];
+
+    let formatted = plan.display_indent_schema().to_string();
+    let actual: Vec<&str> = formatted.trim().lines().collect();
+    assert_eq!(
+        expected, actual,
+        "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
+    );
+
+    let expected = vec![
+        "+-------+---------+--------+",
+        "| t1_id | t1_name | t1_int |",
+        "+-------+---------+--------+",
+        "| 11    | a       | 1      |",
+        "| 33    | c       | 3      |",
+        "+-------+---------+--------+",
+    ];
+
+    let results = execute_to_batches(&ctx, sql).await;
+    assert_batches_sorted_eq!(expected, &results);
+
+    Ok(())
+}
+
+#[tokio::test]
+async fn three_projection_exprs_subquery_to_join() -> Result<()> {

Review Comment:
   As a minor point, I am not sure what extra coverage the multiple 
predicates in the where clause are adding.
   
   I wonder if we need all these tests?



##########
datafusion/optimizer/src/decorrelate_where_in.rs:
##########
@@ -263,16 +344,15 @@ mod tests {
             .build()?;
 
         let expected = "Projection: test.b [b:UInt32]\
-        \n  LeftSemi Join: test.b = __correlated_sq_2.c [a:UInt32, b:UInt32, 
c:UInt32]\
-        \n    LeftSemi Join: test.c = __correlated_sq_1.c [a:UInt32, b:UInt32, 
c:UInt32]\
+        \n  LeftSemi Join:  Filter: test.b = __correlated_sq_2.c [a:UInt32, 
b:UInt32, c:UInt32]\

Review Comment:
   I believe these changes mean that the plan's expressions are no longer in 
the `ON` (equijoin) clause but instead are in the join `Filter`.
   
   
   However, I think this is fine as other optimizer passes will move such 
predicates into the `ON` clause. I tested this locally with this branch 👍 
   
   ```sql
   ❯ explain select *  from '/tmp/data.csv' t where t.name IN (select name from 
'/tmp/data.csv' where name = 'Sales');
   
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------+
   | plan_type     | plan                                                       
                                                                             |
   
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------+
   | logical_plan  | Projection: t.name, t.created_at, t.last_report            
                                                                             |
   |               |   LeftSemi Join: t.name = __correlated_sq_2.name           
                                                                             |
   |               |     SubqueryAlias: t                                       
                                                                             |
   |               |       TableScan: /tmp/data.csv projection=[name, 
created_at, last_report]                                                        
       |
   |               |     SubqueryAlias: __correlated_sq_2                       
                                                                             |
   |               |       Projection: /tmp/data.csv.name AS name               
                                                                             |
   |               |         Filter: /tmp/data.csv.name = Utf8("Sales")         
                                                                             |
   |               |           TableScan: /tmp/data.csv projection=[name], 
partial_filters=[/tmp/data.csv.name = Utf8("Sales")]          
   ```



##########
datafusion/optimizer/src/decorrelate_where_in.rs:
##########
@@ -205,6 +220,72 @@ fn optimize_where_in(
     Ok(new_plan)
 }
 
+fn extract_join_filters(maybe_filter: &LogicalPlan) -> Result<(Vec<Expr>, 
LogicalPlan)> {
+    if let LogicalPlan::Filter(plan_filter) = maybe_filter {
+        let input_schema = plan_filter.input.schema();
+        let subquery_filter_exprs = split_conjunction(&plan_filter.predicate);
+
+        let mut join_filters: Vec<Expr> = vec![];
+        let mut subquery_filters: Vec<Expr> = vec![];
+        for expr in subquery_filter_exprs {
+            let cols = expr.to_columns()?;
+            if check_all_column_from_schema(&cols, input_schema.clone()) {
+                subquery_filters.push(expr.clone());
+            } else {
+                join_filters.push(expr.clone())
+            }

Review Comment:
   Yeah -- this is the decorrelation logic



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to