This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 11ae07b ARROW-4681: [Rust] [DataFusion] Partition aware data sources
11ae07b is described below
commit 11ae07bc4f5e3167d3135329330f70b37500e9c0
Author: Andy Grove <[email protected]>
AuthorDate: Sun Mar 10 11:01:24 2019 -0600
ARROW-4681: [Rust] [DataFusion] Partition aware data sources
This PR changes the data source API to support data sources that have
partitions. The `scan` method now returns a `Vec<>` of iterators.
For now the query execution only works for single partitions.
Author: Andy Grove <[email protected]>
Closes #3849 from andygrove/ARROW-4681 and squashes the following commits:
8b6d591 <Andy Grove> Execution engine throws error if partition count is not one
758d767 <Andy Grove> First pass at partition aware data sources
---
rust/datafusion/src/datasource/csv.rs | 10 ++++------
rust/datafusion/src/datasource/datasource.rs | 10 ++++------
rust/datafusion/src/datasource/memory.rs | 29 ++++++++++++++--------------
rust/datafusion/src/execution/aggregate.rs | 7 ++++---
rust/datafusion/src/execution/context.rs | 10 +++++++++-
rust/datafusion/src/execution/projection.rs | 5 +++--
rust/datafusion/src/execution/relation.rs | 12 +++++-------
7 files changed, 43 insertions(+), 40 deletions(-)
diff --git a/rust/datafusion/src/datasource/csv.rs b/rust/datafusion/src/datasource/csv.rs
index 1d0e4d7..a010526 100644
--- a/rust/datafusion/src/datasource/csv.rs
+++ b/rust/datafusion/src/datasource/csv.rs
@@ -17,11 +17,9 @@
//! CSV Data source
-use std::cell::RefCell;
use std::fs::File;
-use std::rc::Rc;
use std::string::String;
-use std::sync::Arc;
+use std::sync::{Arc, Mutex};
use arrow::csv;
use arrow::datatypes::{Field, Schema};
@@ -56,14 +54,14 @@ impl Table for CsvFile {
&self,
projection: &Option<Vec<usize>>,
batch_size: usize,
- ) -> Result<ScanResult> {
- Ok(Rc::new(RefCell::new(CsvBatchIterator::new(
+ ) -> Result<Vec<ScanResult>> {
+ Ok(vec![Arc::new(Mutex::new(CsvBatchIterator::new(
&self.filename,
self.schema.clone(),
self.has_header,
projection,
batch_size,
- ))))
+ )))])
}
}
diff --git a/rust/datafusion/src/datasource/datasource.rs b/rust/datafusion/src/datasource/datasource.rs
index 2b6d4e7..7e48f19 100644
--- a/rust/datafusion/src/datasource/datasource.rs
+++ b/rust/datafusion/src/datasource/datasource.rs
@@ -17,28 +17,26 @@
//! Data source traits
-use std::cell::RefCell;
-use std::rc::Rc;
-use std::sync::Arc;
+use std::sync::{Arc, Mutex};
use arrow::datatypes::Schema;
use arrow::record_batch::RecordBatch;
use crate::execution::error::Result;
-pub type ScanResult = Rc<RefCell<RecordBatchIterator>>;
+pub type ScanResult = Arc<Mutex<RecordBatchIterator>>;
/// Source table
pub trait Table {
/// Get a reference to the schema for this table
fn schema(&self) -> &Arc<Schema>;
- /// Perform a scan of a table and return an iterator over the data
+ /// Perform a scan of a table and return a sequence of iterators over the data (one iterator per partition)
fn scan(
&self,
projection: &Option<Vec<usize>>,
batch_size: usize,
- ) -> Result<ScanResult>;
+ ) -> Result<Vec<ScanResult>>;
}
/// Iterator for reading a series of record batches with a known schema
diff --git a/rust/datafusion/src/datasource/memory.rs b/rust/datafusion/src/datasource/memory.rs
index 5168ae9..ebebd4e 100644
--- a/rust/datafusion/src/datasource/memory.rs
+++ b/rust/datafusion/src/datasource/memory.rs
@@ -19,9 +19,7 @@
//! queried by DataFusion. This allows data to be pre-loaded into memory and then repeatedly
//! queried without incurring additional file I/O overhead.
-use std::cell::RefCell;
-use std::rc::Rc;
-use std::sync::Arc;
+use std::sync::{Arc, Mutex};
use arrow::datatypes::{Field, Schema};
use arrow::record_batch::RecordBatch;
@@ -53,14 +51,15 @@ impl MemTable {
/// Create a mem table by reading from another data source
pub fn load(t: &Table) -> Result<Self> {
let schema = t.schema();
- let it = t.scan(&None, 1024 * 1024)?;
- let mut it_mut = it.borrow_mut();
+ let partitions = t.scan(&None, 1024 * 1024)?;
let mut data: Vec<RecordBatch> = vec![];
-
- while let Ok(Some(batch)) = it_mut.next() {
- data.push(batch);
+ for it in &partitions {
+ while let Ok(Some(batch)) = it.lock().unwrap().next() {
+ data.push(batch);
+ }
}
+
MemTable::new(schema.clone(), data)
}
}
@@ -74,7 +73,7 @@ impl Table for MemTable {
&self,
projection: &Option<Vec<usize>>,
_batch_size: usize,
- ) -> Result<ScanResult> {
+ ) -> Result<Vec<ScanResult>> {
let columns: Vec<usize> = match projection {
Some(p) => p.clone(),
None => {
@@ -114,11 +113,11 @@ impl Table for MemTable {
.collect();
match batches {
- Ok(batches) => Ok(Rc::new(RefCell::new(MemBatchIterator {
+ Ok(batches) => Ok(vec![Arc::new(Mutex::new(MemBatchIterator {
schema: projected_schema.clone(),
index: 0,
batches,
- }))),
+ }))]),
Err(e) => Err(ExecutionError::ArrowError(e)),
}
}
@@ -173,8 +172,8 @@ mod tests {
let provider = MemTable::new(schema, vec![batch]).unwrap();
// scan with projection
- let scan2 = provider.scan(&Some(vec![2, 1]), 1024).unwrap();
- let batch2 = scan2.borrow_mut().next().unwrap().unwrap();
+ let partitions = provider.scan(&Some(vec![2, 1]), 1024).unwrap();
+ let batch2 = partitions[0].lock().unwrap().next().unwrap().unwrap();
assert_eq!(2, batch2.schema().fields().len());
assert_eq!("c", batch2.schema().field(0).name());
assert_eq!("b", batch2.schema().field(1).name());
@@ -201,8 +200,8 @@ mod tests {
let provider = MemTable::new(schema, vec![batch]).unwrap();
- let scan1 = provider.scan(&None, 1024).unwrap();
- let batch1 = scan1.borrow_mut().next().unwrap().unwrap();
+ let partitions = provider.scan(&None, 1024).unwrap();
+ let batch1 = partitions[0].lock().unwrap().next().unwrap().unwrap();
assert_eq!(3, batch1.schema().fields().len());
assert_eq!(3, batch1.num_columns());
}
diff --git a/rust/datafusion/src/execution/aggregate.rs b/rust/datafusion/src/execution/aggregate.rs
index f9eb3e2..befafe5 100644
--- a/rust/datafusion/src/execution/aggregate.rs
+++ b/rust/datafusion/src/execution/aggregate.rs
@@ -1027,6 +1027,7 @@ mod tests {
use crate::execution::relation::DataSourceRelation;
use crate::logicalplan::Expr;
use arrow::datatypes::{DataType, Field, Schema};
+ use std::sync::Mutex;
#[test]
fn min_f64_group_by_string() {
@@ -1215,9 +1216,9 @@ mod tests {
fn load_csv(filename: &str, schema: &Arc<Schema>) -> Rc<RefCell<Relation>> {
let ds = CsvBatchIterator::new(filename, schema.clone(), true, &None, 1024);
- Rc::new(RefCell::new(DataSourceRelation::new(Rc::new(
- RefCell::new(ds),
- ))))
+ Rc::new(RefCell::new(DataSourceRelation::new(Arc::new(Mutex::new(
+ ds,
+ )))))
}
}
diff --git a/rust/datafusion/src/execution/context.rs b/rust/datafusion/src/execution/context.rs
index 8d4a984..b539084 100644
--- a/rust/datafusion/src/execution/context.rs
+++ b/rust/datafusion/src/execution/context.rs
@@ -121,7 +121,15 @@ impl ExecutionContext {
} => match self.datasources.borrow().get(table_name) {
Some(provider) => {
let ds = provider.scan(projection, batch_size)?;
- Ok(Rc::new(RefCell::new(DataSourceRelation::new(ds))))
+ if ds.len() == 1 {
+ Ok(Rc::new(RefCell::new(DataSourceRelation::new(
+ ds[0].clone(),
+ ))))
+ } else {
+ Err(ExecutionError::General(
+ "Execution engine only supports single partition".to_string(),
+ ))
+ }
}
_ => Err(ExecutionError::General(format!(
"No table registered as '{}'",
diff --git a/rust/datafusion/src/execution/projection.rs b/rust/datafusion/src/execution/projection.rs
index a02213a..fd15715 100644
--- a/rust/datafusion/src/execution/projection.rs
+++ b/rust/datafusion/src/execution/projection.rs
@@ -86,6 +86,7 @@ mod tests {
use crate::execution::relation::DataSourceRelation;
use crate::logicalplan::Expr;
use arrow::datatypes::{DataType, Field, Schema};
+ use std::sync::Mutex;
#[test]
fn project_first_column() {
@@ -112,8 +113,8 @@ mod tests {
&None,
1024,
);
- let relation = Rc::new(RefCell::new(DataSourceRelation::new(Rc::new(
- RefCell::new(ds),
+ let relation = Rc::new(RefCell::new(DataSourceRelation::new(Arc::new(
+ Mutex::new(ds),
))));
let context = ExecutionContext::new();
diff --git a/rust/datafusion/src/execution/relation.rs b/rust/datafusion/src/execution/relation.rs
index b7f77c1..0011863 100644
--- a/rust/datafusion/src/execution/relation.rs
+++ b/rust/datafusion/src/execution/relation.rs
@@ -15,9 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-use std::cell::RefCell;
-use std::rc::Rc;
-use std::sync::Arc;
+use std::sync::{Arc, Mutex};
use arrow::datatypes::Schema;
use arrow::record_batch::RecordBatch;
@@ -36,19 +34,19 @@ pub trait Relation {
pub struct DataSourceRelation {
schema: Arc<Schema>,
- ds: Rc<RefCell<RecordBatchIterator>>,
+ ds: Arc<Mutex<RecordBatchIterator>>,
}
impl DataSourceRelation {
- pub fn new(ds: Rc<RefCell<RecordBatchIterator>>) -> Self {
- let schema = ds.borrow().schema().clone();
+ pub fn new(ds: Arc<Mutex<RecordBatchIterator>>) -> Self {
+ let schema = ds.lock().unwrap().schema().clone();
Self { ds, schema }
}
}
impl Relation for DataSourceRelation {
fn next(&mut self) -> Result<Option<RecordBatch>> {
- self.ds.borrow_mut().next()
+ self.ds.lock().unwrap().next()
}
fn schema(&self) -> &Arc<Schema> {