This is an automated email from the ASF dual-hosted git repository.

kszucs pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git

commit 082f477440527ec32f7798387e054bade6c39be6
Author: Andy Grove <andygrov...@gmail.com>
AuthorDate: Thu Oct 3 14:43:58 2019 -0600

    ARROW-6091: [Rust] [DataFusion] Implement physical execution plan for LIMIT
    
    Closes #5509 from andygrove/ARROW-6091 and squashes the following commits:
    
    cb5c62298 <Andy Grove> bump nightly version
    00078c782 <Andy Grove> minor optimization
    0c916028f <Andy Grove> test passes
    40e617f28 <Andy Grove> unit test
    3cb4cab57 <Andy Grove> start roughing out LIMIT
    
    Authored-by: Andy Grove <andygrov...@gmail.com>
    Signed-off-by: Andy Grove <andygrov...@gmail.com>
---
 rust/datafusion/src/execution/limit.rs             |  18 +-
 .../src/execution/physical_plan/limit.rs           | 208 +++++++++++++++++++++
 rust/datafusion/src/execution/physical_plan/mod.rs |   1 +
 3 files changed, 213 insertions(+), 14 deletions(-)

diff --git a/rust/datafusion/src/execution/limit.rs 
b/rust/datafusion/src/execution/limit.rs
index 11c9a2d..84b3ae8 100644
--- a/rust/datafusion/src/execution/limit.rs
+++ b/rust/datafusion/src/execution/limit.rs
@@ -21,12 +21,11 @@ use std::cell::RefCell;
 use std::rc::Rc;
 use std::sync::Arc;
 
-use arrow::array::*;
-use arrow::compute::array_ops::limit;
 use arrow::datatypes::Schema;
 use arrow::record_batch::RecordBatch;
 
-use crate::error::{ExecutionError, Result};
+use crate::error::Result;
+use crate::execution::physical_plan::limit::truncate_batch;
 use crate::execution::relation::Relation;
 
 /// Implementation of a LIMIT relation
@@ -67,18 +66,9 @@ impl Relation for LimitRelation {
                     return Ok(None);
                 }
 
