alamb commented on code in PR #5264:
URL: https://github.com/apache/arrow-datafusion/pull/5264#discussion_r1109915829


##########
datafusion/optimizer/src/decorrelate_where_exists.rs:
##########
@@ -144,55 +142,68 @@ fn optimize_exists(
     query_info: &SubqueryInfo,
     outer_input: &LogicalPlan,
 ) -> Result<Option<LogicalPlan>> {
-    let subqry_filter = match query_info.query.subquery.as_ref() {
+    let maybe_subqury_filter = match query_info.query.subquery.as_ref() {
         LogicalPlan::Distinct(subqry_distinct) => match 
subqry_distinct.input.as_ref() {
-            LogicalPlan::Projection(subqry_proj) => {
-                Filter::try_from_plan(&subqry_proj.input)
-            }
+            LogicalPlan::Projection(subqry_proj) => &subqry_proj.input,
             _ => {
-                // Subquery currently only supports distinct or projection
                 return Ok(None);
             }
         },
-        LogicalPlan::Projection(subqry_proj) => 
Filter::try_from_plan(&subqry_proj.input),
+        LogicalPlan::Projection(subqry_proj) => &subqry_proj.input,
         _ => {
             // Subquery currently only supports distinct or projection
             return Ok(None);
         }
     }
-    .map_err(|e| context!("cannot optimize non-correlated subquery", e))?;
-
-    // split into filters
-    let subqry_filter_exprs = split_conjunction(&subqry_filter.predicate);
-    verify_not_disjunction(&subqry_filter_exprs)?;
-
-    // Grab column names to join on
-    let (col_exprs, other_subqry_exprs) =
-        find_join_exprs(subqry_filter_exprs, subqry_filter.input.schema())?;
-    let (outer_cols, subqry_cols, join_filters) =
-        exprs_to_join_cols(&col_exprs, subqry_filter.input.schema(), false)?;
-    if subqry_cols.is_empty() || outer_cols.is_empty() {
-        // cannot optimize non-correlated subquery
+    .as_ref();
+
+    // extract join filters
+    let (join_filters, subquery_input) = 
extract_join_filters(maybe_subqury_filter)?;
+    // cannot optimize non-correlated subquery
+    if join_filters.is_empty() {
         return Ok(None);
     }
 
-    // build subquery side of join - the thing the subquery was querying
-    let mut subqry_plan = 
LogicalPlanBuilder::from(subqry_filter.input.as_ref().clone());
-    if let Some(expr) = conjunction(other_subqry_exprs) {
-        subqry_plan = subqry_plan.filter(expr)? // if the subquery had 
additional expressions, restore them
-    }
-    let subqry_plan = subqry_plan.build()?;
+    let input_schema = subquery_input.schema();
+    let subquery_cols: BTreeSet<Column> =
+        join_filters
+            .iter()
+            .try_fold(BTreeSet::new(), |mut cols, expr| {
+                let using_cols: Vec<Column> = expr
+                    .to_columns()?
+                    .into_iter()
+                    .filter(|col| input_schema.field_from_column(col).is_ok())
+                    .collect::<_>();
+
+                cols.extend(using_cols);
+                Result::<_, DataFusionError>::Ok(cols)
+            })?;
+
+    let projection_exprs: Vec<Expr> =
+        subquery_cols.into_iter().map(Expr::Column).collect();
+
+    let right = LogicalPlanBuilder::from(subquery_input)
+        .project(projection_exprs)?
+        .build()?;
 
-    let join_keys = (subqry_cols, outer_cols);
+    let join_filter = conjunction(join_filters);
 
     // join our sub query into the main plan
     let join_type = match query_info.negated {
         true => JoinType::LeftAnti,
         false => JoinType::LeftSemi,
     };
+
+    // TODO: add Distinct if the original plan is a Distinct.

Review Comment:
   > I will do this in the following pr if it is ok 🤣.
   
   Absolutely, it is OK! Thank you for all your work on this PR.



##########
datafusion/optimizer/src/decorrelate_where_exists.rs:
##########
@@ -144,55 +142,68 @@ fn optimize_exists(
     query_info: &SubqueryInfo,
     outer_input: &LogicalPlan,
 ) -> Result<Option<LogicalPlan>> {
-    let subqry_filter = match query_info.query.subquery.as_ref() {
+    let maybe_subqury_filter = match query_info.query.subquery.as_ref() {
         LogicalPlan::Distinct(subqry_distinct) => match 
subqry_distinct.input.as_ref() {
-            LogicalPlan::Projection(subqry_proj) => {
-                Filter::try_from_plan(&subqry_proj.input)
-            }
+            LogicalPlan::Projection(subqry_proj) => &subqry_proj.input,
             _ => {
-                // Subquery currently only supports distinct or projection
                 return Ok(None);
             }
         },
-        LogicalPlan::Projection(subqry_proj) => 
Filter::try_from_plan(&subqry_proj.input),
+        LogicalPlan::Projection(subqry_proj) => &subqry_proj.input,
         _ => {
             // Subquery currently only supports distinct or projection
             return Ok(None);
         }
     }
-    .map_err(|e| context!("cannot optimize non-correlated subquery", e))?;
-
-    // split into filters
-    let subqry_filter_exprs = split_conjunction(&subqry_filter.predicate);
-    verify_not_disjunction(&subqry_filter_exprs)?;
-
-    // Grab column names to join on
-    let (col_exprs, other_subqry_exprs) =
-        find_join_exprs(subqry_filter_exprs, subqry_filter.input.schema())?;
-    let (outer_cols, subqry_cols, join_filters) =
-        exprs_to_join_cols(&col_exprs, subqry_filter.input.schema(), false)?;
-    if subqry_cols.is_empty() || outer_cols.is_empty() {
-        // cannot optimize non-correlated subquery
+    .as_ref();
+
+    // extract join filters
+    let (join_filters, subquery_input) = 
extract_join_filters(maybe_subqury_filter)?;
+    // cannot optimize non-correlated subquery
+    if join_filters.is_empty() {
         return Ok(None);
     }
 
-    // build subquery side of join - the thing the subquery was querying
-    let mut subqry_plan = 
LogicalPlanBuilder::from(subqry_filter.input.as_ref().clone());
-    if let Some(expr) = conjunction(other_subqry_exprs) {
-        subqry_plan = subqry_plan.filter(expr)? // if the subquery had 
additional expressions, restore them
-    }
-    let subqry_plan = subqry_plan.build()?;
+    let input_schema = subquery_input.schema();
+    let subquery_cols: BTreeSet<Column> =
+        join_filters
+            .iter()
+            .try_fold(BTreeSet::new(), |mut cols, expr| {
+                let using_cols: Vec<Column> = expr
+                    .to_columns()?
+                    .into_iter()
+                    .filter(|col| input_schema.field_from_column(col).is_ok())
+                    .collect::<_>();
+
+                cols.extend(using_cols);
+                Result::<_, DataFusionError>::Ok(cols)
+            })?;
+
+    let projection_exprs: Vec<Expr> =
+        subquery_cols.into_iter().map(Expr::Column).collect();
+
+    let right = LogicalPlanBuilder::from(subquery_input)
+        .project(projection_exprs)?
+        .build()?;
 
-    let join_keys = (subqry_cols, outer_cols);
+    let join_filter = conjunction(join_filters);
 
     // join our sub query into the main plan
     let join_type = match query_info.negated {
         true => JoinType::LeftAnti,
         false => JoinType::LeftSemi,
     };
+
+    // TODO: add Distinct if the original plan is a Distinct.

Review Comment:
   > I will do this in the following pr if it is ok 🤣.
   
   Absolutely, it is OK! Thank you for all your work on this PR.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to