alamb commented on code in PR #4365:
URL: https://github.com/apache/arrow-datafusion/pull/4365#discussion_r1035098171
##########
datafusion/optimizer/src/push_down_filter.rs:
##########
@@ -500,302 +387,386 @@ fn optimize_join(
// vector will contain only join keys (without additional
// element representing filter).
let expr = plan.expressions();
- let expr = if !on_filter.is_empty() && on_to_keep.is_empty() {
+ let expr = if !on_filter_empty && keep_condition.is_empty() {
// New filter expression is None - should remove last element
expr[..expr.len() - 1].to_vec()
- } else if !on_to_keep.is_empty() {
+ } else if !keep_condition.is_empty() {
// Replace last element with new filter expression
expr[..expr.len() - 1]
.iter()
.cloned()
- .chain(once(on_to_keep.into_iter().reduce(Expr::and).unwrap()))
+ .chain(once(keep_condition.into_iter().reduce(Expr::and).unwrap()))
.collect()
} else {
plan.expressions()
};
let plan = from_plan(plan, &expr, &[left, right])?;
- if to_keep.0.is_empty() {
+ if keep_predicates.is_empty() {
Ok(plan)
} else {
// wrap the join on the filter whose predicates must be kept
- let plan = utils::add_filter(plan, &to_keep.0)?;
- state.filters = remove_filters(&state.filters, &to_keep.1);
-
- Ok(plan)
+ match conjunction(keep_predicates) {
+ Some(predicate) => Ok(LogicalPlan::Filter(Filter::try_new(
+ predicate,
+ Arc::new(plan),
+ )?)),
+ None => Ok(plan),
+ }
}
}
-fn optimize(plan: &LogicalPlan, mut state: State) -> Result<LogicalPlan> {
- match plan {
- LogicalPlan::Explain { .. } => {
- // push the optimization to the plan of this explain
- push_down(&state, plan)
- }
- LogicalPlan::Analyze { .. } => push_down(&state, plan),
- LogicalPlan::Filter(filter) => {
- let predicate = utils::cnf_rewrite(filter.predicate().clone());
-
- utils::split_conjunction_owned(predicate)
- .into_iter()
- .try_for_each::<_, Result<()>>(|predicate| {
- let columns = predicate.to_columns()?;
- state.filters.push((predicate, columns));
- Ok(())
- })?;
-
- optimize(filter.input(), state)
+fn push_down_join(
+ plan: &LogicalPlan,
+ join: &Join,
+ parent_predicate: Option<&Expr>,
+) -> Result<Option<LogicalPlan>> {
+ let mut predicates = match parent_predicate {
+ Some(parent_predicate) => {
+
utils::split_conjunction_owned(utils::cnf_rewrite(parent_predicate.clone()))
}
- LogicalPlan::Projection(Projection {
- input,
- expr,
- schema,
- }) => {
- // A projection is filter-commutable, but re-writes all predicate
expressions
- // collect projection.
- let projection = schema
- .fields()
- .iter()
- .enumerate()
- .flat_map(|(i, field)| {
- // strip alias, as they should not be part of filters
- let expr = match &expr[i] {
- Expr::Alias(expr, _) => expr.as_ref().clone(),
- expr => expr.clone(),
+ None => vec![],
+ };
+
+ // Convert JOIN ON predicate to Predicates
+ let on_filters = join
+ .filter
+ .as_ref()
+ .map(|e| utils::split_conjunction_owned(e.clone()))
+ .unwrap_or_else(Vec::new);
+
+ if join.join_type == JoinType::Inner {
+ // For inner joins, duplicate filters for joined columns so filters
can be pushed down
+ // to both sides. Take the following query as an example:
+ //
+ // ```sql
+ // SELECT * FROM t1 JOIN t2 on t1.id = t2.uid WHERE t1.id > 1
+ // ```
+ //
+ // `t1.id > 1` predicate needs to be pushed down to t1 table scan,
while
+ // `t2.uid > 1` predicate needs to be pushed down to t2 table scan.
+ //
+ // Join clauses with `Using` constraints also take advantage of this
logic to make sure
+ // predicates reference the shared join columns are pushed to both
sides.
+ // This logic should also been applied to conditions in JOIN ON clause
+ let join_side_filters = predicates
+ .iter()
+ .chain(on_filters.iter())
+ .filter_map(|predicate| {
+ let mut join_cols_to_replace = HashMap::new();
+ let columns = match predicate.to_columns() {
+ Ok(columns) => columns,
+ Err(e) => return Some(Err(e)),
+ };
+
+ for col in columns.iter() {
+ for (l, r) in join.on.iter() {
+ if col == l {
+ join_cols_to_replace.insert(col, r);
+ break;
+ } else if col == r {
+ join_cols_to_replace.insert(col, l);
+ break;
+ }
+ }
+ }
+
+ if join_cols_to_replace.is_empty() {
+ return None;
+ }
+
+ let join_side_predicate =
+ match replace_col(predicate.clone(),
&join_cols_to_replace) {
+ Ok(p) => p,
+ Err(e) => {
+ return Some(Err(e));
+ }
};
- // Convert both qualified and unqualified fields
- [
- (field.name().clone(), expr.clone()),
- (field.qualified_name(), expr),
- ]
- })
- .collect::<HashMap<_, _>>();
+ Some(Ok(join_side_predicate))
+ })
+ .collect::<Result<Vec<_>>>()?;
+ predicates.extend(join_side_filters);
+ }
+ if on_filters.is_empty() && predicates.is_empty() {
+ return Ok(None);
+ }
+ Ok(Some(push_down_all_join(
+ predicates,
+ plan,
+ &join.left,
+ &join.right,
+ on_filters,
+ )?))
+}
- // re-write all filters based on this projection
- // E.g. in `Filter: b\n Projection: a > 1 as b`, we can swap
them, but the filter must be "a > 1"
- for (predicate, columns) in state.filters.iter_mut() {
- *predicate = replace_cols_by_name(predicate.clone(),
&projection)?;
+impl OptimizerRule for PushDownFilter {
+ fn name(&self) -> &str {
+ "push_down_filter"
+ }
- columns.clear();
- expr_to_columns(predicate, columns)?;
+ fn optimize(
+ &self,
+ plan: &LogicalPlan,
+ optimizer_config: &mut OptimizerConfig,
+ ) -> Result<LogicalPlan> {
+ let filter = match plan {
+ LogicalPlan::Filter(filter) => filter,
+ // we also need to pushdown filter in Join.
+ LogicalPlan::Join(join) => {
+ let optimized_plan = push_down_join(plan, join, None)?;
+ return match optimized_plan {
+ Some(optimized_plan) => {
+ utils::optimize_children(self, &optimized_plan,
optimizer_config)
+ }
+ None => utils::optimize_children(self, plan,
optimizer_config),
+ };
}
+ _ => return utils::optimize_children(self, plan, optimizer_config),
+ };
- // optimize inner
- let new_input = optimize(input, state)?;
- Ok(from_plan(plan, expr, &[new_input])?)
- }
- LogicalPlan::Aggregate(Aggregate { aggr_expr, .. }) => {
- // An aggregate's aggreagate columns are _not_ filter-commutable
=> collect these:
- // * columns whose aggregation expression depends on
- // * the aggregation columns themselves
-
- // construct set of columns that `aggr_expr` depends on
- let mut used_columns = HashSet::new();
- exprlist_to_columns(aggr_expr, &mut used_columns)?;
-
- let agg_columns = aggr_expr
- .iter()
- .map(|x| Ok(Column::from_name(x.display_name()?)))
- .collect::<Result<HashSet<_>>>()?;
- used_columns.extend(agg_columns);
-
- issue_filters(state, used_columns, plan)
- }
- LogicalPlan::Sort { .. } => {
- // sort is filter-commutable
- push_down(&state, plan)
- }
- LogicalPlan::Union(Union { inputs: _, schema }) => {
- // union changing all qualifiers while building logical plan so we
need
- // to rewrite filters to push unqualified columns to inputs
- let projection = schema
- .fields()
- .iter()
- .map(|field| (field.qualified_name(), col(field.name())))
- .collect::<HashMap<_, _>>();
-
- // rewriting predicate expressions using unqualified names as
replacements
- if !projection.is_empty() {
- for (predicate, columns) in state.filters.iter_mut() {
- *predicate = replace_cols_by_name(predicate.clone(),
&projection)?;
-
- columns.clear();
- expr_to_columns(predicate, columns)?;
- }
+ let child_plan = &**filter.input();
+ let new_plan = match child_plan {
+ LogicalPlan::Filter(child_filter) => {
+ let new_predicate =
+ and(filter.predicate().clone(),
child_filter.predicate().clone());
+ let new_plan = LogicalPlan::Filter(Filter::try_new(
+ new_predicate,
+ child_filter.input().clone(),
+ )?);
+ return self.optimize(&new_plan, optimizer_config);
}
-
- push_down(&state, plan)
- }
- LogicalPlan::Limit(Limit { input, .. }) => {
- // limit is _not_ filter-commutable => collect all columns from
its input
- let used_columns = input
- .schema()
- .fields()
- .iter()
- .map(|f| f.qualified_column())
- .collect::<HashSet<_>>();
- issue_filters(state, used_columns, plan)
- }
- LogicalPlan::CrossJoin(CrossJoin { left, right, .. }) => {
- optimize_join(state, plan, left, right, vec![])
- }
- LogicalPlan::Join(Join {
- left,
- right,
- on,
- filter,
- join_type,
- ..
- }) => {
- // Convert JOIN ON predicate to Predicates
- let on_filters = filter
- .as_ref()
- .map(|e| {
- let predicates = utils::split_conjunction(e);
-
- predicates
- .into_iter()
- .map(|e| Ok((e.clone(), e.to_columns()?)))
- .collect::<Result<Vec<_>>>()
- })
- .unwrap_or_else(|| Ok(vec![]))?;
-
- if *join_type == JoinType::Inner {
- // For inner joins, duplicate filters for joined columns so
filters can be pushed down
- // to both sides. Take the following query as an example:
- //
- // ```sql
- // SELECT * FROM t1 JOIN t2 on t1.id = t2.uid WHERE t1.id > 1
- // ```
- //
- // `t1.id > 1` predicate needs to be pushed down to t1 table
scan, while
- // `t2.uid > 1` predicate needs to be pushed down to t2 table
scan.
- //
- // Join clauses with `Using` constraints also take advantage
of this logic to make sure
- // predicates reference the shared join columns are pushed to
both sides.
- // This logic should also been applied to conditions in JOIN
ON clause
- let join_side_filters = state
- .filters
+ LogicalPlan::Repartition(_)
+ | LogicalPlan::Distinct(_)
+ | LogicalPlan::Sort(_) => {
+ // commutable
+ let new_filter =
+ plan.with_new_inputs(&[
+ (**(child_plan.inputs().get(0).unwrap())).clone()
+ ])?;
+ child_plan.with_new_inputs(&[new_filter])?
+ }
+ LogicalPlan::Projection(projection) => {
+ // A projection is filter-commutable, but re-writes all
predicate expressions
+ // collect projection.
+ let replace_map = projection
+ .schema
+ .fields()
.iter()
- .chain(on_filters.iter())
- .filter_map(|(predicate, columns)| {
- let mut join_cols_to_replace = HashMap::new();
- for col in columns.iter() {
- for (l, r) in on {
- if col == l {
- join_cols_to_replace.insert(col, r);
- break;
- } else if col == r {
- join_cols_to_replace.insert(col, l);
- break;
- }
- }
- }
+ .enumerate()
+ .map(|(i, field)| {
+ // strip alias, as they should not be part of filters
+ let expr = match &projection.expr[i] {
+ Expr::Alias(expr, _) => expr.as_ref().clone(),
+ expr => expr.clone(),
+ };
+
+ (field.qualified_name(), expr)
+ })
+ .collect::<HashMap<_, _>>();
- if join_cols_to_replace.is_empty() {
- return None;
- }
+ // re-write all filters based on this projection
+ // E.g. in `Filter: b\n Projection: a > 1 as b`, we can swap
them, but the filter must be "a > 1"
+ let new_filter = LogicalPlan::Filter(Filter::try_new(
+ replace_cols_by_name(filter.predicate().clone(),
&replace_map)?,
+ projection.input.clone(),
+ )?);
- let join_side_predicate =
- match replace_col(predicate.clone(),
&join_cols_to_replace) {
- Ok(p) => p,
- Err(e) => {
- return Some(Err(e));
- }
- };
-
- let join_side_columns = columns
- .clone()
- .into_iter()
- // replace keys in join_cols_to_replace with
values in resulting column
- // set
- .filter(|c| !join_cols_to_replace.contains_key(c))
- .chain(join_cols_to_replace.values().map(|v|
(*v).clone()))
- .collect();
-
- Some(Ok((join_side_predicate, join_side_columns)))
- })
- .collect::<Result<Vec<_>>>()?;
- state.filters.extend(join_side_filters);
+ child_plan.with_new_inputs(&[new_filter])?
}
+ LogicalPlan::Union(union) => {
+ let mut inputs = Vec::with_capacity(union.inputs.len());
+ for input in &union.inputs {
+ let mut replace_map = HashMap::new();
+ for (i, field) in
input.schema().fields().iter().enumerate() {
+ replace_map.insert(
+
union.schema.fields().get(i).unwrap().qualified_name(),
+ Expr::Column(field.qualified_column()),
+ );
+ }
- optimize_join(state, plan, left, right, on_filters)
- }
- LogicalPlan::TableScan(TableScan {
- source,
- projected_schema,
- filters,
- projection,
- table_name,
- fetch,
- }) => {
- let mut used_columns = HashSet::new();
- let mut new_filters = filters.clone();
-
- for (filter_expr, cols) in &state.filters {
- let (preserve_filter_node, add_to_provider) =
- match source.supports_filter_pushdown(filter_expr)? {
- TableProviderFilterPushDown::Unsupported => (true,
false),
- TableProviderFilterPushDown::Inexact => (true, true),
- TableProviderFilterPushDown::Exact => (false, true),
- };
-
- if preserve_filter_node {
- used_columns.extend(cols.clone());
+ let push_predicate =
+ replace_cols_by_name(filter.predicate().clone(),
&replace_map)?;
+ inputs.push(Arc::new(LogicalPlan::Filter(Filter::try_new(
+ push_predicate,
+ input.clone(),
+ )?)))
+ }
+ LogicalPlan::Union(Union {
+ inputs,
+ schema: plan.schema().clone(),
+ })
+ }
+ LogicalPlan::Aggregate(agg) => {
+ // An aggregate's aggregate columns are _not_
filter-commutable => collect these:
+ // * columns whose aggregation expression depends on
+ // * the aggregation columns themselves
+
+ // construct set of columns that `aggr_expr` depends on
+ let mut used_columns = HashSet::new();
+ exprlist_to_columns(&agg.aggr_expr, &mut used_columns)?;
+ let agg_columns = agg
+ .aggr_expr
+ .iter()
+ .map(|x| Ok(Column::from_name(x.display_name()?)))
+ .collect::<Result<HashSet<_>>>()?;
+ used_columns.extend(agg_columns);
+
+ let predicates =
utils::split_conjunction_owned(utils::cnf_rewrite(
+ filter.predicate().clone(),
+ ));
+
+ let mut keep_predicates = vec![];
+ let mut push_predicates = vec![];
+ for expr in predicates {
+ let columns = expr.to_columns()?;
+ if columns.is_empty()
+ || !columns
+ .intersection(&used_columns)
+ .collect::<HashSet<_>>()
+ .is_empty()
+ {
+ keep_predicates.push(expr);
+ } else {
+ push_predicates.push(expr);
+ }
}
- if add_to_provider {
- // Don't add expression again if it's already present in
- // pushed down filters.
- if new_filters.contains(filter_expr) {
- continue;
+ let child = match conjunction(push_predicates) {
+ Some(predicate) => LogicalPlan::Filter(Filter::try_new(
+ predicate,
+ Arc::new((*agg.input).clone()),
+ )?),
+ None => (*agg.input).clone(),
+ };
+ let new_agg = from_plan(
+ filter.input(),
+ &filter.input().expressions(),
+ &vec![child],
+ )?;
+ match conjunction(keep_predicates) {
+ Some(predicate) => LogicalPlan::Filter(Filter::try_new(
+ predicate,
+ Arc::new(new_agg),
+ )?),
+ None => new_agg,
+ }
+ }
+ LogicalPlan::Window(window) => {
+ let mut used_columns = HashSet::new();
+ exprlist_to_columns(&window.window_expr, &mut used_columns)?;
+ let window_columns = window
+ .window_expr
+ .iter()
+ .map(|x| Ok(Column::from_name(x.display_name()?)))
+ .collect::<Result<HashSet<_>>>()?;
+ used_columns.extend(window_columns);
+
+ let predicates =
utils::split_conjunction_owned(utils::cnf_rewrite(
+ filter.predicate().clone(),
+ ));
+
+ let mut keep_predicates = vec![];
+ let mut push_predicates = vec![];
+ for expr in predicates {
+ let columns = expr.to_columns()?;
+ if columns.is_empty()
+ || !columns
+ .intersection(&used_columns)
+ .collect::<HashSet<_>>()
+ .is_empty()
+ {
+ keep_predicates.push(expr);
+ } else {
+ push_predicates.push(expr);
}
- new_filters.push(filter_expr.clone());
+ }
+
+ let child = match conjunction(push_predicates) {
+ Some(predicate) => LogicalPlan::Filter(Filter::try_new(
+ predicate,
+ Arc::new((*window.input).clone()),
+ )?),
+ None => (*window.input).clone(),
+ };
+ let new_agg = from_plan(
+ filter.input(),
+ &filter.input().expressions(),
+ &vec![child],
+ )?;
+ match conjunction(keep_predicates) {
+ Some(predicate) => LogicalPlan::Filter(Filter::try_new(
+ predicate,
+ Arc::new(new_agg),
+ )?),
+ None => new_agg,
+ }
+ }
+ LogicalPlan::Join(join) => {
+ match push_down_join(filter.input(), join,
Some(filter.predicate()))? {
+ Some(optimized_plan) => optimized_plan,
+ None => plan.clone(),
}
}
+ LogicalPlan::CrossJoin(CrossJoin { left, right, .. }) => {
+ let predicates =
utils::split_conjunction_owned(utils::cnf_rewrite(
+ filter.predicate().clone(),
+ ));
- issue_filters(
- state,
- used_columns,
- &LogicalPlan::TableScan(TableScan {
- source: source.clone(),
- projection: projection.clone(),
- projected_schema: projected_schema.clone(),
- table_name: table_name.clone(),
- filters: new_filters,
- fetch: *fetch,
- }),
- )
- }
- _ => {
- // all other plans are _not_ filter-commutable
- let used_columns = plan
- .schema()
- .fields()
- .iter()
- .map(|f| f.qualified_column())
- .collect::<HashSet<_>>();
- issue_filters(state, used_columns, plan)
- }
- }
-}
+ push_down_all_join(predicates, filter.input(), left, right,
vec![])?
+ }
+ LogicalPlan::TableScan(scan) => {
Review Comment:
I agree that the pushdown into the scan could be done as part of the physical
planning phase. However, since the current filter pushdown happens in the
logical planning phase, I think it is ok to keep the same behavior in this PR
and move the pushdown to a separate PR.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]