This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new 4927c1ef1c Implement PyArrowType for `Box<dyn RecordBatchReader + Send>` (#4751)
4927c1ef1c is described below

commit 4927c1ef1c22373d30c82203577db0bac2ee8eb9
Author: Will Jones <[email protected]>
AuthorDate: Fri Sep 1 04:11:01 2023 -0700

    Implement PyArrowType for `Box<dyn RecordBatchReader + Send>` (#4751)
    
    * implement for boxed rbr
    
    * add docs
---
 arrow-pyarrow-integration-testing/src/lib.rs       | 16 ++++++
 .../tests/test_sql.py                              |  7 +++
 arrow/src/lib.rs                                   |  3 +-
 arrow/src/pyarrow.rs                               | 59 ++++++++++++++++++++--
 4 files changed, 79 insertions(+), 6 deletions(-)

diff --git a/arrow-pyarrow-integration-testing/src/lib.rs b/arrow-pyarrow-integration-testing/src/lib.rs
index adcec769f2..a53447b53c 100644
--- a/arrow-pyarrow-integration-testing/src/lib.rs
+++ b/arrow-pyarrow-integration-testing/src/lib.rs
@@ -21,6 +21,7 @@
 use std::sync::Arc;
 
 use arrow::array::new_empty_array;
+use arrow::record_batch::{RecordBatchIterator, RecordBatchReader};
 use pyo3::exceptions::PyValueError;
 use pyo3::prelude::*;
 use pyo3::wrap_pyfunction;
@@ -152,6 +153,20 @@ fn reader_return_errors(obj: PyArrowType<ArrowArrayStreamReader>) -> PyResult<()
     }
 }
 
+#[pyfunction]
+fn boxed_reader_roundtrip(
+    obj: PyArrowType<ArrowArrayStreamReader>,
+) -> PyArrowType<Box<dyn RecordBatchReader + Send>> {
+    let schema = obj.0.schema();
+    let batches = obj
+        .0
+        .collect::<Result<Vec<RecordBatch>, ArrowError>>()
+        .unwrap();
+    let reader = RecordBatchIterator::new(batches.into_iter().map(Ok), schema);
+    let reader: Box<dyn RecordBatchReader + Send> = Box::new(reader);
+    PyArrowType(reader)
+}
+
 #[pymodule]
 fn arrow_pyarrow_integration_testing(_py: Python, m: &PyModule) -> PyResult<()> {
     m.add_wrapped(wrap_pyfunction!(double))?;
@@ -166,5 +181,6 @@ fn arrow_pyarrow_integration_testing(_py: Python, m: &PyModule) -> PyResult<()>
     m.add_wrapped(wrap_pyfunction!(round_trip_record_batch))?;
     m.add_wrapped(wrap_pyfunction!(round_trip_record_batch_reader))?;
     m.add_wrapped(wrap_pyfunction!(reader_return_errors))?;
+    m.add_wrapped(wrap_pyfunction!(boxed_reader_roundtrip))?;
     Ok(())
 }
diff --git a/arrow-pyarrow-integration-testing/tests/test_sql.py b/arrow-pyarrow-integration-testing/tests/test_sql.py
index e2e8d66c0f..3be5b9ec52 100644
--- a/arrow-pyarrow-integration-testing/tests/test_sql.py
+++ b/arrow-pyarrow-integration-testing/tests/test_sql.py
@@ -409,6 +409,13 @@ def test_record_batch_reader():
     got_batches = list(b)
     assert got_batches == batches
 
+    # Also try the boxed reader variant
+    a = pa.RecordBatchReader.from_batches(schema, batches)
+    b = rust.boxed_reader_roundtrip(a)
+    assert b.schema == schema
+    got_batches = list(b)
+    assert got_batches == batches
+
 def test_record_batch_reader_error():
     schema = pa.schema([('ints', pa.list_(pa.int32()))])
 
diff --git a/arrow/src/lib.rs b/arrow/src/lib.rs
index fb904c1908..f4d0585fa6 100644
--- a/arrow/src/lib.rs
+++ b/arrow/src/lib.rs
@@ -375,7 +375,8 @@ pub mod pyarrow;
 
 pub mod record_batch {
     pub use arrow_array::{
-        RecordBatch, RecordBatchOptions, RecordBatchReader, RecordBatchWriter,
+        RecordBatch, RecordBatchIterator, RecordBatchOptions, RecordBatchReader,
+        RecordBatchWriter,
     };
 }
 pub use arrow_array::temporal_conversions;
diff --git a/arrow/src/pyarrow.rs b/arrow/src/pyarrow.rs
index 0e9669c5e9..6063ae7632 100644
--- a/arrow/src/pyarrow.rs
+++ b/arrow/src/pyarrow.rs
@@ -15,15 +15,51 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! Pass Arrow objects from and to Python, using Arrow's
+//! Pass Arrow objects from and to PyArrow, using Arrow's
 //! [C Data Interface](https://arrow.apache.org/docs/format/CDataInterface.html)
 //! and [pyo3](https://docs.rs/pyo3/latest/pyo3/).
 //! For underlying implementation, see the [ffi] module.
