alamb commented on code in PR #2647:
URL: https://github.com/apache/arrow-datafusion/pull/2647#discussion_r887818870


##########
datafusion/core/src/optimizer/filter_push_down.rs:
##########
@@ -224,32 +244,67 @@ fn optimize_join(
     plan: &LogicalPlan,
     left: &LogicalPlan,
     right: &LogicalPlan,
+    on_filter: Vec<(Expr, HashSet<Column>)>,
 ) -> Result<LogicalPlan> {
+    // Get pushable predicates from current optimizer state
     let (left_preserved, right_preserved) = lr_is_preserved(plan);
-    let to_left = get_pushable_join_predicates(&state, left.schema(), left_preserved);
-    let to_right = get_pushable_join_predicates(&state, right.schema(), right_preserved);
-
+    let to_left =
+        get_pushable_join_predicates(&state.filters, left.schema(), left_preserved);
+    let to_right =
+        get_pushable_join_predicates(&state.filters, right.schema(), right_preserved);
     let to_keep: Predicates = state
         .filters
         .iter()
-        .filter(|(expr, _)| {
-            let pushed_to_left = to_left.0.contains(&expr);
-            let pushed_to_right = to_right.0.contains(&expr);
-            !pushed_to_left && !pushed_to_right
-        })
+        .filter(|(e, _)| !to_left.0.contains(&e) && !to_right.0.contains(&e))
         .map(|(a, b)| (a, b))
         .unzip();
 
-    let mut left_state = state.clone();
-    left_state.filters = keep_filters(&left_state.filters, &to_left);
+    // Get pushable predicates from join filter
+    let (on_to_left, on_to_right, on_to_keep) = if on_filter.is_empty() {
+        ((vec![], vec![]), (vec![], vec![]), vec![])
+    } else {
+        let (on_left_preserved, on_right_preserved) = on_lr_is_preserved(plan);
+        let on_to_left =
+            get_pushable_join_predicates(&on_filter, left.schema(), on_left_preserved);
+        let on_to_right =
+            get_pushable_join_predicates(&on_filter, right.schema(), on_right_preserved);
+        let on_to_keep = on_filter
+            .iter()
+            .filter(|(e, _)| !on_to_left.0.contains(&e) && !on_to_right.0.contains(&e))
+            .map(|(a, _)| a.clone())
+            .collect::<Vec<_>>();
+
+        (on_to_left, on_to_right, on_to_keep)
+    };
+
+    // Build new filter states using pushable predicates
+    // from current optimizer states and from ON clause.
+    // Then recursively call optimization for both join inputs
+    let mut left_state = State { filters: vec![] };
+    left_state.append_predicates(to_left);
+    left_state.append_predicates(on_to_left);
     let left = optimize(left, left_state)?;
 
-    let mut right_state = state.clone();
-    right_state.filters = keep_filters(&right_state.filters, &to_right);
+    let mut right_state = State { filters: vec![] };
+    right_state.append_predicates(to_right);
+    right_state.append_predicates(on_to_right);
     let right = optimize(right, right_state)?;
 
     // create a new Join with the new `left` and `right`
     let expr = plan.expressions();
+    let expr = if !on_filter.is_empty() && on_to_keep.is_empty() {

Review Comment:
   Thank you -- I think the comment helps a lot.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to