This is an automated email from the ASF dual-hosted git repository.

kszucs pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 54fcb06  ARROW-4602: [Rust] [DataFusion] Integrate query optimizer with ExecutionContext
54fcb06 is described below

commit 54fcb060428d82002398363ab4d43aa54cb4f295
Author: Andy Grove <[email protected]>
AuthorDate: Tue Feb 19 03:29:28 2019 +0100

    ARROW-4602: [Rust] [DataFusion] Integrate query optimizer with ExecutionContext
    
    Instead of registering `DataSource` with the context, we now register
    `DataSourceProvider`. This trait has a `scan()` method that accepts the
    projection.
    
    `ExecutionContext` now applies the projection push-down optimizer rule to
    every SQL query, so that only the necessary columns are loaded from disk.
    
    There is also a simpler API for registering CSV files with the context:
    the `register_csv` method.
    
    I added some criterion benchmarks too, but they are not ideal since they
    load from disk each time. I am working on another PR to add support for running
    queries against RecordBatches already loaded into memory and will update the
    benchmarks to use these as part of that PR.
    
    Author: Andy Grove <[email protected]>
    
    Closes #3678 from andygrove/ARROW-4602 and squashes the following commits:
    
    80ea3c99 <Andy Grove> Integrate projection push down rule with ExecutionContext
---
 rust/arrow/src/csv/reader.rs                       | 11 ++-
 rust/datafusion/Cargo.toml                         |  4 +
 .../csv_sql.rs => benches/aggregate_query_sql.rs}  | 84 +++++++++----------
 rust/datafusion/examples/csv_sql.rs                | 13 ++-
 rust/datafusion/src/execution/aggregate.rs         |  2 +-
 rust/datafusion/src/execution/context.rs           | 93 +++++++++++++--------
 rust/datafusion/src/execution/datasource.rs        | 95 +++++++++++++++++-----
 rust/datafusion/src/execution/projection.rs        |  3 +
 rust/datafusion/tests/sql.rs                       |  8 +-
 9 files changed, 201 insertions(+), 112 deletions(-)

diff --git a/rust/arrow/src/csv/reader.rs b/rust/arrow/src/csv/reader.rs
index 1be7fff..a511b93 100644
--- a/rust/arrow/src/csv/reader.rs
+++ b/rust/arrow/src/csv/reader.rs
@@ -319,8 +319,17 @@ impl<R: Read> Reader<R> {
             })
             .collect();
 
+        let schema_fields = self.schema.fields();
+
+        let projected_fields: Vec<Field> = projection
+            .iter()
+            .map(|i| schema_fields[*i].clone())
+            .collect();
+
+        let projected_schema = Arc::new(Schema::new(projected_fields));
+
         match arrays {
-            Ok(arr) => Ok(Some(RecordBatch::new(self.schema.clone(), arr))),
+            Ok(arr) => Ok(Some(RecordBatch::new(projected_schema, arr))),
             Err(e) => Err(e),
         }
     }
diff --git a/rust/datafusion/Cargo.toml b/rust/datafusion/Cargo.toml
index 864243b..0bcbc89 100644
--- a/rust/datafusion/Cargo.toml
+++ b/rust/datafusion/Cargo.toml
@@ -48,3 +48,7 @@ sqlparser = "0.2.0"
 [dev-dependencies]
 criterion = "0.2.0"
 
+[[bench]]
+name = "aggregate_query_sql"
+harness = false
+
diff --git a/rust/datafusion/examples/csv_sql.rs b/rust/datafusion/benches/aggregate_query_sql.rs
similarity index 59%
copy from rust/datafusion/examples/csv_sql.rs
copy to rust/datafusion/benches/aggregate_query_sql.rs
index 86f0f7f..772b974 100644
--- a/rust/datafusion/examples/csv_sql.rs
+++ b/rust/datafusion/benches/aggregate_query_sql.rs
@@ -15,22 +15,20 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::cell::RefCell;
-use std::rc::Rc;
+#[macro_use]
+extern crate criterion;
+use criterion::Criterion;
+
 use std::sync::Arc;
 
 extern crate arrow;
 extern crate datafusion;
 
