jorgecarleitao commented on a change in pull request #7880:
URL: https://github.com/apache/arrow/pull/7880#discussion_r471004814
########## File path: rust/datafusion/src/optimizer/filter_push_down.rs ##########

@@ -0,0 +1,505 @@
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Filter Push Down optimizer rule ensures that filters are applied as early as possible in the plan
+
+use crate::error::Result;
+use crate::logicalplan::Expr;
+use crate::logicalplan::LogicalPlan;
+use crate::optimizer::optimizer::OptimizerRule;
+use crate::optimizer::utils;
+use std::collections::{HashMap, HashSet};
+
+/// Filter Push Down optimizer rule pushes filter clauses down the plan
+///
+/// This optimization looks for the maximum depth of each column in the plan where a filter can be applied and
+/// re-writes the plan with filters on those locations.
+/// It performs two passes on the plan:
+/// 1. identify filters, which columns they use, and projections along the path
+/// 2. move filters down, re-writing the expressions using the projections
+/*
+A filter-commutative operation is an operation for which filter(op(data)) = op(filter(data)).
+An example of a filter-commutative operation is a projection; a counter-example is `limit`.
+
+The filter-commutative property is column-specific. An aggregate grouped by A on SUM(B)
+can commute with a filter that depends only on A, but does not commute with a filter that depends
+on SUM(B).
+
+A location in this module is identified by a number, depth, which is 0 for the last operation
+and highest for the first operation (typically a scan).
+
+This optimizer commutes filters with filter-commutative operations to push the filters
+to the maximum possible depth, consequently re-writing the filter expressions through every
+projection that changes the filter's expression.
+
+    Selection: #b Gt Int64(10)
+        Projection: #a AS b
+
+is optimized to
+
+    Projection: #a AS b
+        Selection: #a Gt Int64(10)  <--- changed from #b to #a
+
+To perform this optimization, we first analyze the plan to identify three items:
+
+1. Where the filters are located in the plan
+2. Where the columns of non-commutative operations are located in the plan (break_points)
+3. Where the projections are located in the plan
+
+With this information, we re-write the plan by:
+
+1. Computing the maximum possible depth of each column
+2. Computing the maximum possible depth of each filter expression based on the columns it depends on
+3. Re-writing each filter expression through every projection it commutes with, from its original depth to its maximum possible depth
+*/
+pub struct FilterPushDown {}
+
+impl OptimizerRule for FilterPushDown {
+    fn optimize(&mut self, plan: &LogicalPlan) -> Result<LogicalPlan> {
+        let (break_points, selections, projections) = analyze_plan(plan, 0)?;
+
+        // compute max depth for each of the columns
+        let mut breaks: HashMap<String, usize> = HashMap::new();

Review comment:
       Good catch. Yes, you are right. Leftovers from a previous iteration :/

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
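The filter-commutative property described in the module comment above can be checked with a small standalone sketch. This is plain Rust, not the DataFusion API; the `project`, `filter`, and `limit` functions are illustrative stand-ins for the plan operators discussed in the diff:

```rust
fn main() {
    let data: Vec<i64> = vec![5, 15, 25];

    // Projection (the identity here, standing in for `#a AS b`).
    fn project(xs: &[i64]) -> Vec<i64> {
        xs.to_vec()
    }
    // Filter: keep values greater than 10 (standing in for `#b Gt Int64(10)`).
    fn filter(xs: &[i64]) -> Vec<i64> {
        xs.iter().copied().filter(|&x| x > 10).collect()
    }
    // Limit: keep only the first row.
    fn limit(xs: &[i64]) -> Vec<i64> {
        xs.iter().copied().take(1).collect()
    }

    // A projection is filter-commutative:
    // filter(project(data)) == project(filter(data))
    assert_eq!(filter(&project(&data)), project(&filter(&data)));

    // `limit` is not: filter(limit(data)) drops everything,
    // while limit(filter(data)) keeps one surviving row.
    assert_ne!(filter(&limit(&data)), limit(&filter(&data)));
}
```

This is why the rule can push a filter through a projection (after rewriting `#b` back to `#a`) but must stop at operations such as `limit`, which act as break points.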