This is an automated email from the ASF dual-hosted git repository.
kszucs pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
commit 082f477440527ec32f7798387e054bade6c39be6
Author: Andy Grove <andygrov...@gmail.com>
AuthorDate: Thu Oct 3 14:43:58 2019 -0600
ARROW-6091: [Rust] [DataFusion] Implement physical execution plan for LIMIT
Closes #5509 from andygrove/ARROW-6091 and squashes the following commits:
cb5c62298 <Andy Grove> bump nightly version
00078c782 <Andy Grove> minor optimization
0c916028f <Andy Grove> test passes
40e617f28 <Andy Grove> unit test
3cb4cab57 <Andy Grove> start roughing out LIMIT
Authored-by: Andy Grove <andygrov...@gmail.com>
Signed-off-by: Andy Grove <andygrov...@gmail.com>
---
rust/datafusion/src/execution/limit.rs | 18 +-
.../src/execution/physical_plan/limit.rs | 208 +++++++++++++++++++++
rust/datafusion/src/execution/physical_plan/mod.rs | 1 +
3 files changed, 213 insertions(+), 14 deletions(-)
diff --git a/rust/datafusion/src/execution/limit.rs b/rust/datafusion/src/execution/limit.rs
index 11c9a2d..84b3ae8 100644
--- a/rust/datafusion/src/execution/limit.rs
+++ b/rust/datafusion/src/execution/limit.rs
@@ -21,12 +21,11 @@ use std::cell::RefCell;
use std::rc::Rc;
use std::sync::Arc;
-use arrow::array::*;
-use arrow::compute::array_ops::limit;
use arrow::datatypes::Schema;
use arrow::record_batch::RecordBatch;
-use crate::error::{ExecutionError, Result};
+use crate::error::Result;
+use crate::execution::physical_plan::limit::truncate_batch;
use crate::execution::relation::Relation;
/// Implementation of a LIMIT relation
@@ -67,18 +66,9 @@ impl Relation for LimitRelation {
return Ok(None);
}
- if batch.num_rows() >= capacity {
- let limited_columns: Result<Vec<ArrayRef>> =
(0..batch.num_columns())
- .map(|i| match limit(batch.column(i), capacity) {
- Ok(result) => Ok(result),
- Err(error) => Err(ExecutionError::from(error)),
- })
- .collect();
-
- let limited_batch: RecordBatch =
- RecordBatch::try_new(self.schema.clone(),
limited_columns?)?;
+ if batch.num_rows() > capacity {
+ let limited_batch = truncate_batch(&batch, capacity)?;
self.num_consumed_rows += capacity;
-
Ok(Some(limited_batch))
} else {
self.num_consumed_rows += batch.num_rows();
diff --git a/rust/datafusion/src/execution/physical_plan/limit.rs b/rust/datafusion/src/execution/physical_plan/limit.rs
new file mode 100644
index 0000000..87e77f9
--- /dev/null
+++ b/rust/datafusion/src/execution/physical_plan/limit.rs
@@ -0,0 +1,208 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Defines the LIMIT plan
+
+use crate::error::{ExecutionError, Result};
+use crate::execution::physical_plan::common::RecordBatchIterator;
+use crate::execution::physical_plan::ExecutionPlan;
+use crate::execution::physical_plan::{BatchIterator, Partition};
+use arrow::array::ArrayRef;
+use arrow::compute::array_ops::limit;
+use arrow::datatypes::Schema;
+use arrow::record_batch::RecordBatch;
+use std::sync::{Arc, Mutex};
+use std::thread;
+use std::thread::JoinHandle;
+
+/// Limit execution plan: reads all of its input partitions and exposes them
+/// as a single output partition containing at most `limit` rows.
+pub struct LimitExec {
+    /// Input schema (also reported as the output schema; LIMIT only drops rows)
+    schema: Arc<Schema>,
+    /// Input partitions
+    partitions: Vec<Arc<dyn Partition>>,
+    /// Maximum number of rows to return
+    limit: usize,
+}
+
+impl LimitExec {
+    /// Create a new LimitExec that returns at most `limit` rows read from
+    /// `partitions`.
+    ///
+    /// `schema` should be the schema of the input partitions; it is passed
+    /// through unchanged as the schema of the output.
+    pub fn new(
+        schema: Arc<Schema>,
+        partitions: Vec<Arc<dyn Partition>>,
+        limit: usize,
+    ) -> Self {
+        LimitExec {
+            schema,
+            partitions,
+            limit,
+        }
+    }
+}
+
+impl ExecutionPlan for LimitExec {
+    /// The output schema is the input schema: LIMIT never changes columns.
+    fn schema(&self) -> Arc<Schema> {
+        Arc::clone(&self.schema)
+    }
+
+    /// Produces exactly one partition, which drains every input partition and
+    /// keeps at most `limit` rows in total.
+    fn partitions(&self) -> Result<Vec<Arc<dyn Partition>>> {
+        let partition = LimitPartition {
+            schema: Arc::clone(&self.schema),
+            partitions: self.partitions.clone(),
+            limit: self.limit,
+        };
+        Ok(vec![Arc::new(partition)])
+    }
+}
+
+/// The single output partition of a `LimitExec`.
+struct LimitPartition {
+    /// Input schema
+    schema: Arc<Schema>,
+    /// Input partitions
+    partitions: Vec<Arc<dyn Partition>>,
+    /// Maximum number of rows to return
+    limit: usize,
+}
+
+impl Partition for LimitPartition {
+    /// Execute every input partition on its own thread, keeping at most `limit`
+    /// rows from each, then concatenate the per-partition results in input order
+    /// until `limit` rows in total have been gathered.
+    ///
+    /// The first error returned by an input partition (in input order) is
+    /// propagated. Panics if a worker thread panics.
+    fn execute(&self) -> Result<Arc<Mutex<dyn BatchIterator>>> {
+        // collect up to "limit" rows on each partition, in parallel
+        let threads: Vec<JoinHandle<Result<Vec<RecordBatch>>>> = self
+            .partitions
+            .iter()
+            .map(|p| {
+                let p = p.clone();
+                let limit = self.limit;
+                thread::spawn(move || {
+                    let it = p.execute()?;
+                    collect_with_limit(it, limit)
+                })
+            })
+            .collect();
+
+        // Combine the results from each thread, up to the limit. Threads are
+        // still joined after the limit is reached (so their errors surface),
+        // but none of their batches are emitted: previously the inner `break`
+        // let every later thread push a zero-row batch.
+        let mut combined_results: Vec<Arc<RecordBatch>> = vec![];
+        let mut count = 0;
+        for thread in threads {
+            let result = thread.join().expect("Failed to join thread")?;
+            for batch in result {
+                if count >= self.limit {
+                    break;
+                }
+                let capacity = self.limit - count;
+                // batches are owned here, so only truncate when necessary and
+                // never clone
+                let batch = if batch.num_rows() <= capacity {
+                    batch
+                } else {
+                    truncate_batch(&batch, capacity)?
+                };
+                count += batch.num_rows();
+                combined_results.push(Arc::new(batch));
+            }
+        }
+
+        Ok(Arc::new(Mutex::new(RecordBatchIterator::new(
+            self.schema.clone(),
+            combined_results,
+        ))))
+    }
+}
+
+/// Truncate a RecordBatch to maximum of n rows.
+///
+/// Every column is passed through Arrow's `limit` kernel with `n`, and the
+/// results are reassembled into a new batch with the same schema. Kernel and
+/// batch-construction errors are returned as `ExecutionError`.
+pub fn truncate_batch(batch: &RecordBatch, n: usize) -> Result<RecordBatch> {
+    let columns = (0..batch.num_columns())
+        .map(|i| limit(batch.column(i), n).map_err(ExecutionError::from))
+        .collect::<Result<Vec<ArrayRef>>>()?;
+    let truncated = RecordBatch::try_new(batch.schema().clone(), columns)?;
+    Ok(truncated)
+}
+
+/// Pull batches from `it` until `limit` rows have been collected or the
+/// iterator is exhausted, truncating the last batch if it would overshoot.
+///
+/// When `limit` is 0 nothing is read from the iterator and an empty vector is
+/// returned. Errors returned by the iterator are propagated. Panics if the
+/// iterator's mutex is poisoned.
+fn collect_with_limit(
+    it: Arc<Mutex<dyn BatchIterator>>,
+    limit: usize,
+) -> Result<Vec<RecordBatch>> {
+    let mut it = it.lock().unwrap();
+    let mut results: Vec<RecordBatch> = vec![];
+    let mut count = 0;
+    while count < limit {
+        let batch = match it.next()? {
+            Some(batch) => batch,
+            // end of result set
+            None => break,
+        };
+        let capacity = limit - count;
+        let batch = if batch.num_rows() <= capacity {
+            batch
+        } else {
+            truncate_batch(&batch, capacity)?
+        };
+        count += batch.num_rows();
+        results.push(batch);
+    }
+    Ok(results)
+}
+
+#[cfg(test)]
+mod tests {
+
+    use super::*;
+    use crate::execution::physical_plan::common;
+    use crate::execution::physical_plan::csv::CsvExec;
+    use crate::test;
+
+    #[test]
+    fn limit() -> Result<()> {
+        let schema = test::aggr_test_schema();
+
+        let num_partitions = 4;
+        let path =
+            test::create_partitioned_csv("aggregate_test_100.csv", num_partitions)?;
+
+        let csv = CsvExec::try_new(&path, schema.clone(), true, None, 1024)?;
+
+        // input should have 4 partitions
+        let input = csv.partitions()?;
+        assert_eq!(input.len(), num_partitions);
+
+        let limit_exec = LimitExec::new(schema.clone(), input, 7);
+        let partitions = limit_exec.partitions()?;
+
+        // LIMIT merges all of its input partitions into one output partition
+        assert_eq!(partitions.len(), 1);
+
+        let iter = partitions[0].execute()?;
+        let batches = common::collect(iter)?;
+
+        // the input has more than 7 rows, so exactly 7 rows must survive,
+        // however they happen to be split across batches
+        let row_count: usize = batches.iter().map(|batch| batch.num_rows()).sum();
+        assert_eq!(row_count, 7);
+
+        Ok(())
+    }
+}
diff --git a/rust/datafusion/src/execution/physical_plan/mod.rs b/rust/datafusion/src/execution/physical_plan/mod.rs
index f0c34c2..b820af8 100644
--- a/rust/datafusion/src/execution/physical_plan/mod.rs
+++ b/rust/datafusion/src/execution/physical_plan/mod.rs
@@ -86,6 +86,7 @@ pub mod csv;
pub mod datasource;
pub mod expressions;
pub mod hash_aggregate;
+pub mod limit;
pub mod merge;
pub mod projection;
pub mod selection;