-                if batch.num_rows() >= capacity {
-                    let limited_columns: Result<Vec<ArrayRef>> = 
(0..batch.num_columns())
-                        .map(|i| match limit(batch.column(i), capacity) {
-                            Ok(result) => Ok(result),
-                            Err(error) => Err(ExecutionError::from(error)),
-                        })
-                        .collect();
-
-                    let limited_batch: RecordBatch =
-                        RecordBatch::try_new(self.schema.clone(), 
limited_columns?)?;
+                if batch.num_rows() > capacity {
+                    let limited_batch = truncate_batch(&batch, capacity)?;
                     self.num_consumed_rows += capacity;
-
                     Ok(Some(limited_batch))
                 } else {
                     self.num_consumed_rows += batch.num_rows();
diff --git a/rust/datafusion/src/execution/physical_plan/limit.rs 
b/rust/datafusion/src/execution/physical_plan/limit.rs
new file mode 100644
index 0000000..87e77f9
--- /dev/null
+++ b/rust/datafusion/src/execution/physical_plan/limit.rs
@@ -0,0 +1,208 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Defines the LIMIT plan
+
+use crate::error::{ExecutionError, Result};
+use crate::execution::physical_plan::common::RecordBatchIterator;
+use crate::execution::physical_plan::ExecutionPlan;
+use crate::execution::physical_plan::{BatchIterator, Partition};
+use arrow::array::ArrayRef;
+use arrow::compute::array_ops::limit;
+use arrow::datatypes::Schema;
+use arrow::record_batch::RecordBatch;
+use std::sync::{Arc, Mutex};
+use std::thread;
+use std::thread::JoinHandle;
+
+/// Limit execution plan
+pub struct LimitExec {
+    /// Input schema
+    schema: Arc<Schema>,
+    /// Input partitions
+    partitions: Vec<Arc<dyn Partition>>,
+    /// Maximum number of rows to return
+    limit: usize,
+}
+
+impl LimitExec {
+    /// Create a new MergeExec
+    pub fn new(
+        schema: Arc<Schema>,
+        partitions: Vec<Arc<dyn Partition>>,
+        limit: usize,
+    ) -> Self {
+        LimitExec {
+            schema,
+            partitions,
+            limit,
+        }
+    }
+}
+
+impl ExecutionPlan for LimitExec {
+    fn schema(&self) -> Arc<Schema> {
+        self.schema.clone()
+    }
+
+    fn partitions(&self) -> Result<Vec<Arc<dyn Partition>>> {
+        Ok(vec![Arc::new(LimitPartition {
+            schema: self.schema.clone(),
+            partitions: self.partitions.clone(),
+            limit: self.limit,
+        })])
+    }
+}
+
+struct LimitPartition {
+    /// Input schema
+    schema: Arc<Schema>,
+    /// Input partitions
+    partitions: Vec<Arc<dyn Partition>>,
+    /// Maximum number of rows to return
+    limit: usize,
+}
+
+impl Partition for LimitPartition {
+    fn execute(&self) -> Result<Arc<Mutex<dyn BatchIterator>>> {
+        // collect up to "limit" rows on each partition
+        let threads: Vec<JoinHandle<Result<Vec<RecordBatch>>>> = self
+            .partitions
+            .iter()
+            .map(|p| {
+                let p = p.clone();
+                let limit = self.limit;
+                thread::spawn(move || {
+                    let it = p.execute()?;
+                    collect_with_limit(it, limit)
+                })
+            })
+            .collect();
+
+        // combine the results from each thread, up to the limit
+        let mut combined_results: Vec<Arc<RecordBatch>> = vec![];
+        let mut count = 0;
+        for thread in threads {
+            let join = thread.join().expect("Failed to join thread");
+            let result = join?;
+            for batch in result {
+                let capacity = self.limit - count;
+                if batch.num_rows() <= capacity {
+                    count += batch.num_rows();
+                    combined_results.push(Arc::new(batch.clone()))
+                } else {
+                    let batch = truncate_batch(&batch, capacity)?;
+                    count += batch.num_rows();
+                    combined_results.push(Arc::new(batch.clone()))
+                }
+                if count == self.limit {
+                    break;
+                }
+            }
+        }
+
+        Ok(Arc::new(Mutex::new(RecordBatchIterator::new(
+            self.schema.clone(),
+            combined_results,
+        ))))
+    }
+}
+
+/// Truncate a RecordBatch to maximum of n rows
+pub fn truncate_batch(batch: &RecordBatch, n: usize) -> Result<RecordBatch> {
+    let limited_columns: Result<Vec<ArrayRef>> = (0..batch.num_columns())
+        .map(|i| match limit(batch.column(i), n) {
+            Ok(result) => Ok(result),
+            Err(error) => Err(ExecutionError::from(error)),
+        })
+        .collect();
+
+    Ok(RecordBatch::try_new(
+        batch.schema().clone(),
+        limited_columns?,
+    )?)
+}
+
+/// Create a vector of record batches from an iterator
+fn collect_with_limit(
+    it: Arc<Mutex<dyn BatchIterator>>,
+    limit: usize,
+) -> Result<Vec<RecordBatch>> {
+    let mut count = 0;
+    let mut it = it.lock().unwrap();
+    let mut results: Vec<RecordBatch> = vec![];
+    loop {
+        match it.next() {
+            Ok(Some(batch)) => {
+                let capacity = limit - count;
+                if batch.num_rows() <= capacity {
+                    count += batch.num_rows();
+                    results.push(batch);
+                } else {
+                    let batch = truncate_batch(&batch, capacity)?;
+                    count += batch.num_rows();
+                    results.push(batch);
+                }
+                if count == limit {
+                    return Ok(results);
+                }
+            }
+            Ok(None) => {
+                // end of result set
+                return Ok(results);
+            }
+            Err(e) => return Err(e),
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+
+    use super::*;
+    use crate::execution::physical_plan::common;
+    use crate::execution::physical_plan::csv::CsvExec;
+    use crate::test;
+
+    #[test]
+    fn limit() -> Result<()> {
+        let schema = test::aggr_test_schema();
+
+        let num_partitions = 4;
+        let path =
+            test::create_partitioned_csv("aggregate_test_100.csv", 
num_partitions)?;
+
+        let csv = CsvExec::try_new(&path, schema.clone(), true, None, 1024)?;
+
+        // input should have 4 partitions
+        let input = csv.partitions()?;
+        assert_eq!(input.len(), num_partitions);
+
+        let limit = LimitExec::new(schema.clone(), input, 7);
+        let partitions = limit.partitions()?;
+
+        // the result should contain 4 batches (one per input partition)
+        let iter = partitions[0].execute()?;
+        let batches = common::collect(iter)?;
+
+        // there should be a total of 100 rows
+        let row_count: usize = batches.iter().map(|batch| 
batch.num_rows()).sum();
+        assert_eq!(row_count, 7);
+
+        Ok(())
+    }
+}
diff --git a/rust/datafusion/src/execution/physical_plan/mod.rs 
b/rust/datafusion/src/execution/physical_plan/mod.rs
index f0c34c2..b820af8 100644
--- a/rust/datafusion/src/execution/physical_plan/mod.rs
+++ b/rust/datafusion/src/execution/physical_plan/mod.rs
@@ -86,6 +86,7 @@ pub mod csv;
 pub mod datasource;
 pub mod expressions;
 pub mod hash_aggregate;
+pub mod limit;
 pub mod merge;
 pub mod projection;
 pub mod selection;

Reply via email to