This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 7890913  ARROW-8261: [Rust-DataFusion] Made limit accept integers and no longer accept expressions.
7890913 is described below

commit 7890913589d813a8cc86d923d330fb5e8c7dfc19
Author: Jorge C. Leitao <[email protected]>
AuthorDate: Mon Jul 13 08:47:53 2020 -0600

    ARROW-8261: [Rust-DataFusion] Made limit accept integers and no longer accept expressions.
    
    Also made the type consistent across the project (=usize).
    
    This change is backward incompatible.
    
    The rationale for this change is that limit() is almost never called
    with an expression in real world cases, since it is used to limit
    results before an action.
    
    Closes #7713 from jorgecarleitao/arrow-8261
    
    Authored-by: Jorge C. Leitao <[email protected]>
    Signed-off-by: Andy Grove <[email protected]>
---
 rust/datafusion/src/execution/context.rs           | 41 ++++------------------
 rust/datafusion/src/execution/table_impl.rs        |  8 ++---
 rust/datafusion/src/logicalplan.rs                 | 14 ++++----
 .../src/optimizer/projection_push_down.rs          | 21 ++++-------
 rust/datafusion/src/sql/planner.rs                 |  8 ++---
 rust/datafusion/src/table.rs                       |  2 +-
 6 files changed, 26 insertions(+), 68 deletions(-)

diff --git a/rust/datafusion/src/execution/context.rs b/rust/datafusion/src/execution/context.rs
index 1cb3c5f..1974eca 100644
--- a/rust/datafusion/src/execution/context.rs
+++ b/rust/datafusion/src/execution/context.rs
@@ -423,44 +423,15 @@ impl ExecutionContext {
 
                 Ok(Arc::new(SortExec::try_new(sort_expr, input)?))
             }