+//!
+//! One can use these to write Python functions that take and return PyArrow
+//! objects, with automatic conversion to corresponding arrow-rs types.
+//!
+//! ```ignore
+//! #[pyfunction]
+//! fn double_array(array: PyArrowType<ArrayData>) -> PyResult<PyArrowType<ArrayData>> {
+//!     let array = array.0; // Extract from PyArrowType wrapper
+//!     let array: Arc<dyn Array> = make_array(array); // Convert ArrayData to ArrayRef
+//!     let array: &Int32Array = array.as_any().downcast_ref()
+//!         .ok_or_else(|| PyValueError::new_err("expected int32 array"))?;
+//!     let array: Int32Array = array.iter().map(|x| x.map(|x| x * 2)).collect();
+//!     Ok(PyArrowType(array.into_data()))
+//! }
+//! ```
+//!
+//! | pyarrow type                | arrow-rs type                                                      |
+//! |-----------------------------|--------------------------------------------------------------------|
+//! | `pyarrow.DataType`          | [DataType]                                                         |
+//! | `pyarrow.Field`             | [Field]                                                            |
+//! | `pyarrow.Schema`            | [Schema]                                                           |
+//! | `pyarrow.Array`             | [ArrayData]                                                        |
+//! | `pyarrow.RecordBatch`       | [RecordBatch]                                                      |
+//! | `pyarrow.RecordBatchReader` | [ArrowArrayStreamReader] / `Box<dyn RecordBatchReader + Send>` (1) |
+//!
+//! (1) `pyarrow.RecordBatchReader` can be imported as [ArrowArrayStreamReader]. Either
+//! [ArrowArrayStreamReader] or `Box<dyn RecordBatchReader + Send>` can be exported
+//! as `pyarrow.RecordBatchReader`. (`Box<dyn RecordBatchReader + Send>` is typically
+//! easier to create.)
+//!
+//! PyArrow has the notion of chunked arrays and tables, but arrow-rs doesn't
+//! have these same concepts. A chunked table is instead represented with
+//! `Vec<RecordBatch>`. A `pyarrow.Table` can be imported to Rust by calling
+//! [pyarrow.Table.to_reader()](https://arrow.apache.org/docs/python/generated/pyarrow.Table.html#pyarrow.Table.to_reader)
+//! and then importing the reader as a [ArrowArrayStreamReader].
 
 use std::convert::{From, TryFrom};
 use std::ptr::{addr_of, addr_of_mut};
 use std::sync::Arc;
 
+use arrow_array::RecordBatchReader;
 use pyo3::exceptions::{PyTypeError, PyValueError};
 use pyo3::ffi::Py_uintptr_t;
 use pyo3::import_exception;
@@ -256,6 +292,7 @@ impl ToPyArrow for RecordBatch {
     }
 }
 
+/// Supports conversion from `pyarrow.RecordBatchReader` to [ArrowArrayStreamReader].
 impl FromPyArrow for ArrowArrayStreamReader {
     fn from_pyarrow(value: &PyAny) -> PyResult<Self> {
         validate_class("RecordBatchReader", value)?;
@@ -277,10 +314,13 @@ impl FromPyArrow for ArrowArrayStreamReader {
     }
 }
 
-impl IntoPyArrow for ArrowArrayStreamReader {
+/// Convert a [`RecordBatchReader`] into a `pyarrow.RecordBatchReader`.
+impl IntoPyArrow for Box<dyn RecordBatchReader + Send> {
+    // We can't implement `ToPyArrow` for `T: RecordBatchReader + Send` because
+    // there is already a blanket implementation for `T: ToPyArrow`.
     fn into_pyarrow(self, py: Python) -> PyResult<PyObject> {
         let mut stream = FFI_ArrowArrayStream::empty();
-        unsafe { export_reader_into_raw(Box::new(self), &mut stream) };
+        unsafe { export_reader_into_raw(self, &mut stream) };
 
         let stream_ptr = (&mut stream) as *mut FFI_ArrowArrayStream;
         let module = py.import("pyarrow")?;
@@ -292,8 +332,17 @@ impl IntoPyArrow for ArrowArrayStreamReader {
     }
 }
 
-/// A newtype wrapper around a `T: PyArrowConvert` that implements
-/// [`FromPyObject`] and [`IntoPy`] allowing usage with pyo3 macros
+/// Convert a [`ArrowArrayStreamReader`] into a `pyarrow.RecordBatchReader`.
+impl IntoPyArrow for ArrowArrayStreamReader {
+    fn into_pyarrow(self, py: Python) -> PyResult<PyObject> {
+        let boxed: Box<dyn RecordBatchReader + Send> = Box::new(self);
+        boxed.into_pyarrow(py)
+    }
+}
+
+/// A newtype wrapper. When wrapped around a type `T: FromPyArrow`, it
+/// implements `FromPyObject` for the PyArrow objects. When wrapped around a
+/// `T: IntoPyArrow`, it implements `IntoPy<PyObject>` for the wrapped type.
 #[derive(Debug)]
 pub struct PyArrowType<T>(pub T);
 

Reply via email to