korowa commented on code in PR #2647:
URL: https://github.com/apache/arrow-datafusion/pull/2647#discussion_r887587666


##########
datafusion/core/src/optimizer/filter_push_down.rs:
##########
@@ -224,32 +244,67 @@ fn optimize_join(
     plan: &LogicalPlan,
     left: &LogicalPlan,
     right: &LogicalPlan,
+    on_filter: Vec<(Expr, HashSet<Column>)>,
 ) -> Result<LogicalPlan> {
+    // Get pushable predicates from current optimizer state
     let (left_preserved, right_preserved) = lr_is_preserved(plan);
-    let to_left = get_pushable_join_predicates(&state, left.schema(), 
left_preserved);
-    let to_right = get_pushable_join_predicates(&state, right.schema(), 
right_preserved);
-
+    let to_left =
+        get_pushable_join_predicates(&state.filters, left.schema(), 
left_preserved);
+    let to_right =
+        get_pushable_join_predicates(&state.filters, right.schema(), 
right_preserved);
     let to_keep: Predicates = state
         .filters
         .iter()
-        .filter(|(expr, _)| {
-            let pushed_to_left = to_left.0.contains(&expr);
-            let pushed_to_right = to_right.0.contains(&expr);
-            !pushed_to_left && !pushed_to_right
-        })
+        .filter(|(e, _)| !to_left.0.contains(&e) && !to_right.0.contains(&e))
         .map(|(a, b)| (a, b))
         .unzip();
 
-    let mut left_state = state.clone();
-    left_state.filters = keep_filters(&left_state.filters, &to_left);
+    // Get pushable predicates from join filter
+    let (on_to_left, on_to_right, on_to_keep) = if on_filter.is_empty() {
+        ((vec![], vec![]), (vec![], vec![]), vec![])
+    } else {
+        let (on_left_preserved, on_right_preserved) = on_lr_is_preserved(plan);
+        let on_to_left =
+            get_pushable_join_predicates(&on_filter, left.schema(), 
on_left_preserved);
+        let on_to_right =
+            get_pushable_join_predicates(&on_filter, right.schema(), 
on_right_preserved);
+        let on_to_keep = on_filter
+            .iter()
+            .filter(|(e, _)| !on_to_left.0.contains(&e) && 
!on_to_right.0.contains(&e))
+            .map(|(a, _)| a.clone())
+            .collect::<Vec<_>>();
+
+        (on_to_left, on_to_right, on_to_keep)
+    };
+
+    // Build new filter states using pushable predicates
+    // from current optimizer states and from ON clause.
+    // Then recursively call optimization for both join inputs
+    let mut left_state = State { filters: vec![] };
+    left_state.append_predicates(to_left);
+    left_state.append_predicates(on_to_left);
     let left = optimize(left, left_state)?;
 
-    let mut right_state = state.clone();
-    right_state.filters = keep_filters(&right_state.filters, &to_right);
+    let mut right_state = State { filters: vec![] };
+    right_state.append_predicates(to_right);
+    right_state.append_predicates(on_to_right);
     let right = optimize(right, right_state)?;
 
     // create a new Join with the new `left` and `right`
     let expr = plan.expressions();
+    let expr = if !on_filter.is_empty() && on_to_keep.is_empty() {

Review Comment:
   I guess it's the downside of approach when we represent complex (or complex 
enough) structure (or some parts of it) with simple list/dict/tuple/etc, 
however after looking at how `LogicalPlan::Repartition` and 
`LogicalPlan::Aggregate` translated into expressions I supposed that we can 
rely on expressions vector element order in this case.
   
   Regarding your comment above -- I've added explanation of what's going on 
with last element and what's expected `expressions()` behaviour -- hope it can 
be helpful for next person who access this part of code. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to