This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 3606aa2  ARROW-4589: [Rust] Projection push down query optimizer rule
3606aa2 is described below

commit 3606aa21f766a2c0c55ebcca9a0e7a3fd2157ae0
Author: Andy Grove <[email protected]>
AuthorDate: Sun Feb 17 07:53:00 2019 -0700

    ARROW-4589: [Rust] Projection push down query optimizer rule
    
    This PR adds the first query optimizer rule, which rewrites a logical plan to push the projection down to the TableScan.
    
    Once this is merged, I will create a follow-up PR to integrate this into the query engine so that only the necessary columns are loaded from disk.
    
    Author: Andy Grove <[email protected]>
    
    Closes #3664 from andygrove/ARROW-4589-wip and squashes the following commits:
    
    b876f28 <Andy Grove> revert formatting change that broke the tests
    2051deb <Andy Grove> formatting comments and test strings to be < 90 columns wide
    8effde3 <Andy Grove> Address PR feedback, fix bug, add extra unit test
    ecdd32a <Andy Grove> refactor code to reduce duplication
    6229b32 <Andy Grove> refactor code to reduce duplication
    f959500 <Andy Grove> implement projection push down for rest of logical plan variants
    5fd5382 <Andy Grove> implement collect_expr and rewrite_expr for all expression types
    bd49f17 <Andy Grove> improve error handling
    92918dd <Andy Grove> Implement projection push-down for selection and make projection deterministic
    a80cfdf <Andy Grove> Implement mapping and expression rewrite logic
    26fd3b4 <Andy Grove> revert change
    d7c4822 <Andy Grove> formatting and add assertion to test
    e81af14 <Andy Grove> Roughing out projection push down rule
---
 rust/datafusion/src/lib.rs                         |   1 +
 rust/datafusion/src/{lib.rs => optimizer/mod.rs}   |  15 +-
 .../src/{lib.rs => optimizer/optimizer.rs}         |  19 +-
 .../src/optimizer/projection_push_down.rs          | 415 +++++++++++++++++++++
 4 files changed, 426 insertions(+), 24 deletions(-)

diff --git a/rust/datafusion/src/lib.rs b/rust/datafusion/src/lib.rs
index efcad5d..6ea2a36 100644
--- a/rust/datafusion/src/lib.rs
+++ b/rust/datafusion/src/lib.rs
@@ -27,4 +27,5 @@ extern crate sqlparser;
 pub mod dfparser;
 pub mod execution;
 pub mod logicalplan;
+pub mod optimizer;
 pub mod sqlplanner;
diff --git a/rust/datafusion/src/lib.rs b/rust/datafusion/src/optimizer/mod.rs
similarity index 71%
copy from rust/datafusion/src/lib.rs
copy to rust/datafusion/src/optimizer/mod.rs
index efcad5d..3e50328 100644
--- a/rust/datafusion/src/lib.rs
+++ b/rust/datafusion/src/optimizer/mod.rs
@@ -15,16 +15,5 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! DataFusion is a modern distributed compute platform implemented in Rust that uses Apache Arrow
-//! as the memory model
-
-extern crate arrow;
-#[macro_use]
-extern crate serde_derive;
-extern crate serde_json;
-extern crate sqlparser;
-
-pub mod dfparser;
-pub mod execution;
-pub mod logicalplan;
-pub mod sqlplanner;
+pub mod optimizer;
+pub mod projection_push_down;
diff --git a/rust/datafusion/src/lib.rs b/rust/datafusion/src/optimizer/optimizer.rs
similarity index 71%
copy from rust/datafusion/src/lib.rs
copy to rust/datafusion/src/optimizer/optimizer.rs
index efcad5d..9ab8e8c 100644
--- a/rust/datafusion/src/lib.rs
+++ b/rust/datafusion/src/optimizer/optimizer.rs
@@ -15,16 +15,13 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! DataFusion is a modern distributed compute platform implemented in Rust that uses Apache Arrow
-//! as the memory model
+//! Query optimizer traits
 
-extern crate arrow;
-#[macro_use]
-extern crate serde_derive;
-extern crate serde_json;
-extern crate sqlparser;
+use crate::logicalplan::LogicalPlan;
+use arrow::error::Result;
+use std::rc::Rc;
 
-pub mod dfparser;
-pub mod execution;
-pub mod logicalplan;
-pub mod sqlplanner;
+/// An optimizer rule performs a transformation on a logical plan to produce an optimized logical plan.
+pub trait OptimizerRule {
+    fn optimize(&mut self, plan: &LogicalPlan) -> Result<Rc<LogicalPlan>>;
+}
diff --git a/rust/datafusion/src/optimizer/projection_push_down.rs b/rust/datafusion/src/optimizer/projection_push_down.rs
new file mode 100644
index 0000000..8fd2e8c
--- /dev/null
+++ b/rust/datafusion/src/optimizer/projection_push_down.rs
@@ -0,0 +1,415 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Projection Push Down optimizer rule ensures that only referenced columns are
+//! loaded into memory
+
+use crate::logicalplan::Expr;
+use crate::logicalplan::LogicalPlan;
+use crate::optimizer::optimizer::OptimizerRule;
+use arrow::datatypes::{Field, Schema};
+use arrow::error::{ArrowError, Result};
+use std::collections::{HashMap, HashSet};
+use std::rc::Rc;
+use std::sync::Arc;
+
+/// Projection Push Down optimizer rule ensures that only referenced columns are
+/// loaded into memory
+pub struct ProjectionPushDown {}
+
+impl OptimizerRule for ProjectionPushDown {
+    fn optimize(&mut self, plan: &LogicalPlan) -> Result<Rc<LogicalPlan>> {
+        let mut accum: HashSet<usize> = HashSet::new();
+        let mut mapping: HashMap<usize, usize> = HashMap::new();
+        self.optimize_plan(plan, &mut accum, &mut mapping)
+    }
+}
+
+impl ProjectionPushDown {
+    pub fn new() -> Self {
+        Self {}
+    }
+
+    fn optimize_plan(
+        &self,
+        plan: &LogicalPlan,
+        accum: &mut HashSet<usize>,
+        mapping: &mut HashMap<usize, usize>,
+    ) -> Result<Rc<LogicalPlan>> {
+        match plan {
+            LogicalPlan::Projection {
+                expr,
+                input,
+                schema,
+            } => {
+                // collect all columns referenced by projection expressions
+                self.collect_exprs(&expr, accum);
+
+                // push projection down
+                let input = self.optimize_plan(&input, accum, mapping)?;
+
+                // rewrite projection expressions to use new column indexes
+                let new_expr = self.rewrite_exprs(expr, mapping)?;
+
+                Ok(Rc::new(LogicalPlan::Projection {
+                    expr: new_expr,
+                    input,
+                    schema: schema.clone(),
+                }))
+            }
+            LogicalPlan::Selection { expr, input } => {
+                // collect all columns referenced by filter expression
+                self.collect_expr(expr, accum);
+
+                // push projection down
+                let input = self.optimize_plan(&input, accum, mapping)?;
+
+                // rewrite filter expression to use new column indexes
+                let new_expr = self.rewrite_expr(expr, mapping)?;
+
+                Ok(Rc::new(LogicalPlan::Selection {
+                    expr: new_expr,
+                    input,
+                }))
+            }
+            LogicalPlan::Aggregate {
+                input,
+                group_expr,
+                aggr_expr,
+                schema,
+            } => {
+                // collect all columns referenced by grouping and aggregate expressions
+                self.collect_exprs(&group_expr, accum);
+                self.collect_exprs(&aggr_expr, accum);
+
+                // push projection down
+                let input = self.optimize_plan(&input, accum, mapping)?;
+
+                // rewrite expressions to use new column indexes
+                let new_group_expr = self.rewrite_exprs(group_expr, mapping)?;
+                let new_aggr_expr = self.rewrite_exprs(aggr_expr, mapping)?;
+
+                Ok(Rc::new(LogicalPlan::Aggregate {
+                    input,
+                    group_expr: new_group_expr,
+                    aggr_expr: new_aggr_expr,
+                    schema: schema.clone(),
+                }))
+            }
+            LogicalPlan::Sort {
+                expr,
+                input,
+                schema,
+            } => {
+                // collect all columns referenced by sort expressions
+                self.collect_exprs(&expr, accum);
+
+                // push projection down
+                let input = self.optimize_plan(&input, accum, mapping)?;
+
+                // rewrite sort expressions to use new column indexes
+                let new_expr = self.rewrite_exprs(expr, mapping)?;
+
+                Ok(Rc::new(LogicalPlan::Sort {
+                    expr: new_expr,
+                    input,
+                    schema: schema.clone(),
+                }))
+            }
+            LogicalPlan::EmptyRelation { schema } => {
+                Ok(Rc::new(LogicalPlan::EmptyRelation {
+                    schema: schema.clone(),
+                }))
+            }
+            LogicalPlan::TableScan {
+                schema_name,
+                table_name,
+                schema,
+                ..
+            } => {
+                // once we reach the table scan, we can use the accumulated set of column indexes as
+                // the projection in the table scan
+                let mut projection: Vec<usize> = Vec::with_capacity(accum.len());
+                accum.iter().for_each(|i| projection.push(*i));
+
+                // sort the projection otherwise we get non-deterministic behavior
+                projection.sort();
+
+                // create the projected schema
+                let mut projected_fields: Vec<Field> =
+                    Vec::with_capacity(projection.len());
+                for i in 0..projection.len() {
+                    projected_fields.push(schema.fields()[i].clone());
+                }
+                let projected_schema = Schema::new(projected_fields);
+
+                // now that the table scan is returning a different schema we need to create a
+                // mapping from the original column index to the new column index so that we
+                // can rewrite expressions as we walk back up the tree
+
+                if mapping.len() != 0 {
+                    return Err(ArrowError::ComputeError("illegal state".to_string()));
+                }
+
+                for i in 0..schema.fields().len() {
+                    if let Some(n) = projection.iter().position(|v| *v == i) {
+                        mapping.insert(i, n);
+                    }
+                }
+
+                // return the table scan with projection
+                Ok(Rc::new(LogicalPlan::TableScan {
+                    schema_name: schema_name.to_string(),
+                    table_name: table_name.to_string(),
+                    schema: Arc::new(projected_schema),
+                    projection: Some(projection),
+                }))
+            }
+        }
+    }
+
+    fn collect_exprs(&self, expr: &Vec<Expr>, accum: &mut HashSet<usize>) {
+        expr.iter().for_each(|e| self.collect_expr(e, accum));
+    }
+
+    fn collect_expr(&self, expr: &Expr, accum: &mut HashSet<usize>) {
+        match expr {
+            Expr::Column(i) => {
+                accum.insert(*i);
+            }
+            Expr::Literal(_) => { /* not needed */ }
+            Expr::IsNull(e) => self.collect_expr(e, accum),
+            Expr::IsNotNull(e) => self.collect_expr(e, accum),
+            Expr::BinaryExpr { left, right, .. } => {
+                self.collect_expr(left, accum);
+                self.collect_expr(right, accum);
+            }
+            Expr::Cast { expr, .. } => self.collect_expr(expr, accum),
+            Expr::Sort { expr, .. } => self.collect_expr(expr, accum),
+            Expr::AggregateFunction { args, .. } => self.collect_exprs(args, accum),
+            Expr::ScalarFunction { args, .. } => self.collect_exprs(args, accum),
+        }
+    }
+
+    fn rewrite_exprs(
+        &self,
+        expr: &Vec<Expr>,
+        mapping: &HashMap<usize, usize>,
+    ) -> Result<Vec<Expr>> {
+        Ok(expr
+            .iter()
+            .map(|e| self.rewrite_expr(e, mapping))
+            .collect::<Result<Vec<Expr>>>()?)
+    }
+
+    fn rewrite_expr(&self, expr: &Expr, mapping: &HashMap<usize, usize>) -> Result<Expr> {
+        match expr {
+            Expr::Column(i) => Ok(Expr::Column(self.new_index(mapping, i)?)),
+            Expr::Literal(_) => Ok(expr.clone()),
+            Expr::IsNull(e) => Ok(Expr::IsNull(Rc::new(self.rewrite_expr(e, mapping)?))),
+            Expr::IsNotNull(e) => {
+                Ok(Expr::IsNotNull(Rc::new(self.rewrite_expr(e, mapping)?)))
+            }
+            Expr::BinaryExpr { left, op, right } => Ok(Expr::BinaryExpr {
+                left: Rc::new(self.rewrite_expr(left, mapping)?),
+                op: op.clone(),
+                right: Rc::new(self.rewrite_expr(right, mapping)?),
+            }),
+            Expr::Cast { expr, data_type } => Ok(Expr::Cast {
+                expr: Rc::new(self.rewrite_expr(expr, mapping)?),
+                data_type: data_type.clone(),
+            }),
+            Expr::Sort { expr, asc } => Ok(Expr::Sort {
+                expr: Rc::new(self.rewrite_expr(expr, mapping)?),
+                asc: *asc,
+            }),
+            Expr::AggregateFunction {
+                name,
+                args,
+                return_type,
+            } => Ok(Expr::AggregateFunction {
+                name: name.to_string(),
+                args: self.rewrite_exprs(args, mapping)?,
+                return_type: return_type.clone(),
+            }),
+            Expr::ScalarFunction {
+                name,
+                args,
+                return_type,
+            } => Ok(Expr::ScalarFunction {
+                name: name.to_string(),
+                args: self.rewrite_exprs(args, mapping)?,
+                return_type: return_type.clone(),
+            }),
+        }
+    }
+
+    fn new_index(&self, mapping: &HashMap<usize, usize>, i: &usize) -> Result<usize> {
+        match mapping.get(i) {
+            Some(j) => Ok(*j),
+            _ => Err(ArrowError::ComputeError(
+                "Internal error computing new column index".to_string(),
+            )),
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+
+    use super::*;
+    use crate::logicalplan::Expr::*;
+    use crate::logicalplan::LogicalPlan::*;
+    use arrow::datatypes::{DataType, Field, Schema};
+    use std::borrow::Borrow;
+    use std::rc::Rc;
+    use std::sync::Arc;
+
+    #[test]
+    fn aggregate_no_group_by() {
+        let table_scan = test_table_scan();
+
+        let aggregate = Aggregate {
+            group_expr: vec![],
+            aggr_expr: vec![Column(1)],
+            schema: Arc::new(Schema::new(vec![Field::new(
+                "MAX(b)",
+                DataType::UInt32,
+                false,
+            )])),
+            input: Rc::new(table_scan),
+        };
+
+        assert_optimized_plan_eq(&aggregate, "Aggregate: groupBy=[[]], aggr=[[#0]]\n  TableScan: test projection=Some([1])");
+    }
+
+    #[test]
+    fn aggregate_group_by() {
+        let table_scan = test_table_scan();
+
+        let aggregate = Aggregate {
+            group_expr: vec![Column(2)],
+            aggr_expr: vec![Column(1)],
+            schema: Arc::new(Schema::new(vec![
+                Field::new("c", DataType::UInt32, false),
+                Field::new("MAX(b)", DataType::UInt32, false),
+            ])),
+            input: Rc::new(table_scan),
+        };
+
+        assert_optimized_plan_eq(&aggregate, "Aggregate: groupBy=[[#1]], aggr=[[#0]]\n  TableScan: test projection=Some([1, 2])");
+    }
+
+    #[test]
+    fn aggregate_no_group_by_with_selection() {
+        let table_scan = test_table_scan();
+
+        let selection = Selection {
+            expr: Column(2),
+            input: Rc::new(table_scan),
+        };
+
+        let aggregate = Aggregate {
+            group_expr: vec![],
+            aggr_expr: vec![Column(1)],
+            schema: Arc::new(Schema::new(vec![Field::new(
+                "MAX(b)",
+                DataType::UInt32,
+                false,
+            )])),
+            input: Rc::new(selection),
+        };
+
+        assert_optimized_plan_eq(&aggregate, "Aggregate: groupBy=[[]], aggr=[[#0]]\n  Selection: #1\n    TableScan: test projection=Some([1, 2])");
+    }
+
+    #[test]
+    fn cast() {
+        let table_scan = test_table_scan();
+
+        let projection = Projection {
+            expr: vec![Cast {
+                expr: Rc::new(Column(2)),
+                data_type: DataType::Float64,
+            }],
+            input: Rc::new(table_scan),
+            schema: Arc::new(Schema::new(vec![Field::new(
+                "CAST(c AS float)",
+                DataType::Float64,
+                false,
+            )])),
+        };
+
+        assert_optimized_plan_eq(
+            &projection,
+            "Projection: CAST(#0 AS Float64)\n  TableScan: test 
projection=Some([2])",
+        );
+    }
+
+    #[test]
+    fn table_scan_projected_schema() {
+        let table_scan = test_table_scan();
+        assert_eq!(3, table_scan.schema().fields().len());
+
+        let projection = Projection {
+            expr: vec![Column(0), Column(1)],
+            input: Rc::new(table_scan),
+            schema: Arc::new(Schema::new(vec![
+                Field::new("a", DataType::UInt32, false),
+                Field::new("b", DataType::UInt32, false),
+            ])),
+        };
+
+        let optimized_plan = optimize(&projection);
+
+        // check that table scan schema now contains 2 columns
+        match optimized_plan.as_ref().borrow() {
+            LogicalPlan::Projection { input, .. } => match input.as_ref().borrow() {
+                LogicalPlan::TableScan { ref schema, .. } => {
+                    assert_eq!(2, schema.fields().len());
+                }
+                _ => panic!(),
+            },
+            _ => panic!(),
+        }
+    }
+
+    fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) {
+        let optimized_plan = optimize(plan);
+        let formatted_plan = format!("{:?}", optimized_plan);
+        assert_eq!(formatted_plan, expected);
+    }
+
+    fn optimize(plan: &LogicalPlan) -> Rc<LogicalPlan> {
+        let mut rule = ProjectionPushDown::new();
+        rule.optimize(plan).unwrap()
+    }
+
+    /// all tests share a common table
+    fn test_table_scan() -> LogicalPlan {
+        TableScan {
+            schema_name: "default".to_string(),
+            table_name: "test".to_string(),
+            schema: Arc::new(Schema::new(vec![
+                Field::new("a", DataType::UInt32, false),
+                Field::new("b", DataType::UInt32, false),
+                Field::new("c", DataType::UInt32, false),
+            ])),
+            projection: None,
+        }
+    }
+}
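
For context (not part of the commit above): the following is a minimal sketch of how the new rule could be exercised from calling code, mirroring the pattern of the unit tests in the patch. It assumes the crate is consumed as "datafusion" and that the LogicalPlan and Expr variants can be constructed externally exactly as they are in the test module; the table and column names are illustrative only.

use std::rc::Rc;
use std::sync::Arc;

use arrow::datatypes::{DataType, Field, Schema};
use datafusion::logicalplan::{Expr, LogicalPlan};
use datafusion::optimizer::optimizer::OptimizerRule;
use datafusion::optimizer::projection_push_down::ProjectionPushDown;

fn main() {
    // Table with columns a (#0), b (#1), c (#2) and no projection yet.
    let scan = LogicalPlan::TableScan {
        schema_name: "default".to_string(),
        table_name: "test".to_string(),
        schema: Arc::new(Schema::new(vec![
            Field::new("a", DataType::UInt32, false),
            Field::new("b", DataType::UInt32, false),
            Field::new("c", DataType::UInt32, false),
        ])),
        projection: None,
    };

    // SELECT c FROM test -- only column #2 is referenced by the projection.
    let plan = LogicalPlan::Projection {
        expr: vec![Expr::Column(2)],
        input: Rc::new(scan),
        schema: Arc::new(Schema::new(vec![Field::new("c", DataType::UInt32, false)])),
    };

    // Running the rule rewrites the scan to projection=Some([2]) and the projection
    // expression to #0, since the scan now exposes a single column.
    let mut rule = ProjectionPushDown::new();
    let optimized = rule.optimize(&plan).unwrap();
    println!("{:?}", optimized);
}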
