This is an automated email from the ASF dual-hosted git repository.

wesm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 9b85118  ARROW-2852: [Rust] Make Array sync and send
9b85118 is described below

commit 9b8511817259c60854430dd5465153ccada63d00
Author: liurenjie1024 <[email protected]>
AuthorDate: Tue Jul 24 22:49:01 2018 -0400

    ARROW-2852: [Rust] Make Array sync and send
    
    Marking arrays as Sync and Send is useful for multithreaded processing,
    and it is safe since arrays have no interior mutability. Because `Rc` is
    not `Send`, shared array and schema handles are switched from `Rc` to `Arc`.
    
    Author: liurenjie1024 <[email protected]>
    
    Closes #2269 from liurenjie1024/sync and squashes the following commits:
    
    585bc6e3 <liurenjie1024> Add test for concurrency access.
    59c45b9f <liurenjie1024> Make data sync and send
---
 rust/examples/dynamic_types.rs | 16 ++++++++--------
 rust/src/array.rs              | 29 +++++++++++++++++++++--------
 rust/src/buffer.rs             | 18 ++++++++++++++++++
 rust/src/datatypes.rs          |  2 +-
 rust/src/record_batch.rs       | 17 ++++++++++-------
 5 files changed, 58 insertions(+), 24 deletions(-)

diff --git a/rust/examples/dynamic_types.rs b/rust/examples/dynamic_types.rs
index 86a1890..b8093bf 100644
--- a/rust/examples/dynamic_types.rs
+++ b/rust/examples/dynamic_types.rs
@@ -16,7 +16,7 @@
 // under the License.
 
 ///! This example demonstrates dealing with mixed types dynamically at runtime
-use std::rc::Rc;
+use std::sync::Arc;
 
 extern crate arrow;
 
@@ -43,13 +43,13 @@ fn main() {
     let id = PrimitiveArray::from(vec![1, 2, 3, 4, 5]);
 
     let nested = StructArray::from(vec![
-        Rc::new(ListArray::from(vec!["a", "b", "c", "d", "e"])) as Rc<Array>,
-        Rc::new(PrimitiveArray::from(vec![1.1, 2.2, 3.3, 4.4, 5.5])),
-        Rc::new(PrimitiveArray::from(vec![2.2, 3.3, 4.4, 5.5, 6.6])),
+        Arc::new(ListArray::from(vec!["a", "b", "c", "d", "e"])) as Arc<Array>,
+        Arc::new(PrimitiveArray::from(vec![1.1, 2.2, 3.3, 4.4, 5.5])),
+        Arc::new(PrimitiveArray::from(vec![2.2, 3.3, 4.4, 5.5, 6.6])),
     ]);
 
     // build a record batch
-    let batch = RecordBatch::new(Rc::new(schema), vec![Rc::new(id), 
Rc::new(nested)]);
+    let batch = RecordBatch::new(Arc::new(schema), vec![Arc::new(id), 
Arc::new(nested)]);
 
     process(&batch);
 }
@@ -80,10 +80,10 @@ fn process(batch: &RecordBatch) {
     ]);
 
     let _ = RecordBatch::new(
-        Rc::new(projected_schema),
+        Arc::new(projected_schema),
         vec![
-            id.clone(), //NOTE: this is cloning the Rc not the array data
-            Rc::new(nested_b.add(nested_c)),
+            id.clone(), //NOTE: this is cloning the Arc not the array data
+            Arc::new(nested_b.add(nested_c)),
         ],
     );
 }
diff --git a/rust/src/array.rs b/rust/src/array.rs
index 3fde1c0..e418518 100644
--- a/rust/src/array.rs
+++ b/rust/src/array.rs
@@ -19,7 +19,7 @@
 use std::any::Any;
 use std::convert::From;
 use std::ops::Add;
-use std::rc::Rc;
+use std::sync::Arc;
 use std::str;
 use std::string::String;
 
@@ -32,7 +32,7 @@ use super::list_builder::*;
 
 /// Trait for dealing with different types of Array at runtime when the type 
of the
 /// array is not known in advance
