This is an automated email from the ASF dual-hosted git repository.
kszucs pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 54fcb06 ARROW-4602: [Rust] [DataFusion] Integrate query optimizer
with ExecutionContext
54fcb06 is described below
commit 54fcb060428d82002398363ab4d43aa54cb4f295
Author: Andy Grove <[email protected]>
AuthorDate: Tue Feb 19 03:29:28 2019 +0100
ARROW-4602: [Rust] [DataFusion] Integrate query optimizer with
ExecutionContext
Instead of registering `DataSource` with the context, we now register
`DataSourceProvider`. This trait has a `scan()` method where we can pass the
projection.
`ExecutionContext` calls the optimizer rule for all SQL queries now, so
that only the necessary columns are loaded from disk.
There is also a simpler API for registering CSV files with the context,
with a `register_csv` method.
I added some criterion benchmarks too but they are not ideal since they
load from disk each time. I am working on another PR to add support for running
queries against RecordBatches already loaded into memory and will update the
benchmarks to use these as part of that PR.
Author: Andy Grove <[email protected]>
Closes #3678 from andygrove/ARROW-4602 and squashes the following commits:
80ea3c99 <Andy Grove> Integrate projection push down rule with
ExecutionContext
---
rust/arrow/src/csv/reader.rs | 11 ++-
rust/datafusion/Cargo.toml | 4 +
.../csv_sql.rs => benches/aggregate_query_sql.rs} | 84 +++++++++----------
rust/datafusion/examples/csv_sql.rs | 13 ++-
rust/datafusion/src/execution/aggregate.rs | 2 +-
rust/datafusion/src/execution/context.rs | 93 +++++++++++++--------
rust/datafusion/src/execution/datasource.rs | 95 +++++++++++++++++-----
rust/datafusion/src/execution/projection.rs | 3 +
rust/datafusion/tests/sql.rs | 8 +-
9 files changed, 201 insertions(+), 112 deletions(-)
diff --git a/rust/arrow/src/csv/reader.rs b/rust/arrow/src/csv/reader.rs
index 1be7fff..a511b93 100644
--- a/rust/arrow/src/csv/reader.rs
+++ b/rust/arrow/src/csv/reader.rs
@@ -319,8 +319,17 @@ impl<R: Read> Reader<R> {
})
.collect();
+ let schema_fields = self.schema.fields();
+
+ let projected_fields: Vec<Field> = projection
+ .iter()
+ .map(|i| schema_fields[*i].clone())
+ .collect();
+
+ let projected_schema = Arc::new(Schema::new(projected_fields));
+
match arrays {
- Ok(arr) => Ok(Some(RecordBatch::new(self.schema.clone(), arr))),
+ Ok(arr) => Ok(Some(RecordBatch::new(projected_schema, arr))),
Err(e) => Err(e),
}
}
diff --git a/rust/datafusion/Cargo.toml b/rust/datafusion/Cargo.toml
index 864243b..0bcbc89 100644
--- a/rust/datafusion/Cargo.toml
+++ b/rust/datafusion/Cargo.toml
@@ -48,3 +48,7 @@ sqlparser = "0.2.0"
[dev-dependencies]
criterion = "0.2.0"
+[[bench]]
+name = "aggregate_query_sql"
+harness = false
+
diff --git a/rust/datafusion/examples/csv_sql.rs
b/rust/datafusion/benches/aggregate_query_sql.rs
similarity index 59%
copy from rust/datafusion/examples/csv_sql.rs
copy to rust/datafusion/benches/aggregate_query_sql.rs
index 86f0f7f..772b974 100644
--- a/rust/datafusion/examples/csv_sql.rs
+++ b/rust/datafusion/benches/aggregate_query_sql.rs
@@ -15,22 +15,20 @@
// specific language governing permissions and limitations
// under the License.
-use std::cell::RefCell;
-use std::rc::Rc;
+#[macro_use]
+extern crate criterion;
+use criterion::Criterion;
+
use std::sync::Arc;
extern crate arrow;
extern crate datafusion;
-use arrow::array::{BinaryArray, Float64Array};
use arrow::datatypes::{DataType, Field, Schema};
use datafusion::execution::context::ExecutionContext;
-use datafusion::execution::datasource::CsvDataSource;
-/// This example demonstrates executing a simple query against an Arrow data
source and
-/// fetching results
-fn main() {
+fn aggregate_query(sql: &str) {
// create local execution context
let mut ctx = ExecutionContext::new();
@@ -52,15 +50,12 @@ fn main() {
]));
// register csv file with the execution context
- let csv_datasource = CsvDataSource::new(
+ ctx.register_csv(
+ "aggregate_test_100",
"../../testing/data/csv/aggregate_test_100.csv",
- schema.clone(),
- 1024,
+ &schema,
+ true,
);
- ctx.register_datasource("aggregate_test_100",
Rc::new(RefCell::new(csv_datasource)));
-
- // simple projection and selection
- let sql = "SELECT c1, MIN(c12), MAX(c12) FROM aggregate_test_100 WHERE c11
> 0.1 AND c11 < 0.9 GROUP BY c1";
// execute the query
let relation = ctx.sql(&sql).unwrap();
@@ -68,35 +63,36 @@ fn main() {
// display the relation
let mut results = relation.borrow_mut();
- while let Some(batch) = results.next().unwrap() {
- println!(
- "RecordBatch has {} rows and {} columns",
- batch.num_rows(),
- batch.num_columns()
- );
-
- let c1 = batch
- .column(0)
- .as_any()
- .downcast_ref::<BinaryArray>()
- .unwrap();
-
- let min = batch
- .column(1)
- .as_any()
- .downcast_ref::<Float64Array>()
- .unwrap();
-
- let max = batch
- .column(2)
- .as_any()
- .downcast_ref::<Float64Array>()
- .unwrap();
-
- for i in 0..batch.num_rows() {
- let c1_value: String =
String::from_utf8(c1.value(i).to_vec()).unwrap();
+ while let Some(_) = results.next().unwrap() {}
+}
- println!("{}, Min: {}, Max: {}", c1_value, min.value(i),
max.value(i),);
- }
- }
+fn criterion_benchmark(c: &mut Criterion) {
+ c.bench_function("aggregate_query_no_group_by", |b| {
+ b.iter(|| {
+ aggregate_query(
+ "SELECT MIN(c12), MAX(c12) \
+ FROM aggregate_test_100",
+ )
+ })
+ });
+ c.bench_function("aggregate_query_group_by", |b| {
+ b.iter(|| {
+ aggregate_query(
+ "SELECT c1, MIN(c12), MAX(c12) \
+ FROM aggregate_test_100 GROUP BY c1",
+ )
+ })
+ });
+ c.bench_function("aggregate_query_group_by_with_filter", |b| {
+ b.iter(|| {
+ aggregate_query(
+ "SELECT c1, MIN(c12), MAX(c12) \
+ FROM aggregate_test_100 \
+ WHERE c11 > 0.1 AND c11 < 0.9 GROUP BY c1",
+ )
+ })
+ });
}
+
+criterion_group!(benches, criterion_benchmark);
+criterion_main!(benches);
diff --git a/rust/datafusion/examples/csv_sql.rs
b/rust/datafusion/examples/csv_sql.rs
index 86f0f7f..82d3d11 100644
--- a/rust/datafusion/examples/csv_sql.rs
+++ b/rust/datafusion/examples/csv_sql.rs
@@ -15,8 +15,6 @@
// specific language governing permissions and limitations
// under the License.
-use std::cell::RefCell;
-use std::rc::Rc;
use std::sync::Arc;
extern crate arrow;
@@ -26,7 +24,6 @@ use arrow::array::{BinaryArray, Float64Array};
use arrow::datatypes::{DataType, Field, Schema};
use datafusion::execution::context::ExecutionContext;
-use datafusion::execution::datasource::CsvDataSource;
/// This example demonstrates executing a simple query against an Arrow data
source and
/// fetching results
@@ -52,18 +49,18 @@ fn main() {
]));
// register csv file with the execution context
- let csv_datasource = CsvDataSource::new(
+ ctx.register_csv(
+ "aggregate_test_100",
"../../testing/data/csv/aggregate_test_100.csv",
- schema.clone(),
- 1024,
+ &schema,
+ true,
);
- ctx.register_datasource("aggregate_test_100",
Rc::new(RefCell::new(csv_datasource)));
// simple projection and selection
let sql = "SELECT c1, MIN(c12), MAX(c12) FROM aggregate_test_100 WHERE c11
> 0.1 AND c11 < 0.9 GROUP BY c1";
// execute the query
- let relation = ctx.sql(&sql).unwrap();
+ let relation = ctx.sql(&sql, 1024 * 1024).unwrap();
// display the relation
let mut results = relation.borrow_mut();
diff --git a/rust/datafusion/src/execution/aggregate.rs
b/rust/datafusion/src/execution/aggregate.rs
index 127a1ab..049c1f1 100644
--- a/rust/datafusion/src/execution/aggregate.rs
+++ b/rust/datafusion/src/execution/aggregate.rs
@@ -1208,7 +1208,7 @@ mod tests {
}
fn load_csv(filename: &str, schema: &Arc<Schema>) -> Rc<RefCell<Relation>>
{
- let ds = CsvDataSource::new(filename, schema.clone(), 1024);
+ let ds = CsvDataSource::new(filename, schema.clone(), true, &None,
1024);
Rc::new(RefCell::new(DataSourceRelation::new(Rc::new(
RefCell::new(ds),
))))
diff --git a/rust/datafusion/src/execution/context.rs
b/rust/datafusion/src/execution/context.rs
index 59c65a8..95c5c85 100644
--- a/rust/datafusion/src/execution/context.rs
+++ b/rust/datafusion/src/execution/context.rs
@@ -15,18 +15,23 @@
// specific language governing permissions and limitations
// under the License.
+//! ExecutionContext contains methods for registering data sources and
executing SQL queries
+
use std::cell::RefCell;
use std::collections::HashMap;
use std::rc::Rc;
+use std::string::String;
use std::sync::Arc;
use arrow::datatypes::*;
use super::super::dfparser::{DFASTNode, DFParser};
use super::super::logicalplan::*;
+use super::super::optimizer::optimizer::OptimizerRule;
+use super::super::optimizer::projection_push_down::ProjectionPushDown;
use super::super::sqlplanner::{SchemaProvider, SqlToRel};
use super::aggregate::AggregateRelation;
-use super::datasource::DataSource;
+use super::datasource::{CsvProvider, DataSourceProvider};
use super::error::{ExecutionError, Result};
use super::expression::*;
use super::filter::FilterRelation;
@@ -35,17 +40,20 @@ use super::projection::ProjectRelation;
use super::relation::{DataSourceRelation, Relation};
pub struct ExecutionContext {
- datasources: Rc<RefCell<HashMap<String, Rc<RefCell<DataSource>>>>>,
+ datasources: Rc<RefCell<HashMap<String, Rc<DataSourceProvider>>>>,
}
impl ExecutionContext {
+    /// Create a new execution context for in-memory queries
pub fn new() -> Self {
Self {
datasources: Rc::new(RefCell::new(HashMap::new())),
}
}
- pub fn sql(&mut self, sql: &str) -> Result<Rc<RefCell<Relation>>> {
+ /// Execute a SQL query and produce a Relation (a schema-aware iterator
over a series
+ /// of RecordBatch instances)
+ pub fn sql(&mut self, sql: &str, batch_size: usize) ->
Result<Rc<RefCell<Relation>>> {
let ast = DFParser::parse_sql(String::from(sql))?;
match ast {
@@ -60,12 +68,10 @@ impl ExecutionContext {
// plan the query (create a logical relational plan)
let plan = query_planner.sql_to_rel(&ansi)?;
- //println!("Logical plan: {:?}", plan);
- let optimized_plan = plan; //push_down_projection(&plan,
&HashSet::new());
- //println!("Optimized logical plan:
{:?}", new_plan);
+ let optimized_plan = self.optimize(&plan)?;
- let relation = self.execute(&optimized_plan)?;
+ let relation = self.execute(&optimized_plan, batch_size)?;
Ok(relation)
}
@@ -73,31 +79,53 @@ impl ExecutionContext {
}
}
- pub fn register_datasource(&mut self, name: &str, ds:
Rc<RefCell<DataSource>>) {
- self.datasources.borrow_mut().insert(name.to_string(), ds);
+ /// Register a CSV file as a table so that it can be queried from SQL
+ pub fn register_csv(
+ &mut self,
+ name: &str,
+ filename: &str,
+ schema: &Schema,
+ has_header: bool,
+ ) {
+ self.datasources.borrow_mut().insert(
+ name.to_string(),
+ Rc::new(CsvProvider::new(filename, schema, has_header)),
+ );
}
- pub fn execute(&mut self, plan: &LogicalPlan) ->
Result<Rc<RefCell<Relation>>> {
- println!("Logical plan: {:?}", plan);
+ /// Optimize the logical plan by applying optimizer rules
+ fn optimize(&self, plan: &LogicalPlan) -> Result<Rc<LogicalPlan>> {
+ let mut rule = ProjectionPushDown::new();
+ Ok(rule.optimize(plan)?)
+ }
+ /// Execute a logical plan and produce a Relation (a schema-aware iterator
over a series
+ /// of RecordBatch instances)
+ pub fn execute(
+ &mut self,
+ plan: &LogicalPlan,
+ batch_size: usize,
+ ) -> Result<Rc<RefCell<Relation>>> {
match *plan {
- LogicalPlan::TableScan { ref table_name, .. } => {
- match self.datasources.borrow().get(table_name) {
- Some(ds) => {
- //TODO: projection
-
Ok(Rc::new(RefCell::new(DataSourceRelation::new(ds.clone()))))
- }
- _ => Err(ExecutionError::General(format!(
- "No table registered as '{}'",
- table_name
- ))),
+ LogicalPlan::TableScan {
+ ref table_name,
+ ref projection,
+ ..
+ } => match self.datasources.borrow().get(table_name) {
+ Some(provider) => {
+ let ds = provider.scan(projection, batch_size);
+ Ok(Rc::new(RefCell::new(DataSourceRelation::new(ds))))
}
- }
+ _ => Err(ExecutionError::General(format!(
+ "No table registered as '{}'",
+ table_name
+ ))),
+ },
LogicalPlan::Selection {
ref expr,
ref input,
} => {
- let input_rel = self.execute(input)?;
+ let input_rel = self.execute(input, batch_size)?;
let input_schema =
input_rel.as_ref().borrow().schema().clone();
let runtime_expr = compile_scalar_expr(&self, expr,
&input_schema)?;
let rel = FilterRelation::new(
@@ -112,7 +140,7 @@ impl ExecutionContext {
ref input,
..
} => {
- let input_rel = self.execute(input)?;
+ let input_rel = self.execute(input, batch_size)?;
let input_schema =
input_rel.as_ref().borrow().schema().clone();
@@ -136,7 +164,7 @@ impl ExecutionContext {
ref aggr_expr,
..
} => {
- let input_rel = self.execute(&input)?;
+ let input_rel = self.execute(&input, batch_size)?;
let input_schema =
input_rel.as_ref().borrow().schema().clone();
@@ -166,7 +194,7 @@ impl ExecutionContext {
ref input,
..
} => {
- let input_rel = self.execute(input)?;
+ let input_rel = self.execute(input, batch_size)?;
let input_schema =
input_rel.as_ref().borrow().schema().clone();
@@ -200,13 +228,7 @@ impl ExecutionContext {
}
}
-#[derive(Debug, Clone)]
-pub enum ExecutionResult {
- Unit,
- Count(usize),
- Str(String),
-}
-
+/// Create field meta-data from an expression, for use in a result set schema
pub fn expr_to_field(e: &Expr, input_schema: &Schema) -> Field {
match e {
Expr::Column(i) => input_schema.fields()[*i].clone(),
@@ -239,6 +261,7 @@ pub fn expr_to_field(e: &Expr, input_schema: &Schema) ->
Field {
}
}
+/// Create field meta-data from a list of expressions, for use in a result set schema
pub fn exprlist_to_fields(expr: &Vec<Expr>, input_schema: &Schema) ->
Vec<Field> {
expr.iter()
.map(|e| expr_to_field(e, input_schema))
@@ -246,12 +269,12 @@ pub fn exprlist_to_fields(expr: &Vec<Expr>, input_schema:
&Schema) -> Vec<Field>
}
struct ExecutionContextSchemaProvider {
- datasources: Rc<RefCell<HashMap<String, Rc<RefCell<DataSource>>>>>,
+ datasources: Rc<RefCell<HashMap<String, Rc<DataSourceProvider>>>>,
}
impl SchemaProvider for ExecutionContextSchemaProvider {
fn get_table_meta(&self, name: &str) -> Option<Arc<Schema>> {
match self.datasources.borrow().get(name) {
- Some(ds) => Some(ds.borrow().schema().clone()),
+ Some(ds) => Some(ds.schema().clone()),
None => None,
}
}
diff --git a/rust/datafusion/src/execution/datasource.rs
b/rust/datafusion/src/execution/datasource.rs
index 379632b..992dfb0 100644
--- a/rust/datafusion/src/execution/datasource.rs
+++ b/rust/datafusion/src/execution/datasource.rs
@@ -17,12 +17,14 @@
//! Data sources
+use std::cell::RefCell;
use std::fs::File;
use std::rc::Rc;
+use std::string::String;
use std::sync::Arc;
use arrow::csv;
-use arrow::datatypes::Schema;
+use arrow::datatypes::{Field, Schema};
use arrow::record_batch::RecordBatch;
use super::error::Result;
@@ -39,10 +41,36 @@ pub struct CsvDataSource {
}
impl CsvDataSource {
- pub fn new(filename: &str, schema: Arc<Schema>, batch_size: usize) -> Self
{
+ pub fn new(
+ filename: &str,
+ schema: Arc<Schema>,
+ has_header: bool,
+ projection: &Option<Vec<usize>>,
+ batch_size: usize,
+ ) -> Self {
let file = File::open(filename).unwrap();
- let reader = csv::Reader::new(file, schema.clone(), true, batch_size,
None);
- Self { schema, reader }
+ let reader = csv::Reader::new(
+ file,
+ schema.clone(),
+ has_header,
+ batch_size,
+ projection.clone(),
+ );
+
+ let projected_schema = match projection {
+ Some(p) => {
+ let projected_fields: Vec<Field> =
+ p.iter().map(|i| schema.fields()[*i].clone()).collect();
+
+ Arc::new(Schema::new(projected_fields))
+ }
+ None => schema,
+ };
+
+ Self {
+ schema: projected_schema,
+ reader,
+ }
}
}
@@ -56,19 +84,48 @@ impl DataSource for CsvDataSource {
}
}
-#[derive(Serialize, Deserialize, Clone)]
-pub enum DataSourceMeta {
- /// Represents a CSV file with a provided schema
- CsvFile {
- filename: String,
- schema: Rc<Schema>,
- has_header: bool,
- projection: Option<Vec<usize>>,
- },
- /// Represents a Parquet file that contains schema information
- ParquetFile {
- filename: String,
- schema: Rc<Schema>,
- projection: Option<Vec<usize>>,
- },
+pub trait DataSourceProvider {
+ fn schema(&self) -> &Arc<Schema>;
+ fn scan(
+ &self,
+ projection: &Option<Vec<usize>>,
+ batch_size: usize,
+ ) -> Rc<RefCell<DataSource>>;
+}
+
+/// Represents a CSV file with a provided schema
+pub struct CsvProvider {
+ filename: String,
+ schema: Arc<Schema>,
+ has_header: bool,
+}
+
+impl CsvProvider {
+ pub fn new(filename: &str, schema: &Schema, has_header: bool) -> Self {
+ Self {
+ filename: String::from(filename),
+ schema: Arc::new(schema.clone()),
+ has_header,
+ }
+ }
+}
+
+impl DataSourceProvider for CsvProvider {
+ fn schema(&self) -> &Arc<Schema> {
+ &self.schema
+ }
+
+ fn scan(
+ &self,
+ projection: &Option<Vec<usize>>,
+ batch_size: usize,
+ ) -> Rc<RefCell<DataSource>> {
+ Rc::new(RefCell::new(CsvDataSource::new(
+ &self.filename,
+ self.schema.clone(),
+ self.has_header,
+ projection,
+ batch_size,
+ )))
+ }
}
diff --git a/rust/datafusion/src/execution/projection.rs
b/rust/datafusion/src/execution/projection.rs
index 6de5818..d9b4a94 100644
--- a/rust/datafusion/src/execution/projection.rs
+++ b/rust/datafusion/src/execution/projection.rs
@@ -104,9 +104,12 @@ mod tests {
Field::new("c11", DataType::Float64, false),
Field::new("c12", DataType::Utf8, false),
]));
+
let ds = CsvDataSource::new(
"../../testing/data/csv/aggregate_test_100.csv",
schema.clone(),
+ true,
+ &None,
1024,
);
let relation = Rc::new(RefCell::new(DataSourceRelation::new(Rc::new(
diff --git a/rust/datafusion/tests/sql.rs b/rust/datafusion/tests/sql.rs
index 6f96980..954b2ee 100644
--- a/rust/datafusion/tests/sql.rs
+++ b/rust/datafusion/tests/sql.rs
@@ -26,9 +26,10 @@ use arrow::array::*;
use arrow::datatypes::{DataType, Field, Schema};
use datafusion::execution::context::ExecutionContext;
-use datafusion::execution::datasource::CsvDataSource;
use datafusion::execution::relation::Relation;
+const DEFAULT_BATCH_SIZE: usize = 1024 * 1024;
+
#[test]
fn csv_query_with_predicate() {
let mut ctx = ExecutionContext::new();
@@ -159,13 +160,12 @@ fn register_csv(
filename: &str,
schema: &Arc<Schema>,
) {
- let csv_datasource = CsvDataSource::new(filename, schema.clone(), 1024);
- ctx.register_datasource(name, Rc::new(RefCell::new(csv_datasource)));
+ ctx.register_csv(name, filename, &schema, true);
}
/// Execute query and return result set as tab delimited string
fn execute(ctx: &mut ExecutionContext, sql: &str) -> String {
- let results = ctx.sql(&sql).unwrap();
+ let results = ctx.sql(&sql, DEFAULT_BATCH_SIZE).unwrap();
result_str(&results)
}