This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new 3e5b07aa4 feat(api!): make ArrowArrayStreamReader Send (#4232)
3e5b07aa4 is described below
commit 3e5b07aa4a9cdfa0f71cd7794c6e56532d12679e
Author: Will Jones <[email protected]>
AuthorDate: Sat May 27 02:30:35 2023 -0700
feat(api!): make ArrowArrayStreamReader Send (#4232)
* feat(api!): make ArrowArrayStreamReader Send
* simplify ptr handling
* rename pyarrow traits to conform to guidelines
* pr feedback
* remove dangling Box::from_raw
---
arrow-pyarrow-integration-testing/src/lib.rs | 10 +--
arrow/src/ffi_stream.rs | 99 ++++++++++------------------
arrow/src/pyarrow.rs | 93 +++++++++++++++++---------
arrow/tests/pyarrow.rs | 2 +-
4 files changed, 103 insertions(+), 101 deletions(-)
diff --git a/arrow-pyarrow-integration-testing/src/lib.rs b/arrow-pyarrow-integration-testing/src/lib.rs
index af400868f..730409b37 100644
--- a/arrow-pyarrow-integration-testing/src/lib.rs
+++ b/arrow-pyarrow-integration-testing/src/lib.rs
@@ -24,12 +24,12 @@ use arrow::array::new_empty_array;
use pyo3::prelude::*;
use pyo3::wrap_pyfunction;
-use arrow::array::{Array, ArrayData, ArrayRef, Int64Array, make_array};
+use arrow::array::{make_array, Array, ArrayData, ArrayRef, Int64Array};
use arrow::compute::kernels;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::error::ArrowError;
use arrow::ffi_stream::ArrowArrayStreamReader;
-use arrow::pyarrow::{PyArrowConvert, PyArrowException, PyArrowType};
+use arrow::pyarrow::{FromPyArrow, PyArrowException, PyArrowType, ToPyArrow};
use arrow::record_batch::RecordBatch;
fn to_py_err(err: ArrowError) -> PyErr {
@@ -88,7 +88,8 @@ fn substring(
let array = make_array(array.0);
// substring
- let array = kernels::substring::substring(array.as_ref(), start,
None).map_err(to_py_err)?;
+ let array =
+ kernels::substring::substring(array.as_ref(), start,
None).map_err(to_py_err)?;
Ok(array.to_data().into())
}
@@ -99,7 +100,8 @@ fn concatenate(array: PyArrowType<ArrayData>, py: Python) -> PyResult<PyObject>
let array = make_array(array.0);
// concat
- let array = kernels::concat::concat(&[array.as_ref(),
array.as_ref()]).map_err(to_py_err)?;
+ let array =
+ kernels::concat::concat(&[array.as_ref(),
array.as_ref()]).map_err(to_py_err)?;
array.to_data().to_pyarrow(py)
}
diff --git a/arrow/src/ffi_stream.rs b/arrow/src/ffi_stream.rs
index 0e358c36a..cfda4c88b 100644
--- a/arrow/src/ffi_stream.rs
+++ b/arrow/src/ffi_stream.rs
@@ -37,25 +37,19 @@
//! let reader = Box::new(FileReader::try_new(file).unwrap());
//!
//! // export it
-//! let stream = Box::new(FFI_ArrowArrayStream::empty());
-//! let stream_ptr = Box::into_raw(stream) as *mut FFI_ArrowArrayStream;
-//! unsafe { export_reader_into_raw(reader, stream_ptr) };
+//! let mut stream = FFI_ArrowArrayStream::empty();
+//! unsafe { export_reader_into_raw(reader, &mut stream) };
//!
//! // consumed and used by something else...
//!
//! // import it
-//! let stream_reader = unsafe {
ArrowArrayStreamReader::from_raw(stream_ptr).unwrap() };
+//! let stream_reader = unsafe { ArrowArrayStreamReader::from_raw(&mut
stream).unwrap() };
//! let imported_schema = stream_reader.schema();
//!
//! let mut produced_batches = vec![];
//! for batch in stream_reader {
//! produced_batches.push(batch.unwrap());
//! }
-//!
-//! // (drop/release)
-//! unsafe {
-//! Box::from_raw(stream_ptr);
-//! }
//! Ok(())
//! }
//! ```
@@ -105,6 +99,8 @@ pub struct FFI_ArrowArrayStream {
pub private_data: *mut c_void,
}
+unsafe impl Send for FFI_ArrowArrayStream {}
+
// callback used to drop [FFI_ArrowArrayStream] when it is exported.
unsafe extern "C" fn release_stream(stream: *mut FFI_ArrowArrayStream) {
if stream.is_null() {
@@ -231,8 +227,7 @@ impl ExportedArrayStream {
let struct_array = StructArray::from(batch);
let array = FFI_ArrowArray::new(&struct_array.to_data());
- unsafe { std::ptr::copy(addr_of!(array), out, 1) };
- std::mem::forget(array);
+ unsafe { std::ptr::write_unaligned(out, array) };
0
} else {
let err = &next_batch.unwrap_err();
@@ -261,24 +256,21 @@ fn get_error_code(err: &ArrowError) -> i32 {
/// Struct used to fetch `RecordBatch` from the C Stream Interface.
/// Its main responsibility is to expose `RecordBatchReader` functionality
/// that requires [FFI_ArrowArrayStream].
-#[derive(Debug, Clone)]
+#[derive(Debug)]
pub struct ArrowArrayStreamReader {
- stream: Arc<FFI_ArrowArrayStream>,
+ stream: FFI_ArrowArrayStream,
schema: SchemaRef,
}
/// Gets schema from a raw pointer of `FFI_ArrowArrayStream`. This is used when constructing
/// `ArrowArrayStreamReader` to cache schema.
fn get_stream_schema(stream_ptr: *mut FFI_ArrowArrayStream) ->
Result<SchemaRef> {
- let empty_schema = Arc::new(FFI_ArrowSchema::empty());
- let schema_ptr = Arc::into_raw(empty_schema) as *mut FFI_ArrowSchema;
+ let mut schema = FFI_ArrowSchema::empty();
- let ret_code = unsafe { (*stream_ptr).get_schema.unwrap()(stream_ptr,
schema_ptr) };
-
- let ffi_schema = unsafe { Arc::from_raw(schema_ptr) };
+ let ret_code = unsafe { (*stream_ptr).get_schema.unwrap()(stream_ptr, &mut
schema) };
if ret_code == 0 {
- let schema = Schema::try_from(ffi_schema.as_ref()).unwrap();
+ let schema = Schema::try_from(&schema).unwrap();
Ok(Arc::new(schema))
} else {
Err(ArrowError::CDataInterface(format!(
@@ -291,21 +283,16 @@ impl ArrowArrayStreamReader {
/// Creates a new `ArrowArrayStreamReader` from a `FFI_ArrowArrayStream`.
/// This is used to import from the C Stream Interface.
#[allow(dead_code)]
- pub fn try_new(stream: FFI_ArrowArrayStream) -> Result<Self> {
+ pub fn try_new(mut stream: FFI_ArrowArrayStream) -> Result<Self> {
if stream.release.is_none() {
return Err(ArrowError::CDataInterface(
"input stream is already released".to_string(),
));
}
- let stream_ptr = Arc::into_raw(Arc::new(stream)) as *mut
FFI_ArrowArrayStream;
-
- let schema = get_stream_schema(stream_ptr)?;
+ let schema = get_stream_schema(&mut stream)?;
- Ok(Self {
- stream: unsafe { Arc::from_raw(stream_ptr) },
- schema,
- })
+ Ok(Self { stream, schema })
}
/// Creates a new `ArrowArrayStreamReader` from a raw pointer of `FFI_ArrowArrayStream`.
@@ -324,13 +311,12 @@ impl ArrowArrayStreamReader {
}
/// Get the last error from `ArrowArrayStreamReader`
- fn get_stream_last_error(&self) -> Option<String> {
+ fn get_stream_last_error(&mut self) -> Option<String> {
self.stream.get_last_error?;
- let stream_ptr = Arc::as_ptr(&self.stream) as *mut
FFI_ArrowArrayStream;
-
let error_str = unsafe {
- let c_str = self.stream.get_last_error.unwrap()(stream_ptr) as
*mut c_char;
+ let c_str =
+ self.stream.get_last_error.unwrap()(&mut self.stream) as *mut
c_char;
CString::from_raw(c_str).into_string()
};
@@ -346,18 +332,14 @@ impl Iterator for ArrowArrayStreamReader {
type Item = Result<RecordBatch>;
fn next(&mut self) -> Option<Self::Item> {
- let stream_ptr = Arc::as_ptr(&self.stream) as *mut
FFI_ArrowArrayStream;
-
- let empty_array = Arc::new(FFI_ArrowArray::empty());
- let array_ptr = Arc::into_raw(empty_array) as *mut FFI_ArrowArray;
+ let mut array = FFI_ArrowArray::empty();
- let ret_code = unsafe { self.stream.get_next.unwrap()(stream_ptr,
array_ptr) };
+ let ret_code =
+ unsafe { self.stream.get_next.unwrap()(&mut self.stream, &mut
array) };
if ret_code == 0 {
- let ffi_array = unsafe { Arc::from_raw(array_ptr) };
-
// The end of stream has been reached
- if ffi_array.is_released() {
+ if array.is_released() {
return None;
}
@@ -365,7 +347,7 @@ impl Iterator for ArrowArrayStreamReader {
let schema = FFI_ArrowSchema::try_from(schema_ref.as_ref()).ok()?;
let data = ArrowArray {
- array: ffi_array,
+ array: Arc::new(array),
schema: Arc::new(schema),
}
.to_data()
@@ -375,8 +357,6 @@ impl Iterator for ArrowArrayStreamReader {
Some(Ok(record_batch))
} else {
- unsafe { Arc::from_raw(array_ptr) };
-
let last_error = self.get_stream_last_error();
let err = ArrowError::CDataInterface(last_error.unwrap());
Some(Err(err))
@@ -451,40 +431,33 @@ mod tests {
let reader = TestRecordBatchReader::new(schema.clone(), iter);
// Export a `RecordBatchReader` through `FFI_ArrowArrayStream`
- let stream = Arc::new(FFI_ArrowArrayStream::empty());
- let stream_ptr = Arc::into_raw(stream) as *mut FFI_ArrowArrayStream;
-
- unsafe { export_reader_into_raw(reader, stream_ptr) };
-
- let empty_schema = Arc::new(FFI_ArrowSchema::empty());
- let schema_ptr = Arc::into_raw(empty_schema) as *mut FFI_ArrowSchema;
+ let mut ffi_stream = FFI_ArrowArrayStream::empty();
+ unsafe { export_reader_into_raw(reader, &mut ffi_stream) };
// Get schema from `FFI_ArrowArrayStream`
- let ret_code = unsafe { get_schema(stream_ptr, schema_ptr) };
+ let mut ffi_schema = FFI_ArrowSchema::empty();
+ let ret_code = unsafe { get_schema(&mut ffi_stream, &mut ffi_schema) };
assert_eq!(ret_code, 0);
- let ffi_schema = unsafe { Arc::from_raw(schema_ptr) };
-
- let exported_schema = Schema::try_from(ffi_schema.as_ref()).unwrap();
+ let exported_schema = Schema::try_from(&ffi_schema).unwrap();
assert_eq!(&exported_schema, schema.as_ref());
+ let ffi_schema = Arc::new(ffi_schema);
+
// Get array from `FFI_ArrowArrayStream`
let mut produced_batches = vec![];
loop {
- let empty_array = Arc::new(FFI_ArrowArray::empty());
- let array_ptr = Arc::into_raw(empty_array.clone()) as *mut
FFI_ArrowArray;
-
- let ret_code = unsafe { get_next(stream_ptr, array_ptr) };
+ let mut ffi_array = FFI_ArrowArray::empty();
+ let ret_code = unsafe { get_next(&mut ffi_stream, &mut ffi_array)
};
assert_eq!(ret_code, 0);
// The end of stream has been reached
- let ffi_array = unsafe { Arc::from_raw(array_ptr) };
if ffi_array.is_released() {
break;
}
let array = ArrowArray {
- array: ffi_array,
+ array: Arc::new(ffi_array),
schema: ffi_schema.clone(),
}
.to_data()
@@ -496,7 +469,6 @@ mod tests {
assert_eq!(produced_batches, vec![batch.clone(), batch]);
- unsafe { Arc::from_raw(stream_ptr) };
Ok(())
}
@@ -512,10 +484,8 @@ mod tests {
let reader = TestRecordBatchReader::new(schema.clone(), iter);
// Import through `FFI_ArrowArrayStream` as `ArrowArrayStreamReader`
- let stream = Arc::new(FFI_ArrowArrayStream::new(reader));
- let stream_ptr = Arc::into_raw(stream) as *mut FFI_ArrowArrayStream;
- let stream_reader =
- unsafe { ArrowArrayStreamReader::from_raw(stream_ptr).unwrap() };
+ let stream = FFI_ArrowArrayStream::new(reader);
+ let stream_reader = ArrowArrayStreamReader::try_new(stream).unwrap();
let imported_schema = stream_reader.schema();
assert_eq!(imported_schema, schema);
@@ -527,7 +497,6 @@ mod tests {
assert_eq!(produced_batches, vec![batch.clone(), batch]);
- unsafe { Arc::from_raw(stream_ptr) };
Ok(())
}
diff --git a/arrow/src/pyarrow.rs b/arrow/src/pyarrow.rs
index 081cc8063..ba8d606f2 100644
--- a/arrow/src/pyarrow.rs
+++ b/arrow/src/pyarrow.rs
@@ -15,13 +15,16 @@
// specific language governing permissions and limitations
// under the License.
-//! This module demonstrates a minimal usage of Rust's C data interface to pass
-//! arrays from and to Python.
+//! Pass Arrow objects from and to Python, using Arrow's
+//! [C Data Interface](https://arrow.apache.org/docs/format/CDataInterface.html)
+//! and [pyo3](https://docs.rs/pyo3/latest/pyo3/).
+//! For underlying implementation, see the [ffi] module.
use std::convert::{From, TryFrom};
use std::ptr::{addr_of, addr_of_mut};
use std::sync::Arc;
+use pyo3::exceptions::PyValueError;
use pyo3::ffi::Py_uintptr_t;
use pyo3::import_exception;
use pyo3::prelude::*;
@@ -44,12 +47,27 @@ fn to_py_err(err: ArrowError) -> PyErr {
PyArrowException::new_err(err.to_string())
}
-pub trait PyArrowConvert: Sized {
+pub trait FromPyArrow: Sized {
fn from_pyarrow(value: &PyAny) -> PyResult<Self>;
+}
+
+/// Create a new PyArrow object from an arrow-rs type.
+pub trait ToPyArrow {
fn to_pyarrow(&self, py: Python) -> PyResult<PyObject>;
}
-impl PyArrowConvert for DataType {
+/// Convert an arrow-rs type into a PyArrow object.
+pub trait IntoPyArrow {
+ fn into_pyarrow(self, py: Python) -> PyResult<PyObject>;
+}
+
+impl<T: ToPyArrow> IntoPyArrow for T {
+ fn into_pyarrow(self, py: Python) -> PyResult<PyObject> {
+ self.to_pyarrow(py)
+ }
+}
+
+impl FromPyArrow for DataType {
fn from_pyarrow(value: &PyAny) -> PyResult<Self> {
let c_schema = FFI_ArrowSchema::empty();
let c_schema_ptr = &c_schema as *const FFI_ArrowSchema;
@@ -57,7 +75,9 @@ impl PyArrowConvert for DataType {
let dtype = DataType::try_from(&c_schema).map_err(to_py_err)?;
Ok(dtype)
}
+}
+impl ToPyArrow for DataType {
fn to_pyarrow(&self, py: Python) -> PyResult<PyObject> {
let c_schema = FFI_ArrowSchema::try_from(self).map_err(to_py_err)?;
let c_schema_ptr = &c_schema as *const FFI_ArrowSchema;
@@ -69,7 +89,7 @@ impl PyArrowConvert for DataType {
}
}
-impl PyArrowConvert for Field {
+impl FromPyArrow for Field {
fn from_pyarrow(value: &PyAny) -> PyResult<Self> {
let c_schema = FFI_ArrowSchema::empty();
let c_schema_ptr = &c_schema as *const FFI_ArrowSchema;
@@ -77,7 +97,9 @@ impl PyArrowConvert for Field {
let field = Field::try_from(&c_schema).map_err(to_py_err)?;
Ok(field)
}
+}
+impl ToPyArrow for Field {
fn to_pyarrow(&self, py: Python) -> PyResult<PyObject> {
let c_schema = FFI_ArrowSchema::try_from(self).map_err(to_py_err)?;
let c_schema_ptr = &c_schema as *const FFI_ArrowSchema;
@@ -89,7 +111,7 @@ impl PyArrowConvert for Field {
}
}
-impl PyArrowConvert for Schema {
+impl FromPyArrow for Schema {
fn from_pyarrow(value: &PyAny) -> PyResult<Self> {
let c_schema = FFI_ArrowSchema::empty();
let c_schema_ptr = &c_schema as *const FFI_ArrowSchema;
@@ -97,7 +119,9 @@ impl PyArrowConvert for Schema {
let schema = Schema::try_from(&c_schema).map_err(to_py_err)?;
Ok(schema)
}
+}
+impl ToPyArrow for Schema {
fn to_pyarrow(&self, py: Python) -> PyResult<PyObject> {
let c_schema = FFI_ArrowSchema::try_from(self).map_err(to_py_err)?;
let c_schema_ptr = &c_schema as *const FFI_ArrowSchema;
@@ -109,7 +133,7 @@ impl PyArrowConvert for Schema {
}
}
-impl PyArrowConvert for ArrayData {
+impl FromPyArrow for ArrayData {
fn from_pyarrow(value: &PyAny) -> PyResult<Self> {
// prepare a pointer to receive the Array struct
let mut array = FFI_ArrowArray::empty();
@@ -131,7 +155,9 @@ impl PyArrowConvert for ArrayData {
Ok(data)
}
+}
+impl ToPyArrow for ArrayData {
fn to_pyarrow(&self, py: Python) -> PyResult<PyObject> {
let array = FFI_ArrowArray::new(self);
let schema =
FFI_ArrowSchema::try_from(self.data_type()).map_err(to_py_err)?;
@@ -149,12 +175,14 @@ impl PyArrowConvert for ArrayData {
}
}
-impl<T: PyArrowConvert> PyArrowConvert for Vec<T> {
+impl<T: FromPyArrow> FromPyArrow for Vec<T> {
fn from_pyarrow(value: &PyAny) -> PyResult<Self> {
let list = value.downcast::<PyList>()?;
- list.iter().map(|x| T::from_pyarrow(&x)).collect()
+ list.iter().map(|x| T::from_pyarrow(x)).collect()
}
+}
+impl<T: ToPyArrow> ToPyArrow for Vec<T> {
fn to_pyarrow(&self, py: Python) -> PyResult<PyObject> {
let values = self
.iter()
@@ -164,7 +192,7 @@ impl<T: PyArrowConvert> PyArrowConvert for Vec<T> {
}
}
-impl PyArrowConvert for RecordBatch {
+impl FromPyArrow for RecordBatch {
fn from_pyarrow(value: &PyAny) -> PyResult<Self> {
// TODO(kszucs): implement the FFI conversions in arrow-rs for RecordBatches
let schema = value.getattr("schema")?;
@@ -179,7 +207,9 @@ impl PyArrowConvert for RecordBatch {
let batch = RecordBatch::try_new(schema, arrays).map_err(to_py_err)?;
Ok(batch)
}
+}
+impl ToPyArrow for RecordBatch {
fn to_pyarrow(&self, py: Python) -> PyResult<PyObject> {
let mut py_arrays = vec![];
@@ -203,38 +233,36 @@ impl PyArrowConvert for RecordBatch {
}
}
-impl PyArrowConvert for ArrowArrayStreamReader {
+impl FromPyArrow for ArrowArrayStreamReader {
fn from_pyarrow(value: &PyAny) -> PyResult<Self> {
// prepare a pointer to receive the stream struct
- let stream = Box::new(FFI_ArrowArrayStream::empty());
- let stream_ptr = Box::into_raw(stream) as *mut FFI_ArrowArrayStream;
+ let mut stream = FFI_ArrowArrayStream::empty();
+ let stream_ptr = &mut stream as *mut FFI_ArrowArrayStream;
// make the conversion through PyArrow's private API
// this changes the pointer's memory and is thus unsafe.
// In particular, `_export_to_c` can go out of bounds
- let args = PyTuple::new(value.py(), &[stream_ptr as Py_uintptr_t]);
+ let args = PyTuple::new(value.py(), [stream_ptr as Py_uintptr_t]);
value.call_method1("_export_to_c", args)?;
- let stream_reader =
- unsafe { ArrowArrayStreamReader::from_raw(stream_ptr).unwrap() };
-
- unsafe {
- drop(Box::from_raw(stream_ptr));
- }
+ let stream_reader = ArrowArrayStreamReader::try_new(stream)
+ .map_err(|err| PyValueError::new_err(err.to_string()))?;
Ok(stream_reader)
}
+}
- fn to_pyarrow(&self, py: Python) -> PyResult<PyObject> {
- let stream = Box::new(FFI_ArrowArrayStream::empty());
- let stream_ptr = Box::into_raw(stream) as *mut FFI_ArrowArrayStream;
-
- unsafe { export_reader_into_raw(Box::new(self.clone()), stream_ptr) };
+impl IntoPyArrow for ArrowArrayStreamReader {
+ fn into_pyarrow(self, py: Python) -> PyResult<PyObject> {
+ let mut stream = FFI_ArrowArrayStream::empty();
+ unsafe { export_reader_into_raw(Box::new(self), &mut stream) };
+ let stream_ptr = (&mut stream) as *mut FFI_ArrowArrayStream;
let module = py.import("pyarrow")?;
let class = module.getattr("RecordBatchReader")?;
- let args = PyTuple::new(py, &[stream_ptr as Py_uintptr_t]);
+ let args = PyTuple::new(py, [stream_ptr as Py_uintptr_t]);
let reader = class.call_method1("_import_from_c", args)?;
+
Ok(PyObject::from(reader))
}
}
@@ -242,21 +270,24 @@ impl PyArrowConvert for ArrowArrayStreamReader {
/// A newtype wrapper around a `T: PyArrowConvert` that implements
/// [`FromPyObject`] and [`IntoPy`] allowing usage with pyo3 macros
#[derive(Debug)]
-pub struct PyArrowType<T: PyArrowConvert>(pub T);
+pub struct PyArrowType<T: FromPyArrow + IntoPyArrow>(pub T);
-impl<'source, T: PyArrowConvert> FromPyObject<'source> for PyArrowType<T> {
+impl<'source, T: FromPyArrow + IntoPyArrow> FromPyObject<'source> for
PyArrowType<T> {
fn extract(value: &'source PyAny) -> PyResult<Self> {
Ok(Self(T::from_pyarrow(value)?))
}
}
-impl<'a, T: PyArrowConvert> IntoPy<PyObject> for PyArrowType<T> {
+impl<T: FromPyArrow + IntoPyArrow> IntoPy<PyObject> for PyArrowType<T> {
fn into_py(self, py: Python) -> PyObject {
- self.0.to_pyarrow(py).unwrap()
+ match self.0.into_pyarrow(py) {
+ Ok(obj) => obj,
+ Err(err) => err.to_object(py),
+ }
}
}
-impl<T: PyArrowConvert> From<T> for PyArrowType<T> {
+impl<T: FromPyArrow + IntoPyArrow> From<T> for PyArrowType<T> {
fn from(s: T) -> Self {
Self(s)
}
diff --git a/arrow/tests/pyarrow.rs b/arrow/tests/pyarrow.rs
index 4b1226c73..4b6991da0 100644
--- a/arrow/tests/pyarrow.rs
+++ b/arrow/tests/pyarrow.rs
@@ -16,7 +16,7 @@
// under the License.
use arrow::array::{ArrayRef, Int32Array, StringArray};
-use arrow::pyarrow::PyArrowConvert;
+use arrow::pyarrow::{FromPyArrow, ToPyArrow};
use arrow::record_batch::RecordBatch;
use pyo3::Python;
use std::sync::Arc;