-pub trait Array {
+pub trait Array: Send + Sync {
     /// Returns the length of the array (number of items in the array)
     fn len(&self) -> usize;
     /// Returns the number of null values in the array
@@ -310,7 +310,7 @@ where
 /// An Array of structs
 pub struct StructArray {
     len: usize,
-    columns: Vec<Rc<Array>>,
+    columns: Vec<Arc<Array>>,
     null_count: usize,
     validity_bitmap: Option<Bitmap>,
 }
@@ -319,7 +319,7 @@ impl StructArray {
     pub fn num_columns(&self) -> usize {
         self.columns.len()
     }
-    pub fn column(&self, i: usize) -> &Rc<Array> {
+    pub fn column(&self, i: usize) -> &Arc<Array> {
         &self.columns[i]
     }
 }
@@ -341,8 +341,8 @@ impl Array for StructArray {
 
 /// Create a StructArray from a list of arrays representing the fields of the 
struct. The fields
 /// must be in the same order as the schema defining the struct.
-impl From<Vec<Rc<Array>>> for StructArray {
-    fn from(data: Vec<Rc<Array>>) -> Self {
+impl From<Vec<Arc<Array>>> for StructArray {
+    fn from(data: Vec<Arc<Array>>) -> Self {
         StructArray {
             len: data[0].len(),
             columns: data,
@@ -355,6 +355,7 @@ impl From<Vec<Rc<Array>>> for StructArray {
 #[cfg(test)]
 mod tests {
     use super::*;
+    use std::thread;
 
     #[test]
     fn array_data_from_list_u8() {
@@ -424,8 +425,8 @@ mod tests {
 
     #[test]
     fn test_struct() {
-        let a: Rc<Array> = Rc::new(PrimitiveArray::from(Buffer::from(vec![1, 
2, 3, 4, 5])));
-        let b: Rc<Array> = Rc::new(PrimitiveArray::from(Buffer::from(vec![
+        let a: Arc<Array> = Arc::new(PrimitiveArray::from(Buffer::from(vec![1, 
2, 3, 4, 5])));
+        let b: Arc<Array> = Arc::new(PrimitiveArray::from(Buffer::from(vec![
             1.1, 2.2, 3.3, 4.4, 5.5,
         ])));
 
@@ -448,4 +449,16 @@ mod tests {
         assert_eq!(9, a.max().unwrap());
     }
 
+    #[test]
+    fn test_access_array_concurrently() {
+        let a = PrimitiveArray::from(Buffer::from(vec![5, 6, 7, 8, 9]));
+
+        let ret = thread::spawn(move || {
+            a.iter().collect::<Vec<i32>>()
+        }).join();
+
+        assert!(ret.is_ok());
+        assert_eq!(vec![5, 6, 7, 8, 9], ret.ok().unwrap());
+    }
 }
+
diff --git a/rust/src/buffer.rs b/rust/src/buffer.rs
index b6e68d8..0fdc2c5 100644
--- a/rust/src/buffer.rs
+++ b/rust/src/buffer.rs
@@ -164,9 +164,13 @@ impl From<Bytes> for Buffer<u8> {
     }
 }
 
+unsafe impl<T: ArrowPrimitiveType> Sync for Buffer<T> {}
+unsafe impl<T: ArrowPrimitiveType> Send for Buffer<T> {}
+
 #[cfg(test)]
 mod tests {
     use super::*;
+    use std::thread;
 
     #[test]
     fn test_buffer_i32() {
@@ -271,4 +275,18 @@ mod tests {
         let a = Buffer::from(vec![1, 2, 3, 4, 5]);
         a.slice(3, 2); // should panic
     }
+
+    #[test]
+    fn test_access_buffer_concurrently() {
+        let buffer = Buffer::from(vec![1, 2, 3, 4, 5]);
+        assert_eq!(vec![1, 2, 3, 4, 5], buffer.iter().collect::<Vec<i32>>());
+
+        let collected_vec = thread::spawn(move || {
+            // access buffer in another thread.
+            buffer.iter().collect::<Vec<i32>>()
+        }).join();
+
+        assert!(collected_vec.is_ok());
+        assert_eq!(vec![1, 2, 3, 4, 5], collected_vec.ok().unwrap());
+    }
 }
diff --git a/rust/src/datatypes.rs b/rust/src/datatypes.rs
index 83a157a..d4849da 100644
--- a/rust/src/datatypes.rs
+++ b/rust/src/datatypes.rs
@@ -48,7 +48,7 @@ pub struct Field {
 }
 
 /// Primitive type (ints, floats, strings)
-pub trait ArrowPrimitiveType: Copy + PartialOrd + 'static {}
+pub trait ArrowPrimitiveType: Send + Sync + Copy + PartialOrd + 'static {}
 
 impl ArrowPrimitiveType for bool {}
 impl ArrowPrimitiveType for u8 {}
diff --git a/rust/src/record_batch.rs b/rust/src/record_batch.rs
index 69509e4..4cb4573 100644
--- a/rust/src/record_batch.rs
+++ b/rust/src/record_batch.rs
@@ -17,16 +17,16 @@
 
 use super::array::*;
 use super::datatypes::*;
-use std::rc::Rc;
+use std::sync::Arc;
 
 /// A batch of column-oriented data
 pub struct RecordBatch {
-    schema: Rc<Schema>,
-    columns: Vec<Rc<Array>>,
+    schema: Arc<Schema>,
+    columns: Vec<Arc<Array>>,
 }
 
 impl RecordBatch {
-    pub fn new(schema: Rc<Schema>, columns: Vec<Rc<Array>>) -> Self {
+    pub fn new(schema: Arc<Schema>, columns: Vec<Arc<Array>>) -> Self {
         // assert that there are some columns
         assert!(columns.len() > 0);
         // assert that all columns have the same row count
@@ -37,7 +37,7 @@ impl RecordBatch {
         RecordBatch { schema, columns }
     }
 
-    pub fn schema(&self) -> &Rc<Schema> {
+    pub fn schema(&self) -> &Arc<Schema> {
         &self.schema
     }
 
@@ -49,11 +49,14 @@ impl RecordBatch {
         self.columns[0].len()
     }
 
-    pub fn column(&self, i: usize) -> &Rc<Array> {
+    pub fn column(&self, i: usize) -> &Arc<Array> {
         &self.columns[i]
     }
 }
 
+unsafe impl Send for RecordBatch {}
+unsafe impl Sync for RecordBatch {}
+
 #[cfg(test)]
 mod tests {
     use super::*;
@@ -68,7 +71,7 @@ mod tests {
         let a = PrimitiveArray::from(vec![1, 2, 3, 4, 5]);
         let b = ListArray::from(vec!["a", "b", "c", "d", "e"]);
 
-        let record_batch = RecordBatch::new(Rc::new(schema), vec![Rc::new(a), 
Rc::new(b)]);
+        let record_batch = RecordBatch::new(Arc::new(schema), 
vec![Arc::new(a), Arc::new(b)]);
 
         assert_eq!(5, record_batch.num_rows());
         assert_eq!(2, record_batch.num_columns());

Reply via email to