This is an automated email from the ASF dual-hosted git repository.

jorisvandenbossche pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-nanoarrow.git


The following commit(s) were added to refs/heads/main by this push:
     new 97bb65e  feat(python): basic export through PyCapsules (#320)
97bb65e is described below

commit 97bb65edd2c45fc1ad80574d8ef6009466762428
Author: Joris Van den Bossche <[email protected]>
AuthorDate: Fri Nov 24 13:27:05 2023 +0100

    feat(python): basic export through PyCapsules (#320)
    
    Follow-up on https://github.com/apache/arrow-nanoarrow/pull/318
    
    Exports the different objects using different mechanisms:
    
    - ArrowSchema -> deep copy
    - ArrowArray -> shallow copy of the struct, with private_data/release
    replaced so that the copy holds a reference to the original's base
    - ArrowArrayStream -> move the struct (marking the source
    nanoarrow.ArrayStream object as released, which is intended)
---
 python/src/nanoarrow/_lib.pyx | 156 +++++++++++++++++++++++++++++++++++++++++-
 python/tests/test_capsules.py |  81 +++++++++++++++++++---
 2 files changed, 226 insertions(+), 11 deletions(-)

diff --git a/python/src/nanoarrow/_lib.pyx b/python/src/nanoarrow/_lib.pyx
index e1e8bb9..d42f375 100644
--- a/python/src/nanoarrow/_lib.pyx
+++ b/python/src/nanoarrow/_lib.pyx
@@ -28,10 +28,13 @@ be literal and stay close to the structure definitions.
 """
 
 from libc.stdint cimport uintptr_t, int64_t
+from libc.stdlib cimport malloc, free
+from libc.string cimport memcpy
 from cpython.mem cimport PyMem_Malloc, PyMem_Free
 from cpython.bytes cimport PyBytes_FromStringAndSize
-from cpython.pycapsule cimport PyCapsule_GetPointer
+from cpython.pycapsule cimport PyCapsule_New, PyCapsule_GetPointer, 
PyCapsule_CheckExact
 from cpython cimport Py_buffer
+from cpython.ref cimport PyObject, Py_INCREF, Py_DECREF
 from nanoarrow_c cimport *
 from nanoarrow_device_c cimport *
 
@@ -43,6 +46,70 @@ def c_version():
     return ArrowNanoarrowVersion().decode("UTF-8")
 
 
+#
+# PyCapsule export utilities
+#
+
+
+cdef void pycapsule_schema_deleter(object schema_capsule) noexcept:
+    # PyCapsule destructor for 'arrow_schema' capsules: release the wrapped
+    # ArrowSchema if it still holds data, then free the struct itself.
+    cdef ArrowSchema* c_schema = <ArrowSchema*>PyCapsule_GetPointer(
+        schema_capsule, 'arrow_schema'
+    )
+    # A NULL release callback marks a struct that was never populated or
+    # whose contents were moved out by a consumer
+    if c_schema.release != NULL:
+        c_schema.release(c_schema)
+    free(c_schema)
+
+
+cdef object alloc_c_schema(ArrowSchema** c_schema):
+    # Allocate an empty ArrowSchema owned by a new 'arrow_schema' PyCapsule.
+    # The struct pointer is written to c_schema[0] so the caller can populate
+    # it; the capsule destructor releases it (if populated) and frees it.
+    # Raises MemoryError if the struct cannot be allocated, which is why this
+    # function is not declared noexcept.
+    c_schema[0] = <ArrowSchema*> malloc(sizeof(ArrowSchema))
+    if c_schema[0] == NULL:
+        raise MemoryError()
+    # Ensure the capsule destructor doesn't call a random release pointer
+    c_schema[0].release = NULL
+    try:
+        return PyCapsule_New(
+            c_schema[0], 'arrow_schema', &pycapsule_schema_deleter
+        )
+    except BaseException:
+        # No capsule owns the struct, so free it here to avoid a leak
+        free(c_schema[0])
+        c_schema[0] = NULL
+        raise
+
+
+cdef void pycapsule_array_deleter(object array_capsule) noexcept:
+    # PyCapsule destructor for 'arrow_array' capsules: release the wrapped
+    # ArrowArray if it still holds data, then free the struct itself.
+    cdef ArrowArray* c_array = <ArrowArray*>PyCapsule_GetPointer(
+        array_capsule, 'arrow_array'
+    )
+    # Skip the release callback when the struct was consumed/moved out
+    # (release set to NULL) or never populated
+    if c_array.release != NULL:
+        c_array.release(c_array)
+    free(c_array)
+
+
+cdef object alloc_c_array(ArrowArray** c_array):
+    # Allocate an empty ArrowArray owned by a new 'arrow_array' PyCapsule.
+    # The struct pointer is written to c_array[0] so the caller can populate
+    # it; the capsule destructor releases it (if populated) and frees it.
+    # Raises MemoryError if the struct cannot be allocated, which is why this
+    # function is not declared noexcept.
+    c_array[0] = <ArrowArray*> malloc(sizeof(ArrowArray))
+    if c_array[0] == NULL:
+        raise MemoryError()
+    # Ensure the capsule destructor doesn't call a random release pointer
+    c_array[0].release = NULL
+    try:
+        return PyCapsule_New(
+            c_array[0], 'arrow_array', &pycapsule_array_deleter
+        )
+    except BaseException:
+        # No capsule owns the struct, so free it here to avoid a leak
+        free(c_array[0])
+        c_array[0] = NULL
+        raise
+
+
+cdef void pycapsule_stream_deleter(object stream_capsule) noexcept:
+    # PyCapsule destructor for 'arrow_array_stream' capsules: release the
+    # wrapped ArrowArrayStream if it is still live, then free the struct.
+    cdef ArrowArrayStream* c_stream = <ArrowArrayStream*>PyCapsule_GetPointer(
+        stream_capsule, 'arrow_array_stream'
+    )
+    # Skip the release callback when the stream was consumed/moved out
+    # (release set to NULL) or never populated
+    if c_stream.release != NULL:
+        c_stream.release(c_stream)
+    free(c_stream)
+
+
+cdef object alloc_c_stream(ArrowArrayStream** c_stream):
+    # Allocate an empty ArrowArrayStream owned by a new 'arrow_array_stream'
+    # PyCapsule. The struct pointer is written to c_stream[0] so the caller
+    # can populate it; the capsule destructor releases it (if populated) and
+    # frees it. Raises MemoryError if the struct cannot be allocated, which is
+    # why this function is not declared noexcept.
+    c_stream[0] = <ArrowArrayStream*> malloc(sizeof(ArrowArrayStream))
+    if c_stream[0] == NULL:
+        raise MemoryError()
+    # Ensure the capsule destructor doesn't call a random release pointer
+    c_stream[0].release = NULL
+    try:
+        return PyCapsule_New(
+            c_stream[0], 'arrow_array_stream', &pycapsule_stream_deleter
+        )
+    except BaseException:
+        # No capsule owns the struct, so free it here to avoid a leak
+        free(c_stream[0])
+        c_stream[0] = NULL
+        raise
+
+
+cdef void arrow_array_release(ArrowArray* array) noexcept with gil:
+    # Release callback installed on ArrowArray structs exported by
+    # Array.__arrow_c_array__. There, private_data is set to the exporting
+    # Array's base object after a Py_INCREF, so releasing the copy drops that
+    # reference. `with gil` acquires the GIL because the consumer may invoke
+    # this from C code that does not hold it (NOTE(review): confirm for all
+    # consumers).
+    # <object> yields a temporary reference that Cython balances itself, so
+    # the net effect of this line is a single decref of the base object.
+    Py_DECREF(<object>array.private_data)
+    # Mark the struct as released so it is not released twice
+    array.private_data = NULL
+    array.release = NULL
+
+
 cdef class SchemaHolder:
     """Memory holder for an ArrowSchema
 
@@ -220,6 +287,22 @@ cdef class Schema:
             <uintptr_t>PyCapsule_GetPointer(schema_capsule, 'arrow_schema')
         )
 
+    def __arrow_c_schema__(self):
+        """
+        Export a deep copy of this schema as an ArrowSchema PyCapsule
+        """
+        self._assert_valid()
+
+        cdef ArrowSchema* c_schema_out
+        cdef int status
+
+        # The capsule owns the new struct; a deep copy keeps this Schema valid
+        # independently of when the consumer releases the capsule
+        capsule = alloc_c_schema(&c_schema_out)
+        status = ArrowSchemaDeepCopy(self._ptr, c_schema_out)
+        if status != NANOARROW_OK:
+            Error.raise_error("ArrowSchemaDeepCopy", status)
+
+        return capsule
+
     def _addr(self):
         return <uintptr_t>self._ptr
 
@@ -448,7 +531,7 @@ cdef class Array:
         return Array(base, base._addr(), schema)
 
     def __cinit__(self, object base, uintptr_t addr, Schema schema):
-        self._base = base,
+        # Store base itself (a trailing comma here would store a 1-tuple)
+        self._base = base
         self._ptr = <ArrowArray*>addr
         self._schema = schema
 
@@ -479,6 +562,46 @@ cdef class Array:
 
         return out
 
+    def __arrow_c_array__(self, requested_schema=None):
+        """
+        Get a pair of PyCapsules containing a C ArrowSchema and a C ArrowArray
+        representation of the object.
+
+        The exported ArrowArray is a shallow copy of this array's struct whose
+        release callback only drops a reference to this array's base object,
+        so the data stays alive until the consumer releases the capsule.
+
+        Parameters
+        ----------
+        requested_schema : PyCapsule | None
+            A PyCapsule containing a C ArrowSchema representation of a
+            requested schema. Not supported.
+
+        Returns
+        -------
+        Tuple[PyCapsule, PyCapsule]
+            A pair of PyCapsules containing a C ArrowSchema and ArrowArray,
+            respectively.
+
+        Raises
+        ------
+        NotImplementedError
+            If ``requested_schema`` is not None.
+        """
+        self._assert_valid()
+        if requested_schema is not None:
+            raise NotImplementedError("requested_schema")
+
+        # Export the schema first: if that fails, no reference has been taken
+        # on self._base yet and no array struct has been allocated.
+        schema_capsule = self._schema.__arrow_c_schema__()
+
+        # TODO optimize this to export a version where children are reference
+        # counted and can be released separately. Until then, the copied
+        # children/dictionary pointers still belong to the original array.
+
+        cdef:
+            ArrowArray* c_array_out
+
+        array_capsule = alloc_c_array(&c_array_out)
+
+        # Shallow copy of the struct; the producer's release/private_data are
+        # overwritten immediately below with nothing in between that can raise.
+        memcpy(c_array_out, self._ptr, sizeof(ArrowArray))
+
+        # Track the original base: keep it alive until arrow_array_release
+        # drops this reference.
+        Py_INCREF(self._base)
+        c_array_out.private_data = <void*>self._base
+        c_array_out.release = arrow_array_release
+
+        return schema_capsule, array_capsule
+
     def _addr(self):
         return <uintptr_t>self._ptr
 
@@ -928,6 +1051,35 @@ cdef class ArrayStream:
             <uintptr_t>PyCapsule_GetPointer(stream_capsule, 
'arrow_array_stream')
         )
 
