This is an automated email from the ASF dual-hosted git repository.
wesm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 9b85118 ARROW-2852: [Rust] Make Array sync and send
9b85118 is described below
commit 9b8511817259c60854430dd5465153ccada63d00
Author: liurenjie1024 <[email protected]>
AuthorDate: Tue Jul 24 22:49:01 2018 -0400
ARROW-2852: [Rust] Make Array sync and send
Marking arrays as Sync and Send is useful for multithreaded processing.
It is also safe, since they have no interior mutability.
Author: liurenjie1024 <[email protected]>
Closes #2269 from liurenjie1024/sync and squashes the following commits:
585bc6e3 <liurenjie1024> Add test for concurrent access.
59c45b9f <liurenjie1024> Make data sync and send
---
rust/examples/dynamic_types.rs | 16 ++++++++--------
rust/src/array.rs | 29 +++++++++++++++++++++--------
rust/src/buffer.rs | 18 ++++++++++++++++++
rust/src/datatypes.rs | 2 +-
rust/src/record_batch.rs | 17 ++++++++++-------
5 files changed, 58 insertions(+), 24 deletions(-)
diff --git a/rust/examples/dynamic_types.rs b/rust/examples/dynamic_types.rs
index 86a1890..b8093bf 100644
--- a/rust/examples/dynamic_types.rs
+++ b/rust/examples/dynamic_types.rs
@@ -16,7 +16,7 @@
// under the License.
///! This example demonstrates dealing with mixed types dynamically at runtime
-use std::rc::Rc;
+use std::sync::Arc;
extern crate arrow;
@@ -43,13 +43,13 @@ fn main() {
let id = PrimitiveArray::from(vec![1, 2, 3, 4, 5]);
let nested = StructArray::from(vec![
- Rc::new(ListArray::from(vec!["a", "b", "c", "d", "e"])) as Rc<Array>,
- Rc::new(PrimitiveArray::from(vec![1.1, 2.2, 3.3, 4.4, 5.5])),
- Rc::new(PrimitiveArray::from(vec![2.2, 3.3, 4.4, 5.5, 6.6])),
+ Arc::new(ListArray::from(vec!["a", "b", "c", "d", "e"])) as Arc<Array>,
+ Arc::new(PrimitiveArray::from(vec![1.1, 2.2, 3.3, 4.4, 5.5])),
+ Arc::new(PrimitiveArray::from(vec![2.2, 3.3, 4.4, 5.5, 6.6])),
]);
// build a record batch
- let batch = RecordBatch::new(Rc::new(schema), vec![Rc::new(id),
Rc::new(nested)]);
+ let batch = RecordBatch::new(Arc::new(schema), vec![Arc::new(id),
Arc::new(nested)]);
process(&batch);
}
@@ -80,10 +80,10 @@ fn process(batch: &RecordBatch) {
]);
let _ = RecordBatch::new(
- Rc::new(projected_schema),
+ Arc::new(projected_schema),
vec![
- id.clone(), //NOTE: this is cloning the Rc not the array data
- Rc::new(nested_b.add(nested_c)),
+ id.clone(), //NOTE: this is cloning the Arc not the array data
+ Arc::new(nested_b.add(nested_c)),
],
);
}
diff --git a/rust/src/array.rs b/rust/src/array.rs
index 3fde1c0..e418518 100644
--- a/rust/src/array.rs
+++ b/rust/src/array.rs
@@ -19,7 +19,7 @@
use std::any::Any;
use std::convert::From;
use std::ops::Add;
-use std::rc::Rc;
+use std::sync::Arc;
use std::str;
use std::string::String;
@@ -32,7 +32,7 @@ use super::list_builder::*;
/// Trait for dealing with different types of Array at runtime when the type
of the
/// array is not known in advance
-pub trait Array {
+pub trait Array: Send + Sync {
/// Returns the length of the array (number of items in the array)
fn len(&self) -> usize;
/// Returns the number of null values in the array
@@ -310,7 +310,7 @@ where
/// An Array of structs
pub struct StructArray {
len: usize,
- columns: Vec<Rc<Array>>,
+ columns: Vec<Arc<Array>>,
null_count: usize,
validity_bitmap: Option<Bitmap>,
}
@@ -319,7 +319,7 @@ impl StructArray {
pub fn num_columns(&self) -> usize {
self.columns.len()
}
- pub fn column(&self, i: usize) -> &Rc<Array> {
+ pub fn column(&self, i: usize) -> &Arc<Array> {
&self.columns[i]
}
}
@@ -341,8 +341,8 @@ impl Array for StructArray {
/// Create a StructArray from a list of arrays representing the fields of the
struct. The fields
/// must be in the same order as the schema defining the struct.
-impl From<Vec<Rc<Array>>> for StructArray {
- fn from(data: Vec<Rc<Array>>) -> Self {
+impl From<Vec<Arc<Array>>> for StructArray {
+ fn from(data: Vec<Arc<Array>>) -> Self {
StructArray {
len: data[0].len(),
columns: data,
@@ -355,6 +355,7 @@ impl From<Vec<Rc<Array>>> for StructArray {
#[cfg(test)]
mod tests {
use super::*;
+ use std::thread;
#[test]
fn array_data_from_list_u8() {
@@ -424,8 +425,8 @@ mod tests {
#[test]
fn test_struct() {
- let a: Rc<Array> = Rc::new(PrimitiveArray::from(Buffer::from(vec![1,
2, 3, 4, 5])));
- let b: Rc<Array> = Rc::new(PrimitiveArray::from(Buffer::from(vec![
+ let a: Arc<Array> = Arc::new(PrimitiveArray::from(Buffer::from(vec![1,
2, 3, 4, 5])));
+ let b: Arc<Array> = Arc::new(PrimitiveArray::from(Buffer::from(vec![
1.1, 2.2, 3.3, 4.4, 5.5,
])));
@@ -448,4 +449,16 @@ mod tests {
assert_eq!(9, a.max().unwrap());
}
+ #[test]
+ fn test_access_array_concurrently() {
+ let a = PrimitiveArray::from(Buffer::from(vec![5, 6, 7, 8, 9]));
+
+ let ret = thread::spawn(move || {
+ a.iter().collect::<Vec<i32>>()
+ }).join();
+
+ assert!(ret.is_ok());
+ assert_eq!(vec![5, 6, 7, 8, 9], ret.ok().unwrap());
+ }
}
+
diff --git a/rust/src/buffer.rs b/rust/src/buffer.rs
index b6e68d8..0fdc2c5 100644
--- a/rust/src/buffer.rs
+++ b/rust/src/buffer.rs
@@ -164,9 +164,13 @@ impl From<Bytes> for Buffer<u8> {
}
}
+unsafe impl<T: ArrowPrimitiveType> Sync for Buffer<T> {}
+unsafe impl<T: ArrowPrimitiveType> Send for Buffer<T> {}
+
#[cfg(test)]
mod tests {
use super::*;
+ use std::thread;
#[test]
fn test_buffer_i32() {
@@ -271,4 +275,18 @@ mod tests {
let a = Buffer::from(vec![1, 2, 3, 4, 5]);
a.slice(3, 2); // should panic
}
+
+ #[test]
+ fn test_access_buffer_concurrently() {
+ let buffer = Buffer::from(vec![1, 2, 3, 4, 5]);
+ assert_eq!(vec![1, 2, 3, 4, 5], buffer.iter().collect::<Vec<i32>>());
+
+ let collected_vec = thread::spawn(move || {
+ // access buffer in another thread.
+ buffer.iter().collect::<Vec<i32>>()
+ }).join();
+
+ assert!(collected_vec.is_ok());
+ assert_eq!(vec![1, 2, 3, 4, 5], collected_vec.ok().unwrap());
+ }
}
diff --git a/rust/src/datatypes.rs b/rust/src/datatypes.rs
index 83a157a..d4849da 100644
--- a/rust/src/datatypes.rs
+++ b/rust/src/datatypes.rs
@@ -48,7 +48,7 @@ pub struct Field {
}
/// Primitive type (ints, floats, strings)
-pub trait ArrowPrimitiveType: Copy + PartialOrd + 'static {}
+pub trait ArrowPrimitiveType: Send + Sync + Copy + PartialOrd + 'static {}
impl ArrowPrimitiveType for bool {}
impl ArrowPrimitiveType for u8 {}
diff --git a/rust/src/record_batch.rs b/rust/src/record_batch.rs
index 69509e4..4cb4573 100644
--- a/rust/src/record_batch.rs
+++ b/rust/src/record_batch.rs
@@ -17,16 +17,16 @@
use super::array::*;
use super::datatypes::*;
-use std::rc::Rc;
+use std::sync::Arc;
/// A batch of column-oriented data
pub struct RecordBatch {
- schema: Rc<Schema>,
- columns: Vec<Rc<Array>>,
+ schema: Arc<Schema>,
+ columns: Vec<Arc<Array>>,
}
impl RecordBatch {
- pub fn new(schema: Rc<Schema>, columns: Vec<Rc<Array>>) -> Self {
+ pub fn new(schema: Arc<Schema>, columns: Vec<Arc<Array>>) -> Self {
// assert that there are some columns
assert!(columns.len() > 0);
// assert that all columns have the same row count
@@ -37,7 +37,7 @@ impl RecordBatch {
RecordBatch { schema, columns }
}
- pub fn schema(&self) -> &Rc<Schema> {
+ pub fn schema(&self) -> &Arc<Schema> {
&self.schema
}
@@ -49,11 +49,14 @@ impl RecordBatch {
self.columns[0].len()
}
- pub fn column(&self, i: usize) -> &Rc<Array> {
+ pub fn column(&self, i: usize) -> &Arc<Array> {
&self.columns[i]
}
}
+unsafe impl Send for RecordBatch {}
+unsafe impl Sync for RecordBatch {}
+
#[cfg(test)]
mod tests {
use super::*;
@@ -68,7 +71,7 @@ mod tests {
let a = PrimitiveArray::from(vec![1, 2, 3, 4, 5]);
let b = ListArray::from(vec!["a", "b", "c", "d", "e"]);
- let record_batch = RecordBatch::new(Rc::new(schema), vec![Rc::new(a),
Rc::new(b)]);
+ let record_batch = RecordBatch::new(Arc::new(schema),
vec![Arc::new(a), Arc::new(b)]);
assert_eq!(5, record_batch.num_rows());
assert_eq!(2, record_batch.num_columns());