This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new 4927c1ef1c Implement PyArrowType for `Box<dyn RecordBatchReader + Send>` (#4751)
4927c1ef1c is described below
commit 4927c1ef1c22373d30c82203577db0bac2ee8eb9
Author: Will Jones <[email protected]>
AuthorDate: Fri Sep 1 04:11:01 2023 -0700
Implement PyArrowType for `Box<dyn RecordBatchReader + Send>` (#4751)
* implement for boxed rbr
* add docs
---
arrow-pyarrow-integration-testing/src/lib.rs | 16 ++++++
.../tests/test_sql.py | 7 +++
arrow/src/lib.rs | 3 +-
arrow/src/pyarrow.rs | 59 ++++++++++++++++++++--
4 files changed, 79 insertions(+), 6 deletions(-)
diff --git a/arrow-pyarrow-integration-testing/src/lib.rs b/arrow-pyarrow-integration-testing/src/lib.rs
index adcec769f2..a53447b53c 100644
--- a/arrow-pyarrow-integration-testing/src/lib.rs
+++ b/arrow-pyarrow-integration-testing/src/lib.rs
@@ -21,6 +21,7 @@
use std::sync::Arc;
use arrow::array::new_empty_array;
+use arrow::record_batch::{RecordBatchIterator, RecordBatchReader};
use pyo3::exceptions::PyValueError;
use pyo3::prelude::*;
use pyo3::wrap_pyfunction;
@@ -152,6 +153,20 @@ fn reader_return_errors(obj: PyArrowType<ArrowArrayStreamReader>) -> PyResult<()
}
}
+#[pyfunction]
+fn boxed_reader_roundtrip(
+ obj: PyArrowType<ArrowArrayStreamReader>,
+) -> PyArrowType<Box<dyn RecordBatchReader + Send>> {
+ let schema = obj.0.schema();
+ let batches = obj
+ .0
+ .collect::<Result<Vec<RecordBatch>, ArrowError>>()
+ .unwrap();
+ let reader = RecordBatchIterator::new(batches.into_iter().map(Ok), schema);
+ let reader: Box<dyn RecordBatchReader + Send> = Box::new(reader);
+ PyArrowType(reader)
+}
+
#[pymodule]
fn arrow_pyarrow_integration_testing(_py: Python, m: &PyModule) -> PyResult<()> {
m.add_wrapped(wrap_pyfunction!(double))?;
@@ -166,5 +181,6 @@ fn arrow_pyarrow_integration_testing(_py: Python, m: &PyModule) -> PyResult<()>
m.add_wrapped(wrap_pyfunction!(round_trip_record_batch))?;
m.add_wrapped(wrap_pyfunction!(round_trip_record_batch_reader))?;
m.add_wrapped(wrap_pyfunction!(reader_return_errors))?;
+ m.add_wrapped(wrap_pyfunction!(boxed_reader_roundtrip))?;
Ok(())
}
diff --git a/arrow-pyarrow-integration-testing/tests/test_sql.py b/arrow-pyarrow-integration-testing/tests/test_sql.py
index e2e8d66c0f..3be5b9ec52 100644
--- a/arrow-pyarrow-integration-testing/tests/test_sql.py
+++ b/arrow-pyarrow-integration-testing/tests/test_sql.py
@@ -409,6 +409,13 @@ def test_record_batch_reader():
got_batches = list(b)
assert got_batches == batches
+ # Also try the boxed reader variant
+ a = pa.RecordBatchReader.from_batches(schema, batches)
+ b = rust.boxed_reader_roundtrip(a)
+ assert b.schema == schema
+ got_batches = list(b)
+ assert got_batches == batches
+
def test_record_batch_reader_error():
schema = pa.schema([('ints', pa.list_(pa.int32()))])
diff --git a/arrow/src/lib.rs b/arrow/src/lib.rs
index fb904c1908..f4d0585fa6 100644
--- a/arrow/src/lib.rs
+++ b/arrow/src/lib.rs
@@ -375,7 +375,8 @@ pub mod pyarrow;
pub mod record_batch {
pub use arrow_array::{
- RecordBatch, RecordBatchOptions, RecordBatchReader, RecordBatchWriter,
+ RecordBatch, RecordBatchIterator, RecordBatchOptions, RecordBatchReader,
+ RecordBatchWriter,
};
}
pub use arrow_array::temporal_conversions;
diff --git a/arrow/src/pyarrow.rs b/arrow/src/pyarrow.rs
index 0e9669c5e9..6063ae7632 100644
--- a/arrow/src/pyarrow.rs
+++ b/arrow/src/pyarrow.rs
@@ -15,15 +15,51 @@
// specific language governing permissions and limitations
// under the License.
-//! Pass Arrow objects from and to Python, using Arrow's
+//! Pass Arrow objects from and to PyArrow, using Arrow's
//! [C Data Interface](https://arrow.apache.org/docs/format/CDataInterface.html)
//! and [pyo3](https://docs.rs/pyo3/latest/pyo3/).
//! For underlying implementation, see the [ffi] module.
+//!
+//! One can use these to write Python functions that take and return PyArrow
+//! objects, with automatic conversion to corresponding arrow-rs types.
+//!
+//! ```ignore
+//! #[pyfunction]
+//! fn double_array(array: PyArrowType<ArrayData>) -> PyResult<PyArrowType<ArrayData>> {
+//! let array = array.0; // Extract from PyArrowType wrapper
+//! let array: Arc<dyn Array> = make_array(array); // Convert ArrayData to ArrayRef
+//! let array: &Int32Array = array.as_any().downcast_ref()
+//! .ok_or_else(|| PyValueError::new_err("expected int32 array"))?;
+//! let array: Int32Array = array.iter().map(|x| x.map(|x| x * 2)).collect();
+//! Ok(PyArrowType(array.into_data()))
+//! }
+//! ```
+//!
+//! | pyarrow type | arrow-rs type |
+//! |-----------------------------|--------------------------------------------------------------------|
+//! | `pyarrow.DataType` | [DataType] |
+//! | `pyarrow.Field` | [Field] |
+//! | `pyarrow.Schema` | [Schema] |
+//! | `pyarrow.Array` | [ArrayData] |
+//! | `pyarrow.RecordBatch` | [RecordBatch] |
+//! | `pyarrow.RecordBatchReader` | [ArrowArrayStreamReader] / `Box<dyn RecordBatchReader + Send>` (1) |
+//!
+//! (1) `pyarrow.RecordBatchReader` can be imported as [ArrowArrayStreamReader]. Either
+//! [ArrowArrayStreamReader] or `Box<dyn RecordBatchReader + Send>` can be exported
+//! as `pyarrow.RecordBatchReader`. (`Box<dyn RecordBatchReader + Send>` is typically
+//! easier to create.)
+//!
+//! PyArrow has the notion of chunked arrays and tables, but arrow-rs doesn't
+//! have these same concepts. A chunked table is instead represented with
+//! `Vec<RecordBatch>`. A `pyarrow.Table` can be imported to Rust by calling
+//! [pyarrow.Table.to_reader()](https://arrow.apache.org/docs/python/generated/pyarrow.Table.html#pyarrow.Table.to_reader)
+//! and then importing the reader as a [ArrowArrayStreamReader].
use std::convert::{From, TryFrom};
use std::ptr::{addr_of, addr_of_mut};
use std::sync::Arc;
+use arrow_array::RecordBatchReader;
use pyo3::exceptions::{PyTypeError, PyValueError};
use pyo3::ffi::Py_uintptr_t;
use pyo3::import_exception;
@@ -256,6 +292,7 @@ impl ToPyArrow for RecordBatch {
}
}
+/// Supports conversion from `pyarrow.RecordBatchReader` to [ArrowArrayStreamReader].
impl FromPyArrow for ArrowArrayStreamReader {
fn from_pyarrow(value: &PyAny) -> PyResult<Self> {
validate_class("RecordBatchReader", value)?;
@@ -277,10 +314,13 @@ impl FromPyArrow for ArrowArrayStreamReader {
}
}
-impl IntoPyArrow for ArrowArrayStreamReader {
+/// Convert a [`RecordBatchReader`] into a `pyarrow.RecordBatchReader`.
+impl IntoPyArrow for Box<dyn RecordBatchReader + Send> {
+ // We can't implement `ToPyArrow` for `T: RecordBatchReader + Send` because
+ // there is already a blanket implementation for `T: ToPyArrow`.
fn into_pyarrow(self, py: Python) -> PyResult<PyObject> {
let mut stream = FFI_ArrowArrayStream::empty();
- unsafe { export_reader_into_raw(Box::new(self), &mut stream) };
+ unsafe { export_reader_into_raw(self, &mut stream) };
let stream_ptr = (&mut stream) as *mut FFI_ArrowArrayStream;
let module = py.import("pyarrow")?;
@@ -292,8 +332,17 @@ impl IntoPyArrow for ArrowArrayStreamReader {
}
}
-/// A newtype wrapper around a `T: PyArrowConvert` that implements
-/// [`FromPyObject`] and [`IntoPy`] allowing usage with pyo3 macros
+/// Convert a [`ArrowArrayStreamReader`] into a `pyarrow.RecordBatchReader`.
+impl IntoPyArrow for ArrowArrayStreamReader {
+ fn into_pyarrow(self, py: Python) -> PyResult<PyObject> {
+ let boxed: Box<dyn RecordBatchReader + Send> = Box::new(self);
+ boxed.into_pyarrow(py)
+ }
+}
+
+/// A newtype wrapper. When wrapped around a type `T: FromPyArrow`, it
+/// implements `FromPyObject` for the PyArrow objects. When wrapped around a
+/// `T: IntoPyArrow`, it implements `IntoPy<PyObject>` for the wrapped type.
#[derive(Debug)]
pub struct PyArrowType<T>(pub T);