This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 11ae07b  ARROW-4681: [Rust] [DataFusion] Partition aware data sources
11ae07b is described below

commit 11ae07bc4f5e3167d3135329330f70b37500e9c0
Author: Andy Grove <[email protected]>
AuthorDate: Sun Mar 10 11:01:24 2019 -0600

    ARROW-4681: [Rust] [DataFusion] Partition aware data sources
    
    This PR changes the data source API to support data sources that have partitions. The `scan` method now returns a `Vec<>` of iterators.
    
    For now, query execution only works for data sources with a single partition.
    
    Author: Andy Grove <[email protected]>
    
    Closes #3849 from andygrove/ARROW-4681 and squashes the following commits:
    
    8b6d591 <Andy Grove> Execution engine throws error if partition count is not one
    758d767 <Andy Grove> First pass at partition aware data sources
---
 rust/datafusion/src/datasource/csv.rs        | 10 ++++------
 rust/datafusion/src/datasource/datasource.rs | 10 ++++------
 rust/datafusion/src/datasource/memory.rs     | 29 ++++++++++++++--------------
 rust/datafusion/src/execution/aggregate.rs   |  7 ++++---
 rust/datafusion/src/execution/context.rs     | 10 +++++++++-
 rust/datafusion/src/execution/projection.rs  |  5 +++--
 rust/datafusion/src/execution/relation.rs    | 12 +++++-------
 7 files changed, 43 insertions(+), 40 deletions(-)

diff --git a/rust/datafusion/src/datasource/csv.rs b/rust/datafusion/src/datasource/csv.rs
index 1d0e4d7..a010526 100644
--- a/rust/datafusion/src/datasource/csv.rs
+++ b/rust/datafusion/src/datasource/csv.rs
@@ -17,11 +17,9 @@
 
 //! CSV Data source
 
-use std::cell::RefCell;
 use std::fs::File;
-use std::rc::Rc;
 use std::string::String;
-use std::sync::Arc;
+use std::sync::{Arc, Mutex};
 
 use arrow::csv;
 use arrow::datatypes::{Field, Schema};
@@ -56,14 +54,14 @@ impl Table for CsvFile {
         &self,
         projection: &Option<Vec<usize>>,
         batch_size: usize,
-    ) -> Result<ScanResult> {
-        Ok(Rc::new(RefCell::new(CsvBatchIterator::new(
+    ) -> Result<Vec<ScanResult>> {
+        Ok(vec![Arc::new(Mutex::new(CsvBatchIterator::new(
             &self.filename,
             self.schema.clone(),
             self.has_header,
             projection,
             batch_size,
-        ))))
+        )))])
     }
 }
 
diff --git a/rust/datafusion/src/datasource/datasource.rs b/rust/datafusion/src/datasource/datasource.rs
index 2b6d4e7..7e48f19 100644
--- a/rust/datafusion/src/datasource/datasource.rs
+++ b/rust/datafusion/src/datasource/datasource.rs
@@ -17,28 +17,26 @@
 
 //! Data source traits
 
-use std::cell::RefCell;
-use std::rc::Rc;
-use std::sync::Arc;
+use std::sync::{Arc, Mutex};
 
 use arrow::datatypes::Schema;
 use arrow::record_batch::RecordBatch;
 
 use crate::execution::error::Result;
 
-pub type ScanResult = Rc<RefCell<RecordBatchIterator>>;
+pub type ScanResult = Arc<Mutex<RecordBatchIterator>>;
 
 /// Source table
 pub trait Table {
     /// Get a reference to the schema for this table
     fn schema(&self) -> &Arc<Schema>;
 
-    /// Perform a scan of a table and return an iterator over the data
+    /// Perform a scan of a table and return a sequence of iterators over the data (one iterator per partition)
     fn scan(
         &self,
         projection: &Option<Vec<usize>>,
         batch_size: usize,
-    ) -> Result<ScanResult>;
+    ) -> Result<Vec<ScanResult>>;
 }
 
 /// Iterator for reading a series of record batches with a known schema
