ygf11 commented on code in PR #4826:
URL: https://github.com/apache/arrow-datafusion/pull/4826#discussion_r1069150341


##########
datafusion/optimizer/src/decorrelate_where_in.rs:
##########
@@ -121,80 +121,95 @@ impl OptimizerRule for DecorrelateWhereIn {
     }
 }
 
+/// Optimize the where in subquery to left-anti/left-semi join.
+/// If the subquery is a correlated subquery, we need extract the join 
predicate from the subquery.
+///
+/// For example, given a query like:
+/// `select t1.a, t1.b from t1 where t1 in (select t2.a from t2 where t1.b = 
t2.b and t1.c > t2.c)`
+///
+/// The optimized plan will be:
+///
+/// Projection: t1.a, t1.b                                                     
                                                                                
                                                                                
                                                                  |
+///   LeftSemi Join:  Filter: t1.a = __correlated_sq_1.a AND t1.b = 
__correlated_sq_1.b AND t1.c > __correlated_sq_1.c                              
                                                                                
                                                                             |
+///     TableScan: t1                                                          
                                                                                
                                                                                
                                                                  |
+///     SubqueryAlias: __correlated_sq_1                                       
                                                                                
                                                                                
                                                                  |
+///       Projection: t2.a AS a, t2.b, t2.c                                    
                                                                                
                                                                                
                                                                  |
+///         TableScan: t2                                                      
                                                                                
                                                                                
                                                                  |
+///
 fn optimize_where_in(
     query_info: &SubqueryInfo,
-    outer_input: &LogicalPlan,
+    left: &LogicalPlan,
     outer_other_exprs: &[Expr],
     alias: &AliasGenerator,
 ) -> Result<LogicalPlan> {
-    let proj = Projection::try_from_plan(&query_info.query.subquery)
+    let projection = Projection::try_from_plan(&query_info.query.subquery)
         .map_err(|e| context!("a projection is required", e))?;
-    let mut subqry_input = proj.input.clone();
-    let proj = only_or_err(proj.expr.as_slice())
+    let subquery_input = projection.input.clone();
+    let subquery_expr = only_or_err(projection.expr.as_slice())
         .map_err(|e| context!("single expression projection required", e))?;
-    let subquery_col = proj
-        .try_into_col()
-        .map_err(|e| context!("single column projection required", e))?;
-    let outer_col = query_info
-        .where_in_expr
-        .try_into_col()
-        .map_err(|e| context!("column comparison required", e))?;
-
-    // If subquery is correlated, grab necessary information
-    let mut subqry_cols = vec![];
-    let mut outer_cols = vec![];
-    let mut join_filters = None;
-    let mut other_subqry_exprs = vec![];
-    if let LogicalPlan::Filter(subqry_filter) = (*subqry_input).clone() {
-        // split into filters
-        let subqry_filter_exprs = split_conjunction(&subqry_filter.predicate);
-        verify_not_disjunction(&subqry_filter_exprs)?;
-
-        // Grab column names to join on
-        let (col_exprs, other_exprs) =
-            find_join_exprs(subqry_filter_exprs, subqry_filter.input.schema())
-                .map_err(|e| context!("column correlation not found", e))?;
-        if !col_exprs.is_empty() {
-            // it's correlated
-            subqry_input = subqry_filter.input.clone();
-            (outer_cols, subqry_cols, join_filters) =
-                exprs_to_join_cols(&col_exprs, subqry_filter.input.schema(), 
false)
-                    .map_err(|e| context!("column correlation not found", e))?;
-            other_subqry_exprs = other_exprs;
-        }
-    }
 
-    let (subqry_cols, outer_cols) =
-        merge_cols((&[subquery_col], &subqry_cols), (&[outer_col], 
&outer_cols));
-
-    // build subquery side of join - the thing the subquery was querying
-    let subqry_alias = alias.next("__correlated_sq");
-    let mut subqry_plan = LogicalPlanBuilder::from((*subqry_input).clone());
-    if let Some(expr) = conjunction(other_subqry_exprs) {
-        // if the subquery had additional expressions, restore them
-        subqry_plan = subqry_plan.filter(expr)?
+    // extract join filters
+    let (join_filters, subquery_input) = 
extract_join_filters(subquery_input.as_ref())?;
+
+    // in_predicate may be also include in the join filters, remove it from 
the join filters.
+    let in_predicate = Expr::eq(query_info.where_in_expr.clone(), 
subquery_expr.clone());
+    let join_filters = remove_duplicated_filter(join_filters, in_predicate);
+
+    // replace qualified name with subquery alias.
+    let subquery_alias = alias.next("__correlated_sq");
+    let input_schema = subquery_input.schema();
+    let mut subquery_cols: BTreeSet<Column> =
+        join_filters
+            .iter()
+            .try_fold(BTreeSet::<Column>::new(), |mut cols, expr| {
+                let using_cols: Vec<Column> = expr
+                    .to_columns()?
+                    .into_iter()
+                    .filter(|col| input_schema.field_from_column(col).is_ok())

Review Comment:
   > doesn't this ignore columns that are not found in the subquery
   
   Yes, it will ignore them. But it is used to collect the columns that will be 
added to the subquery projection.
   
   Suppose there is a query:
   `select * from t where x in (select c from foo where foo.a > t.a and foo.b > 
t.a)`
   
   `foo.a > t.a and foo.b > t.a` will be moved to the `join`. To make the join 
work, we need to add `foo.a` and `foo.b` to the projection of the subquery; the 
above code does that filtering.
    
   If the where clause references unknown columns, we would need to do more 
work here, but I think that should be done in the planner.
   
   
   
   
    
     



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to