This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new d2cbf42  ARROW-6694: [Rust] [DataFusion] Integration tests now use physical query plan
d2cbf42 is described below

commit d2cbf42cc7a62beef55f0db8e4974e255a97b65c
Author: Andy Grove <andygrov...@gmail.com>
AuthorDate: Sun Oct 6 19:16:00 2019 -0600

    ARROW-6694: [Rust] [DataFusion] Integration tests now use physical query plan
    
    Integration tests now use the physical query plan instead of executing directly from the logical plan.
    
    As part of this PR, I added the mapping from logical to physical plan for LIMIT, which I had missed when creating the PR for the LIMIT physical plan.
    
    Closes #5582 from andygrove/ARROW-6694 and squashes the following commits:
    
    059f12f9e <Andy Grove> Add check for negative LIMIT when signed int is used
    4b9c670dd <Andy Grove> remove another case of executing the logical plan
    b13058e85 <Andy Grove> integration tests now use physical query plan
    
    Authored-by: Andy Grove <andygrov...@gmail.com>
    Signed-off-by: Andy Grove <andygrov...@gmail.com>
---
 rust/datafusion/src/execution/context.rs | 38 ++++++++++++++++++++++++++++++++
 rust/datafusion/tests/sql.rs             | 20 ++++++++---------
 2 files changed, 48 insertions(+), 10 deletions(-)

diff --git a/rust/datafusion/src/execution/context.rs b/rust/datafusion/src/execution/context.rs
index c606d82..1f7b239 100644
--- a/rust/datafusion/src/execution/context.rs
+++ b/rust/datafusion/src/execution/context.rs
@@ -41,6 +41,7 @@ use crate::execution::physical_plan::expressions::{
     Avg, BinaryExpr, CastExpr, Column, Count, Literal, Max, Min, Sum,
 };
 use crate::execution::physical_plan::hash_aggregate::HashAggregateExec;
+use crate::execution::physical_plan::limit::LimitExec;
 use crate::execution::physical_plan::merge::MergeExec;
 use crate::execution::physical_plan::projection::ProjectionExec;
 use crate::execution::physical_plan::selection::SelectionExec;
@@ -289,6 +290,43 @@ impl ExecutionContext {
                 let runtime_expr = self.create_physical_expr(expr, 
&input_schema)?;
                 Ok(Arc::new(SelectionExec::try_new(runtime_expr, input)?))
             }
+            LogicalPlan::Limit { input, expr, .. } => {
+                let input = self.create_physical_plan(input, batch_size)?;
+                let input_schema = input.as_ref().schema().clone();
+
+                match expr {
+                    &Expr::Literal(ref scalar_value) => {
+                        let limit: usize = match scalar_value {
+                            ScalarValue::Int8(limit) if *limit >= 0 => 
Ok(*limit as usize),
+                            ScalarValue::Int16(limit) if *limit >= 0 => {
+                                Ok(*limit as usize)
+                            }
+                            ScalarValue::Int32(limit) if *limit >= 0 => {
+                                Ok(*limit as usize)
+                            }
+                            ScalarValue::Int64(limit) if *limit >= 0 => {
+                                Ok(*limit as usize)
+                            }
+                            ScalarValue::UInt8(limit) => Ok(*limit as usize),
+                            ScalarValue::UInt16(limit) => Ok(*limit as usize),
+                            ScalarValue::UInt32(limit) => Ok(*limit as usize),
+                            ScalarValue::UInt64(limit) => Ok(*limit as usize),
+                            _ => Err(ExecutionError::ExecutionError(
+                                "Limit only supports non-negative integer 
literals"
+                                    .to_string(),
+                            )),
+                        }?;
+                        Ok(Arc::new(LimitExec::new(
+                            input_schema.clone(),
+                            input.partitions()?,
+                            limit,
+                        )))
+                    }
+                    _ => Err(ExecutionError::ExecutionError(
+                        "Limit only supports non-negative integer 
literals".to_string(),
+                    )),
+                }
+            }
             _ => Err(ExecutionError::General(
                 "Unsupported logical plan variant".to_string(),
             )),
diff --git a/rust/datafusion/tests/sql.rs b/rust/datafusion/tests/sql.rs
index 9c0adc9..2532194 100644
--- a/rust/datafusion/tests/sql.rs
+++ b/rust/datafusion/tests/sql.rs
@@ -15,9 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::cell::RefCell;
 use std::env;
-use std::rc::Rc;
 use std::sync::Arc;
 
 extern crate arrow;
@@ -25,10 +23,10 @@ extern crate datafusion;
 
 use arrow::array::*;
 use arrow::datatypes::{DataType, Field, Schema};
+use arrow::record_batch::RecordBatch;
 
 use datafusion::error::Result;
 use datafusion::execution::context::ExecutionContext;
-use datafusion::execution::relation::Relation;
 use datafusion::logicalplan::LogicalPlan;
 
 const DEFAULT_BATCH_SIZE: usize = 1024 * 1024;
@@ -101,9 +99,11 @@ fn parquet_single_nan_schema() {
     ctx.register_parquet("single_nan", &format!("{}/single_nan.parquet", 
testdata))
         .unwrap();
     let sql = "SELECT mycol FROM single_nan";
-    let relation = ctx.sql(&sql, 1024 * 1024).unwrap();
-    let mut results = relation.borrow_mut();
-    while let Some(batch) = results.next().unwrap() {
+    let plan = ctx.create_logical_plan(&sql).unwrap();
+    let plan = ctx.optimize(&plan).unwrap();
+    let plan = ctx.create_physical_plan(&plan, DEFAULT_BATCH_SIZE).unwrap();
+    let results = ctx.collect(plan.as_ref()).unwrap();
+    for batch in results {
         assert_eq!(1, batch.num_rows());
         assert_eq!(1, batch.num_columns());
     }
@@ -430,14 +430,14 @@ fn register_alltypes_parquet(ctx: &mut ExecutionContext) {
 fn execute(ctx: &mut ExecutionContext, sql: &str) -> String {
     let plan = ctx.create_logical_plan(&sql).unwrap();
     let plan = ctx.optimize(&plan).unwrap();
-    let results = ctx.execute(&plan, DEFAULT_BATCH_SIZE).unwrap();
+    let plan = ctx.create_physical_plan(&plan, DEFAULT_BATCH_SIZE).unwrap();
+    let results = ctx.collect(plan.as_ref()).unwrap();
     result_str(&results)
 }
 
-fn result_str(results: &Rc<RefCell<dyn Relation>>) -> String {
-    let mut relation = results.borrow_mut();
+fn result_str(results: &Vec<RecordBatch>) -> String {
     let mut str = String::new();
-    while let Some(batch) = relation.next().unwrap() {
+    for batch in results {
         for row_index in 0..batch.num_rows() {
             for column_index in 0..batch.num_columns() {
                 if column_index > 0 {

Reply via email to