-            LogicalPlan::Limit { input, expr, .. } => {
+            LogicalPlan::Limit { input, n, .. } => {
                 let input = self.create_physical_plan(input, batch_size)?;
                 let input_schema = input.as_ref().schema().clone();
 
-                match expr {
-                    &Expr::Literal(ref scalar_value) => {
-                        let limit: usize = match scalar_value {
-                            ScalarValue::Int8(limit) if *limit >= 0 => {
-                                Ok(*limit as usize)
-                            }
-                            ScalarValue::Int16(limit) if *limit >= 0 => {
-                                Ok(*limit as usize)
-                            }
-                            ScalarValue::Int32(limit) if *limit >= 0 => {
-                                Ok(*limit as usize)
-                            }
-                            ScalarValue::Int64(limit) if *limit >= 0 => {
-                                Ok(*limit as usize)
-                            }
-                            ScalarValue::UInt8(limit) => Ok(*limit as usize),
-                            ScalarValue::UInt16(limit) => Ok(*limit as usize),
-                            ScalarValue::UInt32(limit) => Ok(*limit as usize),
-                            ScalarValue::UInt64(limit) => Ok(*limit as usize),
-                            _ => Err(ExecutionError::ExecutionError(
-                                "Limit only supports non-negative integer literals"
-                                    .to_string(),
-                            )),
-                        }?;
-                        Ok(Arc::new(LimitExec::new(
-                            input_schema.clone(),
-                            input.partitions()?,
-                            limit,
-                        )))
-                    }
-                    _ => Err(ExecutionError::ExecutionError(
-                        "Limit only supports non-negative integer literals".to_string(),
-                    )),
-                }
+                Ok(Arc::new(LimitExec::new(
+                    input_schema.clone(),
+                    input.partitions()?,
+                    *n,
+                )))
             }
             _ => Err(ExecutionError::General(
                 "Unsupported logical plan variant".to_string(),
diff --git a/rust/datafusion/src/execution/table_impl.rs b/rust/datafusion/src/execution/table_impl.rs
index facb83f..8f798bd 100644
--- a/rust/datafusion/src/execution/table_impl.rs
+++ b/rust/datafusion/src/execution/table_impl.rs
@@ -23,8 +23,8 @@ use crate::arrow::datatypes::DataType;
 use crate::arrow::record_batch::RecordBatch;
 use crate::error::{ExecutionError, Result};
 use crate::execution::context::ExecutionContext;
+use crate::logicalplan::LogicalPlanBuilder;
 use crate::logicalplan::{Expr, LogicalPlan};
-use crate::logicalplan::{LogicalPlanBuilder, ScalarValue};
 use crate::table::*;
 use arrow::datatypes::Schema;
 
@@ -83,10 +83,8 @@ impl Table for TableImpl {
     }
 
     /// Limit the number of rows
-    fn limit(&self, n: u32) -> Result<Arc<dyn Table>> {
-        let plan = LogicalPlanBuilder::from(&self.plan)
-            .limit(Expr::Literal(ScalarValue::UInt32(n)))?
-            .build()?;
+    fn limit(&self, n: usize) -> Result<Arc<dyn Table>> {
+        let plan = LogicalPlanBuilder::from(&self.plan).limit(n)?.build()?;
         Ok(Arc::new(TableImpl::new(&plan)))
     }
 
diff --git a/rust/datafusion/src/logicalplan.rs b/rust/datafusion/src/logicalplan.rs
index 3384b93..fe711ee 100644
--- a/rust/datafusion/src/logicalplan.rs
+++ b/rust/datafusion/src/logicalplan.rs
@@ -586,8 +586,8 @@ pub enum LogicalPlan {
     },
     /// Represents the maximum number of records to return
     Limit {
-        /// The expression
-        expr: Expr,
+        /// The limit
+        n: usize,
         /// The logical plan
         input: Box<LogicalPlan>,
         /// The schema description
@@ -713,11 +713,9 @@ impl LogicalPlan {
                 input.fmt_with_indent(f, indent + 1)
             }
             LogicalPlan::Limit {
-                ref input,
-                ref expr,
-                ..
+                ref input, ref n, ..
             } => {
-                write!(f, "Limit: {:?}", expr)?;
+                write!(f, "Limit: {}", n)?;
                 input.fmt_with_indent(f, indent + 1)
             }
             LogicalPlan::CreateExternalTable { ref name, .. } => {
@@ -916,9 +914,9 @@ impl LogicalPlanBuilder {
     }
 
     /// Apply a limit
-    pub fn limit(&self, expr: Expr) -> Result<Self> {
+    pub fn limit(&self, n: usize) -> Result<Self> {
         Ok(Self::from(&LogicalPlan::Limit {
-            expr,
+            n,
             input: Box::new(self.plan.clone()),
             schema: self.plan.schema().clone(),
         }))
diff --git a/rust/datafusion/src/optimizer/projection_push_down.rs b/rust/datafusion/src/optimizer/projection_push_down.rs
index cb2c3fd..8fc203e 100644
--- a/rust/datafusion/src/optimizer/projection_push_down.rs
+++ b/rust/datafusion/src/optimizer/projection_push_down.rs
@@ -203,18 +203,11 @@ impl ProjectionPushDown {
                     projected_schema: Box::new(projected_schema),
                 })
             }
-            LogicalPlan::Limit { expr, input, .. } => {
-                // Note that limit expressions are scalar values so there is no need to
-                // rewrite them but we do need to optimize the input to the limit plan
-                LogicalPlanBuilder::from(&self.optimize_plan(
-                    &input,
-                    accum,
-                    mapping,
-                    has_projection,
-                )?)
-                .limit(expr.clone())?
-                .build()
-            }
+            LogicalPlan::Limit { n, input, .. } => LogicalPlanBuilder::from(
+                &self.optimize_plan(&input, accum, mapping, has_projection)?,
+            )
+            .limit(*n)?
+            .build(),
             LogicalPlan::CreateExternalTable {
                 schema,
                 name,
@@ -477,12 +470,12 @@ mod tests {
 
         let plan = LogicalPlanBuilder::from(&table_scan)
             .project(vec![Column(2), Column(0)])?
-            .limit(Expr::Literal(ScalarValue::UInt32(5)))?
+            .limit(5)?
             .build()?;
 
         assert_fields_eq(&plan, vec!["c", "a"]);
 
-        let expected = "Limit: UInt32(5)\
+        let expected = "Limit: 5\
         \n  Projection: #1, #0\
         \n    TableScan: test projection=Some([0, 2])";
 
diff --git a/rust/datafusion/src/sql/planner.rs b/rust/datafusion/src/sql/planner.rs
index 5c95706..d6bd296 100644
--- a/rust/datafusion/src/sql/planner.rs
+++ b/rust/datafusion/src/sql/planner.rs
@@ -215,16 +215,14 @@ impl<S: SchemaProvider> SqlToRel<S> {
     ) -> Result<LogicalPlan> {
         match *limit {
             Some(ref limit_expr) => {
-                let limit_rex = match self.sql_to_rex(&limit_expr, &input.schema())? {
-                    Expr::Literal(ScalarValue::Int64(n)) => {
-                        Ok(Expr::Literal(ScalarValue::UInt32(n as u32)))
-                    }
+                let n = match self.sql_to_rex(&limit_expr, &input.schema())? {
+                    Expr::Literal(ScalarValue::Int64(n)) => Ok(n as usize),
                     _ => Err(ExecutionError::General(
                         "Unexpected expression for LIMIT clause".to_string(),
                     )),
                 }?;
 
-                LogicalPlanBuilder::from(&input).limit(limit_rex)?.build()
+                LogicalPlanBuilder::from(&input).limit(n)?.build()
             }
             _ => Ok(input.clone()),
         }
diff --git a/rust/datafusion/src/table.rs b/rust/datafusion/src/table.rs
index 1c1e982..bd04f07 100644
--- a/rust/datafusion/src/table.rs
+++ b/rust/datafusion/src/table.rs
@@ -44,7 +44,7 @@ pub trait Table {
     ) -> Result<Arc<dyn Table>>;
 
     /// limit the number of rows
-    fn limit(&self, n: u32) -> Result<Arc<dyn Table>>;
+    fn limit(&self, n: usize) -> Result<Arc<dyn Table>>;
 
     /// Return the logical plan
     fn to_logical_plan(&self) -> LogicalPlan;

Reply via email to