diff --git a/rust/datafusion/src/datasource/memory.rs b/rust/datafusion/src/datasource/memory.rs
index 5168ae9..ebebd4e 100644
--- a/rust/datafusion/src/datasource/memory.rs
+++ b/rust/datafusion/src/datasource/memory.rs
@@ -19,9 +19,7 @@
 //! queried by DataFusion. This allows data to be pre-loaded into memory and then repeatedly
 //! queried without incurring additional file I/O overhead.
 
-use std::cell::RefCell;
-use std::rc::Rc;
-use std::sync::Arc;
+use std::sync::{Arc, Mutex};
 
 use arrow::datatypes::{Field, Schema};
 use arrow::record_batch::RecordBatch;
@@ -53,14 +51,15 @@ impl MemTable {
     /// Create a mem table by reading from another data source
     pub fn load(t: &Table) -> Result<Self> {
         let schema = t.schema();
-        let it = t.scan(&None, 1024 * 1024)?;
-        let mut it_mut = it.borrow_mut();
+        let partitions = t.scan(&None, 1024 * 1024)?;
 
         let mut data: Vec<RecordBatch> = vec![];
-
-        while let Ok(Some(batch)) = it_mut.next() {
-            data.push(batch);
+        for it in &partitions {
+            while let Ok(Some(batch)) = it.lock().unwrap().next() {
+                data.push(batch);
+            }
         }
+
         MemTable::new(schema.clone(), data)
     }
 }
@@ -74,7 +73,7 @@ impl Table for MemTable {
         &self,
         projection: &Option<Vec<usize>>,
         _batch_size: usize,
-    ) -> Result<ScanResult> {
+    ) -> Result<Vec<ScanResult>> {
         let columns: Vec<usize> = match projection {
             Some(p) => p.clone(),
             None => {
@@ -114,11 +113,11 @@ impl Table for MemTable {
             .collect();
 
         match batches {
-            Ok(batches) => Ok(Rc::new(RefCell::new(MemBatchIterator {
+            Ok(batches) => Ok(vec![Arc::new(Mutex::new(MemBatchIterator {
                 schema: projected_schema.clone(),
                 index: 0,
                 batches,
-            }))),
+            }))]),
             Err(e) => Err(ExecutionError::ArrowError(e)),
         }
     }
@@ -173,8 +172,8 @@ mod tests {
         let provider = MemTable::new(schema, vec![batch]).unwrap();
 
         // scan with projection
-        let scan2 = provider.scan(&Some(vec![2, 1]), 1024).unwrap();
-        let batch2 = scan2.borrow_mut().next().unwrap().unwrap();
+        let partitions = provider.scan(&Some(vec![2, 1]), 1024).unwrap();
+        let batch2 = partitions[0].lock().unwrap().next().unwrap().unwrap();
         assert_eq!(2, batch2.schema().fields().len());
         assert_eq!("c", batch2.schema().field(0).name());
         assert_eq!("b", batch2.schema().field(1).name());
@@ -201,8 +200,8 @@ mod tests {
 
         let provider = MemTable::new(schema, vec![batch]).unwrap();
 
-        let scan1 = provider.scan(&None, 1024).unwrap();
-        let batch1 = scan1.borrow_mut().next().unwrap().unwrap();
+        let partitions = provider.scan(&None, 1024).unwrap();
+        let batch1 = partitions[0].lock().unwrap().next().unwrap().unwrap();
         assert_eq!(3, batch1.schema().fields().len());
         assert_eq!(3, batch1.num_columns());
     }
diff --git a/rust/datafusion/src/execution/aggregate.rs b/rust/datafusion/src/execution/aggregate.rs
index f9eb3e2..befafe5 100644
--- a/rust/datafusion/src/execution/aggregate.rs
+++ b/rust/datafusion/src/execution/aggregate.rs
@@ -1027,6 +1027,7 @@ mod tests {
     use crate::execution::relation::DataSourceRelation;
     use crate::logicalplan::Expr;
     use arrow::datatypes::{DataType, Field, Schema};
+    use std::sync::Mutex;
 
     #[test]
     fn min_f64_group_by_string() {
@@ -1215,9 +1216,9 @@ mod tests {
 
     fn load_csv(filename: &str, schema: &Arc<Schema>) -> Rc<RefCell<Relation>> 
{
         let ds = CsvBatchIterator::new(filename, schema.clone(), true, &None, 
1024);
-        Rc::new(RefCell::new(DataSourceRelation::new(Rc::new(
-            RefCell::new(ds),
-        ))))
+        Rc::new(RefCell::new(DataSourceRelation::new(Arc::new(Mutex::new(
+            ds,
+        )))))
     }
 
 }
diff --git a/rust/datafusion/src/execution/context.rs b/rust/datafusion/src/execution/context.rs
index 8d4a984..b539084 100644
--- a/rust/datafusion/src/execution/context.rs
+++ b/rust/datafusion/src/execution/context.rs
@@ -121,7 +121,15 @@ impl ExecutionContext {
             } => match self.datasources.borrow().get(table_name) {
                 Some(provider) => {
                     let ds = provider.scan(projection, batch_size)?;
-                    Ok(Rc::new(RefCell::new(DataSourceRelation::new(ds))))
+                    if ds.len() == 1 {
+                        Ok(Rc::new(RefCell::new(DataSourceRelation::new(
+                            ds[0].clone(),
+                        ))))
+                    } else {
+                        Err(ExecutionError::General(
+                            "Execution engine only supports single 
partition".to_string(),
+                        ))
+                    }
                 }
                 _ => Err(ExecutionError::General(format!(
                     "No table registered as '{}'",
diff --git a/rust/datafusion/src/execution/projection.rs b/rust/datafusion/src/execution/projection.rs
index a02213a..fd15715 100644
--- a/rust/datafusion/src/execution/projection.rs
+++ b/rust/datafusion/src/execution/projection.rs
@@ -86,6 +86,7 @@ mod tests {
     use crate::execution::relation::DataSourceRelation;
     use crate::logicalplan::Expr;
     use arrow::datatypes::{DataType, Field, Schema};
+    use std::sync::Mutex;
 
     #[test]
     fn project_first_column() {
@@ -112,8 +113,8 @@ mod tests {
             &None,
             1024,
         );
-        let relation = Rc::new(RefCell::new(DataSourceRelation::new(Rc::new(
-            RefCell::new(ds),
+        let relation = Rc::new(RefCell::new(DataSourceRelation::new(Arc::new(
+            Mutex::new(ds),
         ))));
         let context = ExecutionContext::new();
 
diff --git a/rust/datafusion/src/execution/relation.rs b/rust/datafusion/src/execution/relation.rs
index b7f77c1..0011863 100644
--- a/rust/datafusion/src/execution/relation.rs
+++ b/rust/datafusion/src/execution/relation.rs
@@ -15,9 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::cell::RefCell;
-use std::rc::Rc;
-use std::sync::Arc;
+use std::sync::{Arc, Mutex};
 
 use arrow::datatypes::Schema;
 use arrow::record_batch::RecordBatch;
@@ -36,19 +34,19 @@ pub trait Relation {
 
 pub struct DataSourceRelation {
     schema: Arc<Schema>,
-    ds: Rc<RefCell<RecordBatchIterator>>,
+    ds: Arc<Mutex<RecordBatchIterator>>,
 }
 
 impl DataSourceRelation {
-    pub fn new(ds: Rc<RefCell<RecordBatchIterator>>) -> Self {
-        let schema = ds.borrow().schema().clone();
+    pub fn new(ds: Arc<Mutex<RecordBatchIterator>>) -> Self {
+        let schema = ds.lock().unwrap().schema().clone();
         Self { ds, schema }
     }
 }
 
 impl Relation for DataSourceRelation {
     fn next(&mut self) -> Result<Option<RecordBatch>> {
-        self.ds.borrow_mut().next()
+        self.ds.lock().unwrap().next()
     }
 
     fn schema(&self) -> &Arc<Schema> {

Reply via email to