ygf11 commented on code in PR #5345:
URL: https://github.com/apache/arrow-datafusion/pull/5345#discussion_r1124147102


##########
datafusion/optimizer/src/decorrelate_where_exists.rs:
##########
@@ -142,69 +144,70 @@ fn optimize_exists(
     query_info: &SubqueryInfo,
     outer_input: &LogicalPlan,
 ) -> Result<Option<LogicalPlan>> {
-    let maybe_subqury_filter = match query_info.query.subquery.as_ref() {
-        LogicalPlan::Distinct(subqry_distinct) => match 
subqry_distinct.input.as_ref() {
-            LogicalPlan::Projection(subqry_proj) => &subqry_proj.input,
-            _ => {
-                return Ok(None);
-            }
-        },
-        LogicalPlan::Projection(subqry_proj) => &subqry_proj.input,
-        _ => {
-            // Subquery currently only supports distinct or projection
-            return Ok(None);
-        }
-    }
-    .as_ref();
+    let subquery = query_info.query.subquery.as_ref();
+    if let Some((join_filter, optimized_subquery)) = 
optimize_subquery(subquery)? {
+        // join our sub query into the main plan
+        let join_type = match query_info.negated {
+            true => JoinType::LeftAnti,
+            false => JoinType::LeftSemi,
+        };
+
+        let new_plan = LogicalPlanBuilder::from(outer_input.clone())
+            .join(
+                optimized_subquery,
+                join_type,
+                (Vec::<Column>::new(), Vec::<Column>::new()),
+                Some(join_filter),
+            )?
+            .build()?;
 
-    // extract join filters
-    let (join_filters, subquery_input) = 
extract_join_filters(maybe_subqury_filter)?;
-    // cannot optimize non-correlated subquery
-    if join_filters.is_empty() {
-        return Ok(None);
+        Ok(Some(new_plan))
+    } else {
+        Ok(None)
     }
-
-    let input_schema = subquery_input.schema();
-    let subquery_cols: BTreeSet<Column> =
-        join_filters
-            .iter()
-            .try_fold(BTreeSet::new(), |mut cols, expr| {
-                let using_cols: Vec<Column> = expr
-                    .to_columns()?
+}
+/// Optimize the subquery and extract the possible join filter.
+/// This function can't optimize non-correlated subquery, and will return None.
+fn optimize_subquery(subquery: &LogicalPlan) -> Result<Option<(Expr, 
LogicalPlan)>> {
+    match subquery {
+        LogicalPlan::Distinct(subqry_distinct) => {
+            let distinct_input = &subqry_distinct.input;
+            let optimized_plan =
+                optimize_subquery(distinct_input)?.map(|(filters, right)| {
+                    (
+                        filters,
+                        LogicalPlan::Distinct(Distinct {
+                            input: Arc::new(right),
+                        }),
+                    )
+                });
+            Ok(optimized_plan)
+        }

Review Comment:
   I am not sure whether the handling of `Distinct` here is correct, so I have 
not handled `Aggregate` in this function yet.  



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to