jon-chuang commented on code in PR #2451:
URL: https://github.com/apache/arrow-datafusion/pull/2451#discussion_r865692147


##########
datafusion/core/src/optimizer/subquery_filter_to_join.rs:
##########
@@ -46,6 +47,227 @@ impl SubqueryFilterToJoin {
     pub fn new() -> Self {
         Self {}
     }
+
+    fn are_correlated_columns(
+        &self,
+        outer: &Arc<DFSchema>,
+        column_a: &Column,
+        column_b: &Column,
+    ) -> Option<(Column, Column)> {
+        if column_is_correlated(outer, column_a) {
+            return Some((column_a.clone(), column_b.clone()));
+        } else if column_is_correlated(outer, column_b) {
+            return Some((column_b.clone(), column_a.clone()));
+        }
+        None
+    }
+
+    // TODO: do we need to check correlation/dependency only with outer input top-level schema?
+    // NOTE: We only match against an equality filter with an outer column
+    fn extract_correlated_columns(
+        &self,
+        expr: &Expr,
+        outer: &Arc<DFSchema>,
+        correlated_columns: &mut Vec<(Column, Column)>,
+    ) -> Option<Expr> {
+        let mut filters = vec![];
+        // This will also strip aliases
+        utils::split_conjunction(expr, &mut filters);
+
+        let mut non_correlated_predicates = vec![];
+        for filter in filters {
+            match filter {
+                Expr::BinaryExpr { left, op, right } => {
+                    let mut extracted_column = false;
+                    if let (Expr::Column(column_a), Expr::Column(column_b)) =
+                        (left.as_ref(), right.as_ref())
+                    {
+                        if let Some(columns) =
+                            self.are_correlated_columns(outer, &column_a, &column_b)
+                        {
+                            if *op == Operator::Eq {
+                                correlated_columns.push(columns);
+                                extracted_column = true;
+                            }
+                        }
+                    }
+                    if !extracted_column {
+                        non_correlated_predicates.push(filter);
+                    }
+                }
+                _ => non_correlated_predicates.push(filter),
+            }
+        }
+
+        if non_correlated_predicates.is_empty() {
+            None
+        } else {
+            Some(utils::combine_conjunctive(&non_correlated_predicates))
+        }
+    }
+
+    fn rewrite_outer_plan(
+        &self,
+        outer_plan: LogicalPlan,
+        expr: &Expr,
+        execution_props: &ExecutionProps,
+    ) -> Result<LogicalPlan> {
+        match expr {
+            Expr::InSubquery {
+                expr,
+                subquery,
+                negated,
+            } => {
+                let mut correlated_columns = vec![];
+                let subquery_ref = &*subquery.subquery;
+                let right_decorrelated_plan = match subquery_ref {
+                    // NOTE: We only pattern match against Projection(Filter(..)). We will have another optimization rule
+                    // which tries to pull up all correlated predicates in an InSubquery into a Projection(Filter(..))
+                    // at the root node of the InSubquery's subquery. The Projection at the root must have as its expression
+                    // a single Column.

Review Comment:
   This may be somewhat restrictive. It can be expanded, but that needs some work.
In particular, joins will need to accept expressions rather than just joining
blindly on rows.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to