jackwener commented on code in PR #4365:
URL: https://github.com/apache/arrow-datafusion/pull/4365#discussion_r1034284068
##########
datafusion/optimizer/src/push_down_filter.rs:
##########
@@ -500,302 +387,344 @@ fn optimize_join(
// vector will contain only join keys (without additional
// element representing filter).
let expr = plan.expressions();
- let expr = if !on_filter.is_empty() && on_to_keep.is_empty() {
+ let expr = if !on_filter_empty && keep_condition.is_empty() {
// New filter expression is None - should remove last element
expr[..expr.len() - 1].to_vec()
- } else if !on_to_keep.is_empty() {
+ } else if !keep_condition.is_empty() {
// Replace last element with new filter expression
expr[..expr.len() - 1]
.iter()
.cloned()
- .chain(once(on_to_keep.into_iter().reduce(Expr::and).unwrap()))
+ .chain(once(keep_condition.into_iter().reduce(Expr::and).unwrap()))
.collect()
} else {
plan.expressions()
};
let plan = from_plan(plan, &expr, &[left, right])?;
- if to_keep.0.is_empty() {
+ if keep_predicates.is_empty() {
Ok(plan)
} else {
// wrap the join on the filter whose predicates must be kept
- let plan = utils::add_filter(plan, &to_keep.0)?;
- state.filters = remove_filters(&state.filters, &to_keep.1);
-
- Ok(plan)
+ match conjunction(keep_predicates) {
+ Some(predicate) => Ok(LogicalPlan::Filter(Filter::try_new(
+ predicate,
+ Arc::new(plan),
+ )?)),
+ None => Ok(plan),
+ }
}
}
-fn optimize(plan: &LogicalPlan, mut state: State) -> Result<LogicalPlan> {
- match plan {
- LogicalPlan::Explain { .. } => {
- // push the optimization to the plan of this explain
- push_down(&state, plan)
- }
- LogicalPlan::Analyze { .. } => push_down(&state, plan),
- LogicalPlan::Filter(filter) => {
- let predicate = utils::cnf_rewrite(filter.predicate().clone());
-
- utils::split_conjunction_owned(predicate)
- .into_iter()
- .try_for_each::<_, Result<()>>(|predicate| {
- let columns = predicate.to_columns()?;
- state.filters.push((predicate, columns));
- Ok(())
- })?;
-
- optimize(filter.input(), state)
+fn push_down_join(
+ plan: &LogicalPlan,
+ join: &Join,
+ parent_predicate: Option<&Expr>,
+) -> Result<Option<LogicalPlan>> {
+ let mut predicates = match parent_predicate {
+ Some(parent_predicate) => {
+
utils::split_conjunction_owned(utils::cnf_rewrite(parent_predicate.clone()))
}
- LogicalPlan::Projection(Projection {
- input,
- expr,
- schema,
- }) => {
- // A projection is filter-commutable, but re-writes all predicate
expressions
- // collect projection.
- let projection = schema
- .fields()
- .iter()
- .enumerate()
- .flat_map(|(i, field)| {
- // strip alias, as they should not be part of filters
- let expr = match &expr[i] {
- Expr::Alias(expr, _) => expr.as_ref().clone(),
- expr => expr.clone(),
+ None => vec![],
+ };
+
+ // Convert JOIN ON predicate to Predicates
+ let on_filters = join
+ .filter
+ .as_ref()
+ .map(|e| utils::split_conjunction_owned(e.clone()))
+ .unwrap_or_else(Vec::new);
+
+ if join.join_type == JoinType::Inner {
+ // For inner joins, duplicate filters for joined columns so filters
can be pushed down
Review Comment:
Yes, I think we can.
In fact, a SEMI/ANTI JOIN is similar to a FILTER.
The current code is just a refactor; I haven't included any extra enhancements.
Related issue tracking problems like this:
https://github.com/apache/arrow-datafusion/issues/4413
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]