jorgecarleitao commented on a change in pull request #7880:
URL: https://github.com/apache/arrow/pull/7880#discussion_r471004814



##########
File path: rust/datafusion/src/optimizer/filter_push_down.rs
##########
@@ -0,0 +1,505 @@
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Filter Push Down optimizer rule ensures that filters are applied as early as possible in the plan
+
+use crate::error::Result;
+use crate::logicalplan::Expr;
+use crate::logicalplan::LogicalPlan;
+use crate::optimizer::optimizer::OptimizerRule;
+use crate::optimizer::utils;
+use std::collections::{HashMap, HashSet};
+
+/// Filter Push Down optimizer rule pushes filter clauses down the plan
+///
+/// This optimization looks for the maximum depth of each column in the plan where a filter can be applied and
+/// re-writes the plan with filters at those locations.
+/// It performs two passes on the plan:
+/// 1. identify filters, which columns they use, and projections along the path
+/// 2. move filters down, re-writing the expressions using the projections
+/*
+A filter-commutative operation is an operation for which filter(op(data)) = op(filter(data)).
+An example of a filter-commutative operation is a projection; a counter-example is `limit`.
+
+The filter-commutative property is column-specific. An aggregate grouped by A on SUM(B)
+can commute with a filter that depends on A only, but does not commute with a filter
+that depends on SUM(B).
+
+A location in this module is identified by a number, the depth, which is 0 for the last operation
+and highest for the first operation (typically a scan).
+
+This optimizer commutes filters with filter-commutative operations to push the filters
+to the maximum possible depth, re-writing the filter expression through every
+projection that changes it along the way.
+
+    Selection: #b Gt Int64(10)
+        Projection: #a AS b
+
+is optimized to
+
+    Projection: #a AS b
+        Selection: #a Gt Int64(10)  <--- changed from #b to #a
+
+To perform this optimization, we first analyze the plan to identify three items:
+
+1. Where are the filters located in the plan
+2. Where are non-commutative operations' columns located in the plan (break_points)
+3. Where are projections located in the plan
+
+With this information, we re-write the plan by:
+
+1. Computing the maximum possible depth of each column
+2. Computing the maximum possible depth of each filter expression based on the columns it depends on
+3. Re-writing the filter expression through every projection that it commutes with, from its original depth to its max possible depth
+*/
+pub struct FilterPushDown {}
+
+impl OptimizerRule for FilterPushDown {
+    fn optimize(&mut self, plan: &LogicalPlan) -> Result<LogicalPlan> {
+        let (break_points, selections, projections) = analyze_plan(plan, 0)?;
+
+        // compute max depth for each of the columns
+        let mut breaks: HashMap<String, usize> = HashMap::new();

Review comment:
       Good catch. Yes, you are right. Leftovers from a previous iteration :/
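
       As an aside for readers of this thread: the commutation step described in the doc comment above can be illustrated with a minimal, self-contained Rust sketch. The `Expr` enum and `rewrite` helper below are hypothetical stand-ins, not the actual DataFusion types; the sketch only shows how a filter expression is rewritten through a projection's alias map, turning `Selection: #b Gt Int64(10)` over `Projection: #a AS b` into `Selection: #a Gt Int64(10)`:

           use std::collections::HashMap;

           // Toy expression type standing in for DataFusion's Expr (hypothetical).
           #[derive(Clone, Debug, PartialEq)]
           enum Expr {
               Column(String),
               Literal(i64),
               Gt(Box<Expr>, Box<Expr>),
           }

           // Replace each column in `expr` with the expression the projection
           // aliases it to; sub-expressions are rewritten recursively.
           fn rewrite(expr: &Expr, aliases: &HashMap<String, Expr>) -> Expr {
               match expr {
                   Expr::Column(name) => aliases
                       .get(name)
                       .cloned()
                       .unwrap_or_else(|| expr.clone()),
                   Expr::Literal(_) => expr.clone(),
                   Expr::Gt(l, r) => Expr::Gt(
                       Box::new(rewrite(l, aliases)),
                       Box::new(rewrite(r, aliases)),
                   ),
               }
           }

           fn main() {
               // Projection: #a AS b
               let mut aliases = HashMap::new();
               aliases.insert("b".to_string(), Expr::Column("a".to_string()));

               // Selection: #b Gt Int64(10)
               let filter = Expr::Gt(
                   Box::new(Expr::Column("b".to_string())),
                   Box::new(Expr::Literal(10)),
               );

               // After commuting past the projection: #a Gt Int64(10)
               assert_eq!(
                   rewrite(&filter, &aliases),
                   Expr::Gt(
                       Box::new(Expr::Column("a".to_string())),
                       Box::new(Expr::Literal(10)),
                   )
               );
           }

       The real rule additionally has to respect break points: a filter that depends on the output of a non-commutative operation (e.g. SUM(B) produced by an aggregate) must stay above that operation rather than being pushed past it.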



