alamb commented on code in PR #10444: URL: https://github.com/apache/datafusion/pull/10444#discussion_r1598460493
########## datafusion/optimizer/src/push_down_filter.rs: ########## @@ -526,93 +496,102 @@ fn push_down_join( .as_ref() .map_or_else(Vec::new, |filter| split_conjunction_owned(filter.clone())); - let mut is_inner_join = false; - let infer_predicates = if join.join_type == JoinType::Inner { - is_inner_join = true; - - // Only allow both side key is column. - let join_col_keys = join - .on - .iter() - .filter_map(|(l, r)| { - let left_col = l.try_as_col().cloned()?; - let right_col = r.try_as_col().cloned()?; - Some((left_col, right_col)) - }) - .collect::<Vec<_>>(); - - // TODO refine the logic, introduce EquivalenceProperties to logical plan and infer additional filters to push down - // For inner joins, duplicate filters for joined columns so filters can be pushed down - // to both sides. Take the following query as an example: - // - // ```sql - // SELECT * FROM t1 JOIN t2 on t1.id = t2.uid WHERE t1.id > 1 - // ``` - // - // `t1.id > 1` predicate needs to be pushed down to t1 table scan, while - // `t2.uid > 1` predicate needs to be pushed down to t2 table scan. - // - // Join clauses with `Using` constraints also take advantage of this logic to make sure - // predicates reference the shared join columns are pushed to both sides. - // This logic should also been applied to conditions in JOIN ON clause - predicates - .iter() - .chain(on_filters.iter()) - .filter_map(|predicate| { - let mut join_cols_to_replace = HashMap::new(); - - let columns = match predicate.to_columns() { - Ok(columns) => columns, - Err(e) => return Some(Err(e)), - }; + // Are there any new join predicates that can be inferred from the filter expressions? 
+ let inferred_join_predicates = + infer_join_predicates(&join, &predicates, &on_filters)?; - for col in columns.iter() { - for (l, r) in join_col_keys.iter() { - if col == l { - join_cols_to_replace.insert(col, r); - break; - } else if col == r { - join_cols_to_replace.insert(col, l); - break; - } - } - } + if on_filters.is_empty() + && predicates.is_empty() + && inferred_join_predicates.is_empty() + { + return Ok(Transformed::no(LogicalPlan::Join(join))); + } - if join_cols_to_replace.is_empty() { - return None; - } + push_down_all_join(predicates, inferred_join_predicates, join, on_filters) +} - let join_side_predicate = - match replace_col(predicate.clone(), &join_cols_to_replace) { - Ok(p) => p, - Err(e) => { - return Some(Err(e)); - } - }; +/// Extracts any equi-join join predicates from the given filter expressions. +/// +/// Parameters +/// * `join` the join in question +/// +/// * `predicates` the pushed down filter expression +/// +/// * `on_filters` filters from the join ON clause that have not already been +/// identified as join predicates +/// +fn infer_join_predicates( + join: &Join, + predicates: &[Expr], + on_filters: &[Expr], +) -> Result<Vec<Expr>> { + if join.join_type != JoinType::Inner { Review Comment: this logic was refactored from above ########## datafusion/expr/src/expr.rs: ########## @@ -1275,6 +1275,28 @@ impl Expr { } } + /// Return a reference to the inner `Column` if any Review Comment: Pulled this into its own PR: https://github.com/apache/datafusion/pull/10448 ########## datafusion/optimizer/src/push_down_filter.rs: ########## @@ -525,93 +495,102 @@ fn push_down_join( .as_ref() .map_or_else(Vec::new, |filter| split_conjunction_owned(filter.clone())); - let mut is_inner_join = false; - let infer_predicates = if join.join_type == JoinType::Inner { - is_inner_join = true; - - // Only allow both side key is column. 
- let join_col_keys = join - .on - .iter() - .filter_map(|(l, r)| { - let left_col = l.try_into_col().ok()?; - let right_col = r.try_into_col().ok()?; - Some((left_col, right_col)) - }) - .collect::<Vec<_>>(); - - // TODO refine the logic, introduce EquivalenceProperties to logical plan and infer additional filters to push down - // For inner joins, duplicate filters for joined columns so filters can be pushed down - // to both sides. Take the following query as an example: - // - // ```sql - // SELECT * FROM t1 JOIN t2 on t1.id = t2.uid WHERE t1.id > 1 - // ``` - // - // `t1.id > 1` predicate needs to be pushed down to t1 table scan, while - // `t2.uid > 1` predicate needs to be pushed down to t2 table scan. - // - // Join clauses with `Using` constraints also take advantage of this logic to make sure - // predicates reference the shared join columns are pushed to both sides. - // This logic should also been applied to conditions in JOIN ON clause - predicates - .iter() - .chain(on_filters.iter()) - .filter_map(|predicate| { - let mut join_cols_to_replace = HashMap::new(); - - let columns = match predicate.to_columns() { - Ok(columns) => columns, - Err(e) => return Some(Err(e)), - }; + // Are there any new join predicates that can be inferred from the filter expressions? 
+ let inferred_join_predicates = + infer_join_predicates(&join, &predicates, &on_filters)?; Review Comment: Pulled this out into a function to make it easier to see what is going on -- the logic was not changed. ########## datafusion/optimizer/src/push_down_filter.rs: ########## @@ -399,23 +389,20 @@ fn extract_or_clause(expr: &Expr, schema_columns: &HashSet<Column>) -> Option<Ex // push down join/cross-join fn push_down_all_join( predicates: Vec<Expr>, - infer_predicates: Vec<Expr>, - join_plan: &LogicalPlan, - left: &LogicalPlan, - right: &LogicalPlan, + inferred_join_predicates: Vec<Expr>, Review Comment: This is refactored so that the `Join` itself is passed in, rather than a `LogicalPlan`, the embedded `Join` plan, and references to its embedded pieces. ########## datafusion/optimizer/src/push_down_filter.rs: ########## @@ -640,46 +619,57 @@ impl OptimizerRule for PushDownFilter { plan: LogicalPlan, _config: &dyn OptimizerConfig, ) -> Result<Transformed<LogicalPlan>> { - let filter = match plan { - LogicalPlan::Filter(ref filter) => filter, - LogicalPlan::Join(ref join) => return push_down_join(&plan, join, None), - _ => return Ok(Transformed::no(plan)), + if let LogicalPlan::Join(join) = plan { + return push_down_join(join, None); }; - let child_plan = filter.input.as_ref(); Review Comment: Here is the core change -- instead of looking at the input by `ref`, the PR now takes ownership of it and then updates the accounting to avoid `clone`ing. ########## datafusion/optimizer/src/push_down_filter.rs: ########## @@ -150,44 +151,33 @@ pub struct PushDownFilter {} /// non-preserved side it can be more tricky. /// /// Returns a tuple of booleans - (left_preserved, right_preserved). -fn lr_is_preserved(plan: &LogicalPlan) -> Result<(bool, bool)> { - match plan { - LogicalPlan::Join(Join { join_type, ..
}) => match join_type { - JoinType::Inner => Ok((true, true)), - JoinType::Left => Ok((true, false)), - JoinType::Right => Ok((false, true)), - JoinType::Full => Ok((false, false)), - // No columns from the right side of the join can be referenced in output - // predicates for semi/anti joins, so whether we specify t/f doesn't matter. - JoinType::LeftSemi | JoinType::LeftAnti => Ok((true, false)), - // No columns from the left side of the join can be referenced in output - // predicates for semi/anti joins, so whether we specify t/f doesn't matter. - JoinType::RightSemi | JoinType::RightAnti => Ok((false, true)), - }, - LogicalPlan::CrossJoin(_) => Ok((true, true)), - _ => internal_err!("lr_is_preserved only valid for JOIN nodes"), +fn lr_is_preserved(join_type: JoinType) -> Result<(bool, bool)> { Review Comment: The plan was always a join, so rather than having to re-check the plan, I changed the function to simply take the `JoinType`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For additional commands, e-mail: github-h...@datafusion.apache.org