jorgecarleitao commented on a change in pull request #7880:
URL: https://github.com/apache/arrow/pull/7880#discussion_r471069937



##########
File path: rust/datafusion/src/optimizer/filter_push_down.rs
##########
@@ -0,0 +1,631 @@
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Filter Push Down optimizer rule ensures that filters are applied as early as possible in the plan
+
+use crate::error::Result;
+use crate::logicalplan::Expr;
+use crate::logicalplan::{and, LogicalPlan};
+use crate::optimizer::optimizer::OptimizerRule;
+use crate::optimizer::utils;
+use std::collections::{BTreeMap, HashMap, HashSet};
+
+/// Filter Push Down optimizer rule pushes filter clauses down the plan
+///
+/// This optimization looks for the maximum depth of each column in the plan where a filter can be applied and
+/// re-writes the plan with filters on those locations.
+/// It performs two passes on the plan:
+/// 1. identify filters, which columns they use, and projections along the path
+/// 2. move filters down, re-writing the expressions using the projections
+/*
+A filter-commutative operation is an operation `op` for which filter(op(data)) = op(filter(data)).
+An example of a filter-commutative operation is a projection; a counter-example is `limit`.
+
+The filter-commutative property is column-specific. An aggregate grouped by A on SUM(B)
+can commute with a filter that depends on A only, but does not commute with a filter that depends
+on SUM(B).
+
+A location in this module is identified by a number, depth, which is 0 for the last operation
+and highest for the first operation (typically a scan).
+
+This optimizer commutes filters with filter-commutative operations to push the filters
+to the maximum possible depth, consequently re-writing the filter expressions by every
+projection that changes the filter's expression.
+
+    Selection: #b Gt Int64(10)
+        Projection: #a AS b
+
+is optimized to
+
+    Projection: #a AS b
+        Selection: #a Gt Int64(10)  <--- changed from #b to #a
+
+To perform this optimization, we first analyze the plan to identify three items:
+
+1. Where are the filters located in the plan
+2. Where are non-commutable operations' columns located in the plan (break_points)
+3. Where are projections located in the plan
+
+With this information, we re-write the plan by:
+
+1. Computing the maximum possible depth of each column between breakpoints
+2. Computing the maximum possible depth of each filter expression based on the columns it depends on
+3. Re-writing the filter expression for every projection that it commutes with, from its original depth to its max possible depth
+4. Recursively re-writing the plan by deleting old filter expressions and adding new filter expressions at their max possible depth.
+*/
+pub struct FilterPushDown {}
+
+impl OptimizerRule for FilterPushDown {
+    fn name(&self) -> &str {
+        "filter_push_down"
+    }
+
+    fn optimize(&mut self, plan: &LogicalPlan) -> Result<LogicalPlan> {
+        let result = analyze_plan(plan, 0)?;
+        let break_points = result.break_points.clone();
+
+        // get max depth over all breakpoints
+        let max_depth = break_points.keys().max();
+        if max_depth.is_none() {
+            // it is unlikely that the plan is correct without break points as all scans
+            // adds breakpoints. We just return the plan and let others handle the error
+            return Ok(plan.clone());

Review comment:
       The comment is poorly written. What I was trying to say is that we allow the optimizer to not return an error on poorly designed plans, in which case it simply performs no optimization. This way, the user is likely to receive a better error message.
   
   This is a design decision that we need to take with regard to optimizers (error or ignore?). I have no strong opinion either way: we can also return an error.
   
   Let me know what you prefer and I will change it.
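
The expression re-writing the excerpt describes, e.g. turning `#b Gt Int64(10)` into `#a Gt Int64(10)` when commuting with `Projection: #a AS b`, can be sketched in standalone Rust. This is a minimal sketch with a hypothetical `Expr` type, not DataFusion's actual API:

```rust
use std::collections::HashMap;

// Minimal stand-in for a filter/projection expression tree
// (hypothetical; DataFusion's real `Expr` is richer).
#[derive(Debug, Clone, PartialEq)]
enum Expr {
    Column(String),
    Literal(i64),
    Gt(Box<Expr>, Box<Expr>),
}

/// Substitute every column reference with the expression it aliases in
/// the projection, so the filter can be pushed below that projection.
fn rewrite(expr: &Expr, projection: &HashMap<String, Expr>) -> Expr {
    match expr {
        Expr::Column(name) => projection
            .get(name)
            .cloned()
            .unwrap_or_else(|| expr.clone()),
        Expr::Literal(_) => expr.clone(),
        Expr::Gt(l, r) => Expr::Gt(
            Box::new(rewrite(l, projection)),
            Box::new(rewrite(r, projection)),
        ),
    }
}

fn main() {
    // Projection: #a AS b
    let mut projection = HashMap::new();
    projection.insert("b".to_string(), Expr::Column("a".to_string()));

    // Selection: #b Gt Int64(10)
    let filter = Expr::Gt(
        Box::new(Expr::Column("b".to_string())),
        Box::new(Expr::Literal(10)),
    );

    // After commuting with the projection, the filter references #a instead of #b.
    let pushed = rewrite(&filter, &projection);
    assert_eq!(
        pushed,
        Expr::Gt(
            Box::new(Expr::Column("a".to_string())),
            Box::new(Expr::Literal(10)),
        )
    );
}
```

The real rule additionally tracks depths and break points so the substitution is applied once per projection crossed on the way down.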




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
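
The break-point rule quoted in the excerpt (an aggregate grouped by A commutes with a filter on A, but not with one on SUM(B)) is essentially a column-subset test. A minimal sketch, using a hypothetical helper rather than DataFusion's real code:

```rust
use std::collections::HashSet;

// Hypothetical helper (not DataFusion's API): a filter commutes with an
// aggregate exactly when every column it references is a GROUP BY key.
fn commutes_with_aggregate(
    filter_columns: &HashSet<String>,
    group_by: &HashSet<String>,
) -> bool {
    filter_columns.is_subset(group_by)
}

fn main() {
    let group_by: HashSet<String> = vec!["a".to_string()].into_iter().collect();

    // A filter depending only on A can be pushed below the aggregate...
    let on_a: HashSet<String> = vec!["a".to_string()].into_iter().collect();
    assert!(commutes_with_aggregate(&on_a, &group_by));

    // ...but a filter on SUM(B) hits a break point and must stay above it.
    let on_sum_b: HashSet<String> = vec!["SUM(b)".to_string()].into_iter().collect();
    assert!(!commutes_with_aggregate(&on_sum_b, &group_by));
}
```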