+    def __arrow_c_stream__(self, requested_schema=None):
+        """
+        Export the stream as an Arrow C stream PyCapsule.
+
+        The underlying ArrowArrayStream is moved into the capsule, so this
+        object is marked as released afterwards.
+
+        Parameters
+        ----------
+        requested_schema : PyCapsule | None
+            A PyCapsule containing a C ArrowSchema representation of a
+            requested schema. Not supported.
+
+        Returns
+        -------
+        PyCapsule
+        """
+        self._assert_valid()
+        if requested_schema is not None:
+            raise NotImplementedError("requested_schema")
+
+        cdef ArrowArrayStream* c_stream_out
+        capsule = alloc_c_stream(&c_stream_out)
+
+        # Move: copy the struct into the capsule, then clear our release
+        # callback so only the capsule's copy will ever be released
+        memcpy(c_stream_out, self._ptr, sizeof(ArrowArrayStream))
+        self._ptr.release = NULL
+        return capsule
+
     def _addr(self):
         return <uintptr_t>self._ptr
 
diff --git a/python/tests/test_capsules.py b/python/tests/test_capsules.py
index e14dc8c..f124dec 100644
--- a/python/tests/test_capsules.py
+++ b/python/tests/test_capsules.py
@@ -14,8 +14,8 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-
 import pyarrow as pa
