vadimpiven commented on code in PR #18848:
URL: https://github.com/apache/datafusion/pull/18848#discussion_r2547743484


##########
datafusion/optimizer/src/push_down_filter.rs:
##########
@@ -418,6 +418,204 @@ fn extract_or_clause(expr: &Expr, schema_columns: 
&HashSet<Column>) -> Option<Ex
     predicate
 }
 
+/// Tracks `coalesce(lhs_key, rhs_key)` predicates that can be pushed to each side of a FULL JOIN, plus the terms that cannot.
+struct PushDownCoalesceFilterHelper {
+    join_keys: Vec<(Column, Column)>, // (lhs, rhs) join key pairs where both sides are plain columns
+    left_filters: Vec<Expr>, // rewritten terms that use the first column of a key pair
+    right_filters: Vec<Expr>, // rewritten terms that use the second column of a key pair
+    remaining_filters: Vec<Expr>, // terms that were not rewritten and are kept as-is
+}
+
+impl PushDownCoalesceFilterHelper {
+    fn new(join_keys: &[(Expr, Expr)]) -> Self { // builds an empty helper from the join's key pairs
+        let join_keys = join_keys
+            .iter()
+            .filter_map(|(lhs, rhs)| { // pairs where either side is not a column are dropped
+                Some((lhs.try_as_col()?.clone(), rhs.try_as_col()?.clone()))
+            })
+            .collect();
+        Self {
+            join_keys,
+            left_filters: Vec::new(),
+            right_filters: Vec::new(),
+            remaining_filters: Vec::new(),
+        }
+    }
+
+    fn push_columns<F: FnMut(Expr) -> Expr>( // records one rewritten filter per join side
+        &mut self,
+        columns: (Column, Column),
+        mut build_filter: F, // called twice: once with `columns.0`, once with `columns.1`
+    ) {
+        self.left_filters
+            .push(build_filter(Expr::Column(columns.0)));
+        self.right_filters
+            .push(build_filter(Expr::Column(columns.1)));
+    }
+
+    fn extract_join_columns(&self, expr: &Expr) -> Option<(Column, Column)> { // Some(key pair) iff `expr` is `coalesce` over exactly the two columns of a join key pair
+        if let Expr::ScalarFunction(ScalarFunction { func, args }) = expr {
+            if func.name() != "coalesce" { // the function is recognised by name only
+                return None;
+            }
+            if let [Expr::Column(lhs), Expr::Column(rhs)] = args.as_slice() { // exactly two arguments, both plain columns
+                for (join_lhs, join_rhs) in &self.join_keys {
+                    if join_lhs == lhs && join_rhs == rhs {
+                        return Some((lhs.clone(), rhs.clone()));
+                    }
+                    if join_lhs == rhs && join_rhs == lhs { // swapped argument order: still returned in (join_lhs, join_rhs) order
+                        return Some((rhs.clone(), lhs.clone()));
+                    }
+                }
+            }
+        }
+        None
+    }
+
+    fn push_term(&mut self, term: &Expr) { // rewrites `term` into per-side filters when its direct operand is a join-key coalesce; otherwise keeps it in `remaining_filters`
+        match term {
+            Expr::BinaryExpr(BinaryExpr { left, op, right })
+                if op.supports_propagation() =>
+            {
+                if let Some(columns) = self.extract_join_columns(left) { // left operand is checked first
+                    return self.push_columns(columns, |replacement| {
+                        Expr::BinaryExpr(BinaryExpr {
+                            left: Box::new(replacement),
+                            op: *op,
+                            right: right.clone(), // NOTE(review): copied unchecked into both side filters; confirm it cannot reference columns of the opposite join input
+                        })
+                    });
+                }
+                if let Some(columns) = self.extract_join_columns(right) {
+                    return self.push_columns(columns, |replacement| {
+                        Expr::BinaryExpr(BinaryExpr {
+                            left: left.clone(), // NOTE(review): same unchecked copy as above, for the mirrored case
+                            op: *op,
+                            right: Box::new(replacement),
+                        })
+                    });
+                }
+            }
+            Expr::IsNull(expr) => { // this and the following unary arms rebuild the same variant around the replacement column
+                if let Some(columns) = self.extract_join_columns(expr) {
+                    return self.push_columns(columns, |replacement| {
+                        Expr::IsNull(Box::new(replacement))
+                    });
+                }
+            }
+            Expr::IsNotNull(expr) => {
+                if let Some(columns) = self.extract_join_columns(expr) {
+                    return self.push_columns(columns, |replacement| {
+                        Expr::IsNotNull(Box::new(replacement))
+                    });
+                }
+            }
+            Expr::IsTrue(expr) => {
+                if let Some(columns) = self.extract_join_columns(expr) {
+                    return self.push_columns(columns, |replacement| {
+                        Expr::IsTrue(Box::new(replacement))
+                    });
+                }
+            }
+            Expr::IsFalse(expr) => {
+                if let Some(columns) = self.extract_join_columns(expr) {
+                    return self.push_columns(columns, |replacement| {
+                        Expr::IsFalse(Box::new(replacement))
+                    });
+                }
+            }
+            Expr::IsUnknown(expr) => {
+                if let Some(columns) = self.extract_join_columns(expr) {
+                    return self.push_columns(columns, |replacement| {
+                        Expr::IsUnknown(Box::new(replacement))
+                    });
+                }
+            }
+            Expr::IsNotTrue(expr) => {
+                if let Some(columns) = self.extract_join_columns(expr) {
+                    return self.push_columns(columns, |replacement| {
+                        Expr::IsNotTrue(Box::new(replacement))
+                    });
+                }
+            }
+            Expr::IsNotFalse(expr) => {
+                if let Some(columns) = self.extract_join_columns(expr) {
+                    return self.push_columns(columns, |replacement| {
+                        Expr::IsNotFalse(Box::new(replacement))
+                    });
+                }
+            }
+            Expr::IsNotUnknown(expr) => {
+                if let Some(columns) = self.extract_join_columns(expr) {
+                    return self.push_columns(columns, |replacement| {
+                        Expr::IsNotUnknown(Box::new(replacement))
+                    });
+                }
+            }
+            Expr::Between(between) => { // only the tested expression is inspected; `low`/`high` are copied unchanged
+                if let Some(columns) = 
self.extract_join_columns(&between.expr) {
+                    return self.push_columns(columns, |replacement| {
+                        Expr::Between(Between {
+                            expr: Box::new(replacement),
+                            negated: between.negated,
+                            low: between.low.clone(),
+                            high: between.high.clone(),
+                        })
+                    });
+                }
+            }
+            Expr::InList(in_list) => { // only the tested expression is inspected; the list is copied unchanged
+                if let Some(columns) = 
self.extract_join_columns(&in_list.expr) {
+                    return self.push_columns(columns, |replacement| {
+                        Expr::InList(InList {
+                            expr: Box::new(replacement),
+                            list: in_list.list.clone(),
+                            negated: in_list.negated,
+                        })
+                    });
+                }
+            }
+            _ => {}
+        }
+        self.remaining_filters.push(term.clone()); // reached only when no arm returned early, i.e. nothing was rewritten
+    }
+
+    fn push_predicate( // consumes the helper and returns (left filter, right filter, terms left untouched)
+        mut self,
+        predicate: Expr,
+    ) -> Result<(Option<Expr>, Option<Expr>, Vec<Expr>)> {
+        let predicates = split_conjunction_owned(predicate); // presumably splits on top-level AND; helper is defined outside this hunk
+        let terms = simplify_predicates(predicates)?; // any error from simplification is returned to the caller
+        for term in terms {
+            self.push_term(&term);
+        }
+        Ok((
+            conjunction(self.left_filters),
+            conjunction(self.right_filters),
+            self.remaining_filters,
+        ))
+    }
+}
+
+fn push_full_join_coalesce_filters(

Review Comment:
   Let me know if you believe that nobody else has the scenario I described; 
in that case, we can simply close the PR and the issue without further discussion :)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to