alamb commented on code in PR #18848:
URL: https://github.com/apache/datafusion/pull/18848#discussion_r2547340274
##########
datafusion/optimizer/src/push_down_filter.rs:
##########
@@ -418,6 +418,204 @@ fn extract_or_clause(expr: &Expr, schema_columns: &HashSet<Column>) -> Option<Ex
predicate
}
+/// Tracks coalesce predicates that can be pushed to each side of a FULL JOIN.
+struct PushDownCoalesceFilterHelper {
+ join_keys: Vec<(Column, Column)>,
+ left_filters: Vec<Expr>,
+ right_filters: Vec<Expr>,
+ remaining_filters: Vec<Expr>,
+}
+
+impl PushDownCoalesceFilterHelper {
+ fn new(join_keys: &[(Expr, Expr)]) -> Self {
+ let join_keys = join_keys
+ .iter()
+ .filter_map(|(lhs, rhs)| {
+ Some((lhs.try_as_col()?.clone(), rhs.try_as_col()?.clone()))
+ })
+ .collect();
+ Self {
+ join_keys,
+ left_filters: Vec::new(),
+ right_filters: Vec::new(),
+ remaining_filters: Vec::new(),
+ }
+ }
+
+ fn push_columns<F: FnMut(Expr) -> Expr>(
+ &mut self,
+ columns: (Column, Column),
+ mut build_filter: F,
+ ) {
+ self.left_filters
+ .push(build_filter(Expr::Column(columns.0)));
+ self.right_filters
+ .push(build_filter(Expr::Column(columns.1)));
+ }
+
+ fn extract_join_columns(&self, expr: &Expr) -> Option<(Column, Column)> {
+ if let Expr::ScalarFunction(ScalarFunction { func, args }) = expr {
+ if func.name() != "coalesce" {
+ return None;
+ }
+ if let [Expr::Column(lhs), Expr::Column(rhs)] = args.as_slice() {
+ for (join_lhs, join_rhs) in &self.join_keys {
+ if join_lhs == lhs && join_rhs == rhs {
+ return Some((lhs.clone(), rhs.clone()));
+ }
+ if join_lhs == rhs && join_rhs == lhs {
+ return Some((rhs.clone(), lhs.clone()));
+ }
+ }
+ }
+ }
+ None
+ }
+
+ fn push_term(&mut self, term: &Expr) {
+ match term {
+ Expr::BinaryExpr(BinaryExpr { left, op, right })
+ if op.supports_propagation() =>
+ {
+ if let Some(columns) = self.extract_join_columns(left) {
+ return self.push_columns(columns, |replacement| {
+ Expr::BinaryExpr(BinaryExpr {
+ left: Box::new(replacement),
+ op: *op,
+ right: right.clone(),
+ })
+ });
+ }
+ if let Some(columns) = self.extract_join_columns(right) {
+ return self.push_columns(columns, |replacement| {
+ Expr::BinaryExpr(BinaryExpr {
+ left: left.clone(),
+ op: *op,
+ right: Box::new(replacement),
+ })
+ });
+ }
+ }
+ Expr::IsNull(expr) => {
+ if let Some(columns) = self.extract_join_columns(expr) {
+ return self.push_columns(columns, |replacement| {
+ Expr::IsNull(Box::new(replacement))
+ });
+ }
+ }
+ Expr::IsNotNull(expr) => {
+ if let Some(columns) = self.extract_join_columns(expr) {
+ return self.push_columns(columns, |replacement| {
+ Expr::IsNotNull(Box::new(replacement))
+ });
+ }
+ }
+ Expr::IsTrue(expr) => {
+ if let Some(columns) = self.extract_join_columns(expr) {
+ return self.push_columns(columns, |replacement| {
+ Expr::IsTrue(Box::new(replacement))
+ });
+ }
+ }
+ Expr::IsFalse(expr) => {
+ if let Some(columns) = self.extract_join_columns(expr) {
+ return self.push_columns(columns, |replacement| {
+ Expr::IsFalse(Box::new(replacement))
+ });
+ }
+ }
+ Expr::IsUnknown(expr) => {
+ if let Some(columns) = self.extract_join_columns(expr) {
+ return self.push_columns(columns, |replacement| {
+ Expr::IsUnknown(Box::new(replacement))
+ });
+ }
+ }
+ Expr::IsNotTrue(expr) => {
+ if let Some(columns) = self.extract_join_columns(expr) {
+ return self.push_columns(columns, |replacement| {
+ Expr::IsNotTrue(Box::new(replacement))
+ });
+ }
+ }
+ Expr::IsNotFalse(expr) => {
+ if let Some(columns) = self.extract_join_columns(expr) {
+ return self.push_columns(columns, |replacement| {
+ Expr::IsNotFalse(Box::new(replacement))
+ });
+ }
+ }
+ Expr::IsNotUnknown(expr) => {
+ if let Some(columns) = self.extract_join_columns(expr) {
+ return self.push_columns(columns, |replacement| {
+ Expr::IsNotUnknown(Box::new(replacement))
+ });
+ }
+ }
+ Expr::Between(between) => {
+ if let Some(columns) =
self.extract_join_columns(&between.expr) {
+ return self.push_columns(columns, |replacement| {
+ Expr::Between(Between {
+ expr: Box::new(replacement),
+ negated: between.negated,
+ low: between.low.clone(),
+ high: between.high.clone(),
+ })
+ });
+ }
+ }
+ Expr::InList(in_list) => {
+ if let Some(columns) =
self.extract_join_columns(&in_list.expr) {
+ return self.push_columns(columns, |replacement| {
+ Expr::InList(InList {
+ expr: Box::new(replacement),
+ list: in_list.list.clone(),
+ negated: in_list.negated,
+ })
+ });
+ }
+ }
+ _ => {}
+ }
+ self.remaining_filters.push(term.clone());
+ }
+
+ fn push_predicate(
+ mut self,
+ predicate: Expr,
+ ) -> Result<(Option<Expr>, Option<Expr>, Vec<Expr>)> {
+ let predicates = split_conjunction_owned(predicate);
+ let terms = simplify_predicates(predicates)?;
+ for term in terms {
+ self.push_term(&term);
+ }
+ Ok((
+ conjunction(self.left_filters),
+ conjunction(self.right_filters),
+ self.remaining_filters,
+ ))
+ }
+}
+
+fn push_full_join_coalesce_filters(
Review Comment:
To push filters into the inputs of a `FULL JOIN`, you need to guarantee
that the join doesn't reintroduce rows (padded with nulls) that the filter
would have removed in its original position above the join.
Without such a guarantee, it is not clear to me that this optimization is correct.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]