+import pytest
 
 import nanoarrow as na
 
@@ -44,7 +44,7 @@ class StreamWrapper:
         return 
self.stream.__arrow_c_stream__(requested_schema=requested_schema)
 
 
-def test_schema_import():
+def test_schema():
     pa_schema = pa.schema([pa.field("some_name", pa.int32())])
 
     for schema_obj in [pa_schema, SchemaWrapper(pa_schema)]:
@@ -54,8 +54,15 @@ def test_schema_import():
         assert schema.format == "+s"
         assert schema._to_string(recursive=True) == "struct<some_name: int32>"
 
+        # roundtrip
+        pa_schema2 = pa.schema(schema)
+        assert pa_schema2.equals(pa_schema)
+        # schemas stay valid because it exports a deep copy
+        del pa_schema2
+        assert schema.is_valid()
+
 
-def test_array_import():
+def test_array():
     pa_arr = pa.array([1, 2, 3], pa.int32())
 
     for arr_obj in [pa_arr, ArrayWrapper(pa_arr)]:
@@ -65,14 +72,17 @@ def test_array_import():
         assert array.length == 3
         assert array.schema._to_string(recursive=True) == "int32"
 
+        # roundtrip
+        pa_arr2 = pa.array(array)
+        assert pa_arr2.equals(pa_arr)
+        del pa_arr2
+        assert array.is_valid()
+
 