-use arrow::array::{BinaryArray, Float64Array};
 use arrow::datatypes::{DataType, Field, Schema};
 
 use datafusion::execution::context::ExecutionContext;
-use datafusion::execution::datasource::CsvDataSource;
 
-/// This example demonstrates executing a simple query against an Arrow data source and
-/// fetching results
-fn main() {
+fn aggregate_query(sql: &str) {
     // create local execution context
     let mut ctx = ExecutionContext::new();
 
@@ -52,15 +50,12 @@ fn main() {
     ]));
 
     // register csv file with the execution context
-    let csv_datasource = CsvDataSource::new(
+    ctx.register_csv(
+        "aggregate_test_100",
         "../../testing/data/csv/aggregate_test_100.csv",
-        schema.clone(),
-        1024,
+        &schema,
+        true,
     );
-    ctx.register_datasource("aggregate_test_100", 
Rc::new(RefCell::new(csv_datasource)));
-
-    // simple projection and selection
-    let sql = "SELECT c1, MIN(c12), MAX(c12) FROM aggregate_test_100 WHERE c11 
> 0.1 AND c11 < 0.9 GROUP BY c1";
 
     // execute the query
     let relation = ctx.sql(&sql).unwrap();
@@ -68,35 +63,36 @@ fn main() {
     // display the relation
     let mut results = relation.borrow_mut();
 
-    while let Some(batch) = results.next().unwrap() {
-        println!(
-            "RecordBatch has {} rows and {} columns",
-            batch.num_rows(),
-            batch.num_columns()
-        );
-
-        let c1 = batch
-            .column(0)
-            .as_any()
-            .downcast_ref::<BinaryArray>()
-            .unwrap();
-
-        let min = batch
-            .column(1)
-            .as_any()
-            .downcast_ref::<Float64Array>()
-            .unwrap();
-
-        let max = batch
-            .column(2)
-            .as_any()
-            .downcast_ref::<Float64Array>()
-            .unwrap();
-
-        for i in 0..batch.num_rows() {
-            let c1_value: String = 
String::from_utf8(c1.value(i).to_vec()).unwrap();
+    while let Some(_) = results.next().unwrap() {}
+}
 
-            println!("{}, Min: {}, Max: {}", c1_value, min.value(i), 
max.value(i),);
-        }
-    }
+fn criterion_benchmark(c: &mut Criterion) {
+    c.bench_function("aggregate_query_no_group_by", |b| {
+        b.iter(|| {
+            aggregate_query(
+                "SELECT MIN(c12), MAX(c12) \
+                 FROM aggregate_test_100",
+            )
+        })
+    });
+    c.bench_function("aggregate_query_group_by", |b| {
+        b.iter(|| {
+            aggregate_query(
+                "SELECT c1, MIN(c12), MAX(c12) \
+                 FROM aggregate_test_100 GROUP BY c1",
+            )
+        })
+    });
+    c.bench_function("aggregate_query_group_by_with_filter", |b| {
+        b.iter(|| {
+            aggregate_query(
+                "SELECT c1, MIN(c12), MAX(c12) \
+                 FROM aggregate_test_100 \
+                 WHERE c11 > 0.1 AND c11 < 0.9 GROUP BY c1",
+            )
+        })
+    });
 }
+
+criterion_group!(benches, criterion_benchmark);
+criterion_main!(benches);
diff --git a/rust/datafusion/examples/csv_sql.rs b/rust/datafusion/examples/csv_sql.rs
index 86f0f7f..82d3d11 100644
--- a/rust/datafusion/examples/csv_sql.rs
+++ b/rust/datafusion/examples/csv_sql.rs
@@ -15,8 +15,6 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::cell::RefCell;
-use std::rc::Rc;
 use std::sync::Arc;
 
 extern crate arrow;
@@ -26,7 +24,6 @@ use arrow::array::{BinaryArray, Float64Array};
 use arrow::datatypes::{DataType, Field, Schema};
 
 use datafusion::execution::context::ExecutionContext;
-use datafusion::execution::datasource::CsvDataSource;
 
 /// This example demonstrates executing a simple query against an Arrow data source and
 /// fetching results
@@ -52,18 +49,18 @@ fn main() {
     ]));
 
     // register csv file with the execution context
