This is an automated email from the ASF dual-hosted git repository. agrove pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push: new d2cbf42 ARROW-6694: [Rust] [DataFusion] Integration tests now use physical query plan d2cbf42 is described below commit d2cbf42cc7a62beef55f0db8e4974e255a97b65c Author: Andy Grove <andygrov...@gmail.com> AuthorDate: Sun Oct 6 19:16:00 2019 -0600 ARROW-6694: [Rust] [DataFusion] Integration tests now use physical query plan Integration tests now use physical query plan instead of executing directly from the logical plan. As part of this PR I added the mapping from logical to physical plan for LIMIT which I somehow missed when creating the PR for the LIMIT physical plan. Closes #5582 from andygrove/ARROW-6694 and squashes the following commits: 059f12f9e <Andy Grove> Add check for negative LIMIT when signed int is used 4b9c670dd <Andy Grove> remove another case of executing the logical plan b13058e85 <Andy Grove> integration tests now use physical query plan Authored-by: Andy Grove <andygrov...@gmail.com> Signed-off-by: Andy Grove <andygrov...@gmail.com> --- rust/datafusion/src/execution/context.rs | 38 ++++++++++++++++++++++++++++++++ rust/datafusion/tests/sql.rs | 20 ++++++++--------- 2 files changed, 48 insertions(+), 10 deletions(-) diff --git a/rust/datafusion/src/execution/context.rs b/rust/datafusion/src/execution/context.rs index c606d82..1f7b239 100644 --- a/rust/datafusion/src/execution/context.rs +++ b/rust/datafusion/src/execution/context.rs @@ -41,6 +41,7 @@ use crate::execution::physical_plan::expressions::{ Avg, BinaryExpr, CastExpr, Column, Count, Literal, Max, Min, Sum, }; use crate::execution::physical_plan::hash_aggregate::HashAggregateExec; +use crate::execution::physical_plan::limit::LimitExec; use crate::execution::physical_plan::merge::MergeExec; use crate::execution::physical_plan::projection::ProjectionExec; use crate::execution::physical_plan::selection::SelectionExec; @@ -289,6 +290,43 @@ impl ExecutionContext { let runtime_expr = self.create_physical_expr(expr, &input_schema)?; Ok(Arc::new(SelectionExec::try_new(runtime_expr, input)?)) } + LogicalPlan::Limit { input, expr, .. } => { + let input = self.create_physical_plan(input, batch_size)?; + let input_schema = input.as_ref().schema().clone(); + + match expr { + &Expr::Literal(ref scalar_value) => { + let limit: usize = match scalar_value { + ScalarValue::Int8(limit) if *limit >= 0 => Ok(*limit as usize), + ScalarValue::Int16(limit) if *limit >= 0 => { + Ok(*limit as usize) + } + ScalarValue::Int32(limit) if *limit >= 0 => { + Ok(*limit as usize) + } + ScalarValue::Int64(limit) if *limit >= 0 => { + Ok(*limit as usize) + } + ScalarValue::UInt8(limit) => Ok(*limit as usize), + ScalarValue::UInt16(limit) => Ok(*limit as usize), + ScalarValue::UInt32(limit) => Ok(*limit as usize), + ScalarValue::UInt64(limit) => Ok(*limit as usize), + _ => Err(ExecutionError::ExecutionError( + "Limit only supports non-negative integer literals" + .to_string(), + )), + }?; + Ok(Arc::new(LimitExec::new( + input_schema.clone(), + input.partitions()?, + limit, + ))) + } + _ => Err(ExecutionError::ExecutionError( + "Limit only supports non-negative integer literals".to_string(), + )), + } + } _ => Err(ExecutionError::General( "Unsupported logical plan variant".to_string(), )), diff --git a/rust/datafusion/tests/sql.rs b/rust/datafusion/tests/sql.rs index 9c0adc9..2532194 100644 --- a/rust/datafusion/tests/sql.rs +++ b/rust/datafusion/tests/sql.rs @@ -15,9 +15,7 @@ // specific language governing permissions and limitations // under the License. -use std::cell::RefCell; use std::env; -use std::rc::Rc; use std::sync::Arc; extern crate arrow; @@ -25,10 +23,10 @@ extern crate datafusion; use arrow::array::*; use arrow::datatypes::{DataType, Field, Schema}; +use arrow::record_batch::RecordBatch; use datafusion::error::Result; use datafusion::execution::context::ExecutionContext; -use datafusion::execution::relation::Relation; use datafusion::logicalplan::LogicalPlan; const DEFAULT_BATCH_SIZE: usize = 1024 * 1024; @@ -101,9 +99,11 @@ fn parquet_single_nan_schema() { ctx.register_parquet("single_nan", &format!("{}/single_nan.parquet", testdata)) .unwrap(); let sql = "SELECT mycol FROM single_nan"; - let relation = ctx.sql(&sql, 1024 * 1024).unwrap(); - let mut results = relation.borrow_mut(); - while let Some(batch) = results.next().unwrap() { + let plan = ctx.create_logical_plan(&sql).unwrap(); + let plan = ctx.optimize(&plan).unwrap(); + let plan = ctx.create_physical_plan(&plan, DEFAULT_BATCH_SIZE).unwrap(); + let results = ctx.collect(plan.as_ref()).unwrap(); + for batch in results { assert_eq!(1, batch.num_rows()); assert_eq!(1, batch.num_columns()); } @@ -430,14 +430,14 @@ fn register_alltypes_parquet(ctx: &mut ExecutionContext) { fn execute(ctx: &mut ExecutionContext, sql: &str) -> String { let plan = ctx.create_logical_plan(&sql).unwrap(); let plan = ctx.optimize(&plan).unwrap(); - let results = ctx.execute(&plan, DEFAULT_BATCH_SIZE).unwrap(); + let plan = ctx.create_physical_plan(&plan, DEFAULT_BATCH_SIZE).unwrap(); + let results = ctx.collect(plan.as_ref()).unwrap(); result_str(&results) } -fn result_str(results: &Rc<RefCell<dyn Relation>>) -> String { - let mut relation = results.borrow_mut(); +fn result_str(results: &Vec<RecordBatch>) -> String { let mut str = String::new(); - while let Some(batch) = relation.next().unwrap() { + for batch in results { for row_index in 0..batch.num_rows() { for column_index in 0..batch.num_columns() { if column_index > 0 {