-def test_array_stream_import():
-    def make_reader():
-        pa_array_child = pa.array([1, 2, 3], pa.int32())
-        pa_array = pa.record_batch([pa_array_child], names=["some_column"])
-        return pa.RecordBatchReader.from_batches(pa_array.schema, [pa_array])
+def test_array_stream():
+    pa_table = pa.table({"some_column": pa.array([1, 2, 3], pa.int32())})
 
-    for stream_obj in [make_reader(), StreamWrapper(make_reader())]:
+    for stream_obj in [pa_table, StreamWrapper(pa_table)]:
         array_stream = na.array_stream(stream_obj)
         # some basic validation
         assert array_stream.is_valid()
@@ -82,3 +92,56 @@ def test_array_stream_import():
             array_stream.get_schema()._to_string(recursive=True)
             == "struct<some_column: int32>"
         )
+
+        # roundtrip
+        array_stream = na.array_stream(stream_obj)
+        pa_table2 = pa.table(array_stream)
+        assert pa_table2.equals(pa_table)
+        # exporting a stream marks the original object as released (it is 
moved)
+        assert not array_stream.is_valid()
+        # and thus exporting a second time doesn't work
+        with pytest.raises(RuntimeError):
+            pa.table(array_stream)
+
+
+def test_export_invalid():
+    schema = na.Schema.allocate()
+    assert schema.is_valid() is False
+
+    with pytest.raises(RuntimeError, match="schema is released"):
+        pa.schema(schema)
+
+    array = na.Array.allocate(na.Schema.allocate())
+    assert array.is_valid() is False
+    with pytest.raises(RuntimeError, match="Array is released"):
+        pa.array(array)
+
+    array_stream = na.ArrayStream.allocate()
+    assert array_stream.is_valid() is False
+    with pytest.raises(RuntimeError, match="array stream is released"):
+        pa.table(array_stream)
+
+
+def test_import_from_c_errors():
+    # ensure proper error is raised in case of wrong object or wrong capsule
+    pa_arr = pa.array([1, 2, 3], pa.int32())
+
+    with pytest.raises(ValueError):
+        na.Schema._import_from_c_capsule("wrong")
+
+    with pytest.raises(ValueError):
+        na.Schema._import_from_c_capsule(pa_arr.__arrow_c_array__())
+
+    with pytest.raises(ValueError):
+        na.Array._import_from_c_capsule("wrong", "wrong")
+
+    with pytest.raises(ValueError):
+        na.Array._import_from_c_capsule(
+            pa_arr.__arrow_c_array__(), pa_arr.type.__arrow_c_schema__()
+        )
+
+    with pytest.raises(ValueError):
+        na.ArrayStream._import_from_c_capsule("wrong")
+
+    with pytest.raises(ValueError):
+        na.ArrayStream._import_from_c_capsule(pa_arr.__arrow_c_array__())

Reply via email to