-    let csv_datasource = CsvDataSource::new(
+    ctx.register_csv(
+        "aggregate_test_100",
         "../../testing/data/csv/aggregate_test_100.csv",
-        schema.clone(),
-        1024,
+        &schema,
+        true,
     );
-    ctx.register_datasource("aggregate_test_100", 
Rc::new(RefCell::new(csv_datasource)));
 
     // simple projection and selection
     let sql = "SELECT c1, MIN(c12), MAX(c12) FROM aggregate_test_100 WHERE c11 
> 0.1 AND c11 < 0.9 GROUP BY c1";
 
     // execute the query
-    let relation = ctx.sql(&sql).unwrap();
+    let relation = ctx.sql(&sql, 1024 * 1024).unwrap();
 
     // display the relation
     let mut results = relation.borrow_mut();
diff --git a/rust/datafusion/src/execution/aggregate.rs b/rust/datafusion/src/execution/aggregate.rs
index 127a1ab..049c1f1 100644
--- a/rust/datafusion/src/execution/aggregate.rs
+++ b/rust/datafusion/src/execution/aggregate.rs
@@ -1208,7 +1208,7 @@ mod tests {
     }
 
     fn load_csv(filename: &str, schema: &Arc<Schema>) -> Rc<RefCell<Relation>> 
{
-        let ds = CsvDataSource::new(filename, schema.clone(), 1024);
+        let ds = CsvDataSource::new(filename, schema.clone(), true, &None, 
1024);
         Rc::new(RefCell::new(DataSourceRelation::new(Rc::new(
             RefCell::new(ds),
         ))))
diff --git a/rust/datafusion/src/execution/context.rs b/rust/datafusion/src/execution/context.rs
index 59c65a8..95c5c85 100644
--- a/rust/datafusion/src/execution/context.rs
+++ b/rust/datafusion/src/execution/context.rs
@@ -15,18 +15,23 @@
 // specific language governing permissions and limitations
 // under the License.
 
+//! ExecutionContext contains methods for registering data sources and executing SQL queries
+
 use std::cell::RefCell;
 use std::collections::HashMap;
 use std::rc::Rc;
+use std::string::String;
 use std::sync::Arc;
 
 use arrow::datatypes::*;
 
 use super::super::dfparser::{DFASTNode, DFParser};
 use super::super::logicalplan::*;
+use super::super::optimizer::optimizer::OptimizerRule;
+use super::super::optimizer::projection_push_down::ProjectionPushDown;
 use super::super::sqlplanner::{SchemaProvider, SqlToRel};
 use super::aggregate::AggregateRelation;
-use super::datasource::DataSource;
+use super::datasource::{CsvProvider, DataSourceProvider};
 use super::error::{ExecutionError, Result};
 use super::expression::*;
 use super::filter::FilterRelation;
@@ -35,17 +40,20 @@ use super::projection::ProjectRelation;
 use super::relation::{DataSourceRelation, Relation};
 
 pub struct ExecutionContext {
-    datasources: Rc<RefCell<HashMap<String, Rc<RefCell<DataSource>>>>>,
+    datasources: Rc<RefCell<HashMap<String, Rc<DataSourceProvider>>>>,
 }
 
 impl ExecutionContext {
+    /// Create a new execution context for in-memory queries
     pub fn new() -> Self {
         Self {
             datasources: Rc::new(RefCell::new(HashMap::new())),
         }
     }
 
-    pub fn sql(&mut self, sql: &str) -> Result<Rc<RefCell<Relation>>> {
+    /// Execute a SQL query and produce a Relation (a schema-aware iterator over a series
+    /// of RecordBatch instances)
+    pub fn sql(&mut self, sql: &str, batch_size: usize) -> 
Result<Rc<RefCell<Relation>>> {
         let ast = DFParser::parse_sql(String::from(sql))?;
 
         match ast {
@@ -60,12 +68,10 @@ impl ExecutionContext {
 
                 // plan the query (create a logical relational plan)
                 let plan = query_planner.sql_to_rel(&ansi)?;
-                //println!("Logical plan: {:?}", plan);
 
-                let optimized_plan = plan; //push_down_projection(&plan, 
&HashSet::new());
-                                           //println!("Optimized logical plan: 
{:?}", new_plan);
+                let optimized_plan = self.optimize(&plan)?;
 
-                let relation = self.execute(&optimized_plan)?;
+                let relation = self.execute(&optimized_plan, batch_size)?;
 
                 Ok(relation)
             }
@@ -73,31 +79,53 @@ impl ExecutionContext {
         }
     }
 
-    pub fn register_datasource(&mut self, name: &str, ds: 
Rc<RefCell<DataSource>>) {
-        self.datasources.borrow_mut().insert(name.to_string(), ds);
+    /// Register a CSV file as a table so that it can be queried from SQL
+    pub fn register_csv(
+        &mut self,
+        name: &str,
+        filename: &str,
+        schema: &Schema,
+        has_header: bool,
+    ) {
+        self.datasources.borrow_mut().insert(
+            name.to_string(),
+            Rc::new(CsvProvider::new(filename, schema, has_header)),
+        );
     }
 
-    pub fn execute(&mut self, plan: &LogicalPlan) -> 
Result<Rc<RefCell<Relation>>> {
-        println!("Logical plan: {:?}", plan);
+    /// Optimize the logical plan by applying optimizer rules
+    fn optimize(&self, plan: &LogicalPlan) -> Result<Rc<LogicalPlan>> {
+        let mut rule = ProjectionPushDown::new();
+        Ok(rule.optimize(plan)?)
+    }
 
+    /// Execute a logical plan and produce a Relation (a schema-aware iterator over a series
+    /// of RecordBatch instances)
+    pub fn execute(
+        &mut self,
+        plan: &LogicalPlan,
+        batch_size: usize,
+    ) -> Result<Rc<RefCell<Relation>>> {
         match *plan {
-            LogicalPlan::TableScan { ref table_name, .. } => {
-                match self.datasources.borrow().get(table_name) {
-                    Some(ds) => {
-                        //TODO: projection
-                        
Ok(Rc::new(RefCell::new(DataSourceRelation::new(ds.clone()))))
-                    }
-                    _ => Err(ExecutionError::General(format!(
-                        "No table registered as '{}'",
-                        table_name
-                    ))),
+            LogicalPlan::TableScan {
+                ref table_name,
+                ref projection,
+                ..
+            } => match self.datasources.borrow().get(table_name) {
+                Some(provider) => {
+                    let ds = provider.scan(projection, batch_size);
+                    Ok(Rc::new(RefCell::new(DataSourceRelation::new(ds))))
                 }
-            }
+                _ => Err(ExecutionError::General(format!(
+                    "No table registered as '{}'",
+                    table_name
+                ))),
+            },
             LogicalPlan::Selection {
                 ref expr,
                 ref input,
             } => {
-                let input_rel = self.execute(input)?;
+                let input_rel = self.execute(input, batch_size)?;
                 let input_schema = 
input_rel.as_ref().borrow().schema().clone();
                 let runtime_expr = compile_scalar_expr(&self, expr, 
&input_schema)?;
                 let rel = FilterRelation::new(
@@ -112,7 +140,7 @@ impl ExecutionContext {
                 ref input,
                 ..
             } => {
-                let input_rel = self.execute(input)?;
+                let input_rel = self.execute(input, batch_size)?;
 
                 let input_schema = 
input_rel.as_ref().borrow().schema().clone();
 
@@ -136,7 +164,7 @@ impl ExecutionContext {
                 ref aggr_expr,
                 ..
             } => {
-                let input_rel = self.execute(&input)?;
+                let input_rel = self.execute(&input, batch_size)?;
 
                 let input_schema = 
input_rel.as_ref().borrow().schema().clone();
 
@@ -166,7 +194,7 @@ impl ExecutionContext {
                 ref input,
                 ..
             } => {
-                let input_rel = self.execute(input)?;
+                let input_rel = self.execute(input, batch_size)?;
 
                 let input_schema = 
input_rel.as_ref().borrow().schema().clone();
 
@@ -200,13 +228,7 @@ impl ExecutionContext {
     }
 }
 
-#[derive(Debug, Clone)]
-pub enum ExecutionResult {
-    Unit,
-    Count(usize),
-    Str(String),
-}
-
+/// Create field meta-data from an expression, for use in a result set schema
 pub fn expr_to_field(e: &Expr, input_schema: &Schema) -> Field {
     match e {
         Expr::Column(i) => input_schema.fields()[*i].clone(),
@@ -239,6 +261,7 @@ pub fn expr_to_field(e: &Expr, input_schema: &Schema) -> 
Field {
     }
 }
 
+/// Create field meta-data from a list of expressions, for use in a result set schema
 pub fn exprlist_to_fields(expr: &Vec<Expr>, input_schema: &Schema) -> 
Vec<Field> {
     expr.iter()
         .map(|e| expr_to_field(e, input_schema))
@@ -246,12 +269,12 @@ pub fn exprlist_to_fields(expr: &Vec<Expr>, input_schema: 
&Schema) -> Vec<Field>
 }
 
 struct ExecutionContextSchemaProvider {
-    datasources: Rc<RefCell<HashMap<String, Rc<RefCell<DataSource>>>>>,
+    datasources: Rc<RefCell<HashMap<String, Rc<DataSourceProvider>>>>,
 }
 impl SchemaProvider for ExecutionContextSchemaProvider {
     fn get_table_meta(&self, name: &str) -> Option<Arc<Schema>> {
         match self.datasources.borrow().get(name) {
-            Some(ds) => Some(ds.borrow().schema().clone()),
+            Some(ds) => Some(ds.schema().clone()),
             None => None,
         }
     }
diff --git a/rust/datafusion/src/execution/datasource.rs b/rust/datafusion/src/execution/datasource.rs
index 379632b..992dfb0 100644
--- a/rust/datafusion/src/execution/datasource.rs
+++ b/rust/datafusion/src/execution/datasource.rs
@@ -17,12 +17,14 @@
 
 //! Data sources
 
+use std::cell::RefCell;
 use std::fs::File;
 use std::rc::Rc;
+use std::string::String;
 use std::sync::Arc;
 
 use arrow::csv;
-use arrow::datatypes::Schema;
+use arrow::datatypes::{Field, Schema};
 use arrow::record_batch::RecordBatch;
 
 use super::error::Result;
@@ -39,10 +41,36 @@ pub struct CsvDataSource {
 }
 
 impl CsvDataSource {
-    pub fn new(filename: &str, schema: Arc<Schema>, batch_size: usize) -> Self 
{
+    pub fn new(
+        filename: &str,
+        schema: Arc<Schema>,
+        has_header: bool,
+        projection: &Option<Vec<usize>>,
+        batch_size: usize,
+    ) -> Self {
         let file = File::open(filename).unwrap();
-        let reader = csv::Reader::new(file, schema.clone(), true, batch_size, 
None);
-        Self { schema, reader }
+        let reader = csv::Reader::new(
+            file,
+            schema.clone(),
+            has_header,
+            batch_size,
+            projection.clone(),
+        );
+
+        let projected_schema = match projection {
+            Some(p) => {
+                let projected_fields: Vec<Field> =
+                    p.iter().map(|i| schema.fields()[*i].clone()).collect();
+
+                Arc::new(Schema::new(projected_fields))
+            }
+            None => schema,
+        };
+
+        Self {
+            schema: projected_schema,
+            reader,
+        }
     }
 }
 
@@ -56,19 +84,48 @@ impl DataSource for CsvDataSource {
     }
 }
 
-#[derive(Serialize, Deserialize, Clone)]
-pub enum DataSourceMeta {
-    /// Represents a CSV file with a provided schema
-    CsvFile {
-        filename: String,
-        schema: Rc<Schema>,
-        has_header: bool,
-        projection: Option<Vec<usize>>,
-    },
-    /// Represents a Parquet file that contains schema information
-    ParquetFile {
-        filename: String,
-        schema: Rc<Schema>,
-        projection: Option<Vec<usize>>,
-    },
+pub trait DataSourceProvider {
+    fn schema(&self) -> &Arc<Schema>;
+    fn scan(
+        &self,
+        projection: &Option<Vec<usize>>,
+        batch_size: usize,
+    ) -> Rc<RefCell<DataSource>>;
+}
+
+/// Represents a CSV file with a provided schema
+pub struct CsvProvider {
+    filename: String,
+    schema: Arc<Schema>,
+    has_header: bool,
+}
+
+impl CsvProvider {
+    pub fn new(filename: &str, schema: &Schema, has_header: bool) -> Self {
+        Self {
+            filename: String::from(filename),
+            schema: Arc::new(schema.clone()),
+            has_header,
+        }
+    }
+}
+
+impl DataSourceProvider for CsvProvider {
+    fn schema(&self) -> &Arc<Schema> {
+        &self.schema
+    }
+
+    fn scan(
+        &self,
+        projection: &Option<Vec<usize>>,
+        batch_size: usize,
+    ) -> Rc<RefCell<DataSource>> {
+        Rc::new(RefCell::new(CsvDataSource::new(
+            &self.filename,
+            self.schema.clone(),
+            self.has_header,
+            projection,
+            batch_size,
+        )))
+    }
 }
diff --git a/rust/datafusion/src/execution/projection.rs b/rust/datafusion/src/execution/projection.rs
index 6de5818..d9b4a94 100644
--- a/rust/datafusion/src/execution/projection.rs
+++ b/rust/datafusion/src/execution/projection.rs
@@ -104,9 +104,12 @@ mod tests {
             Field::new("c11", DataType::Float64, false),
             Field::new("c12", DataType::Utf8, false),
         ]));
+
         let ds = CsvDataSource::new(
             "../../testing/data/csv/aggregate_test_100.csv",
             schema.clone(),
+            true,
+            &None,
             1024,
         );
         let relation = Rc::new(RefCell::new(DataSourceRelation::new(Rc::new(
diff --git a/rust/datafusion/tests/sql.rs b/rust/datafusion/tests/sql.rs
index 6f96980..954b2ee 100644
--- a/rust/datafusion/tests/sql.rs
+++ b/rust/datafusion/tests/sql.rs
@@ -26,9 +26,10 @@ use arrow::array::*;
 use arrow::datatypes::{DataType, Field, Schema};
 
 use datafusion::execution::context::ExecutionContext;
-use datafusion::execution::datasource::CsvDataSource;
 use datafusion::execution::relation::Relation;
 
+const DEFAULT_BATCH_SIZE: usize = 1024 * 1024;
+
 #[test]
 fn csv_query_with_predicate() {
     let mut ctx = ExecutionContext::new();
@@ -159,13 +160,12 @@ fn register_csv(
     filename: &str,
     schema: &Arc<Schema>,
 ) {
-    let csv_datasource = CsvDataSource::new(filename, schema.clone(), 1024);
-    ctx.register_datasource(name, Rc::new(RefCell::new(csv_datasource)));
+    ctx.register_csv(name, filename, &schema, true);
 }
 
 /// Execute query and return result set as tab delimited string
 fn execute(ctx: &mut ExecutionContext, sql: &str) -> String {
-    let results = ctx.sql(&sql).unwrap();
+    let results = ctx.sql(&sql, DEFAULT_BATCH_SIZE).unwrap();
     result_str(&results)
 }
 

Reply via email to