This is an automated email from the ASF dual-hosted git repository.
jorisvandenbossche pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-nanoarrow.git
The following commit(s) were added to refs/heads/main by this push:
new 97bb65e feat(python): basic export through PyCapsules (#320)
97bb65e is described below
commit 97bb65edd2c45fc1ad80574d8ef6009466762428
Author: Joris Van den Bossche <[email protected]>
AuthorDate: Fri Nov 24 13:27:05 2023 +0100
feat(python): basic export through PyCapsules (#320)
Follow-up on https://github.com/apache/arrow-nanoarrow/pull/318
Exports the different objects using different mechanisms:
- ArrowSchema -> deep copy
- ArrowArray -> shallow copy of the struct, with private_data/release
updated to reference-count the original's base
- ArrowArrayStream -> move the struct (marking the source
nanoarrow.ArrayStream object as released, but that's fine)
---
python/src/nanoarrow/_lib.pyx | 156 +++++++++++++++++++++++++++++++++++++++++-
python/tests/test_capsules.py | 81 +++++++++++++++++++---
2 files changed, 226 insertions(+), 11 deletions(-)
diff --git a/python/src/nanoarrow/_lib.pyx b/python/src/nanoarrow/_lib.pyx
index e1e8bb9..d42f375 100644
--- a/python/src/nanoarrow/_lib.pyx
+++ b/python/src/nanoarrow/_lib.pyx
@@ -28,10 +28,13 @@ be literal and stay close to the structure definitions.
"""
from libc.stdint cimport uintptr_t, int64_t
+from libc.stdlib cimport malloc, free
+from libc.string cimport memcpy
from cpython.mem cimport PyMem_Malloc, PyMem_Free
from cpython.bytes cimport PyBytes_FromStringAndSize
-from cpython.pycapsule cimport PyCapsule_GetPointer
+from cpython.pycapsule cimport PyCapsule_New, PyCapsule_GetPointer,
PyCapsule_CheckExact
from cpython cimport Py_buffer
+from cpython.ref cimport PyObject, Py_INCREF, Py_DECREF
from nanoarrow_c cimport *
from nanoarrow_device_c cimport *
@@ -43,6 +46,70 @@ def c_version():
return ArrowNanoarrowVersion().decode("UTF-8")
+#
+# PyCapsule export utilities
+#
+
+
+cdef void pycapsule_schema_deleter(object schema_capsule) noexcept:
+    # Destructor for 'arrow_schema' capsules. A NULL release callback means
+    # the schema was never initialized, or a consumer already moved it out;
+    # either way only the heap-allocated struct itself must be freed.
+    cdef ArrowSchema* c_schema = <ArrowSchema*>PyCapsule_GetPointer(
+        schema_capsule, 'arrow_schema'
+    )
+    if c_schema.release != NULL:
+        c_schema.release(c_schema)
+    free(c_schema)
+
+
+cdef object alloc_c_schema(ArrowSchema** c_schema):
+    # Heap-allocate an ArrowSchema, store its address in c_schema[0] and
+    # return an 'arrow_schema' PyCapsule that owns it (the capsule destructor
+    # releases the schema if needed and frees the struct).
+    # Raises MemoryError if the allocation fails.
+    c_schema[0] = <ArrowSchema*> malloc(sizeof(ArrowSchema))
+    if c_schema[0] == NULL:
+        raise MemoryError()
+    # Ensure the capsule destructor doesn't call a random release pointer
+    c_schema[0].release = NULL
+    try:
+        return PyCapsule_New(
+            c_schema[0], 'arrow_schema', &pycapsule_schema_deleter
+        )
+    except BaseException:
+        # No capsule was created, so nothing else will ever free the struct.
+        free(c_schema[0])
+        c_schema[0] = NULL
+        raise
+
+
+cdef void pycapsule_array_deleter(object array_capsule) noexcept:
+    # Destructor for 'arrow_array' capsules. The release callback is only
+    # invoked while it is still set: a consumer that moved the array out (or
+    # an export that never completed) leaves it NULL. The struct allocated
+    # for the capsule is freed in every case.
+    cdef ArrowArray* c_array = <ArrowArray*>PyCapsule_GetPointer(
+        array_capsule, 'arrow_array'
+    )
+    if c_array.release != NULL:
+        c_array.release(c_array)
+    free(c_array)
+
+
+cdef object alloc_c_array(ArrowArray** c_array):
+    # Heap-allocate an ArrowArray, store its address in c_array[0] and
+    # return an 'arrow_array' PyCapsule that owns it (the capsule destructor
+    # releases the array if needed and frees the struct).
+    # Raises MemoryError if the allocation fails.
+    c_array[0] = <ArrowArray*> malloc(sizeof(ArrowArray))
+    if c_array[0] == NULL:
+        raise MemoryError()
+    # Ensure the capsule destructor doesn't call a random release pointer
+    c_array[0].release = NULL
+    try:
+        return PyCapsule_New(
+            c_array[0], 'arrow_array', &pycapsule_array_deleter
+        )
+    except BaseException:
+        # No capsule was created, so nothing else will ever free the struct.
+        free(c_array[0])
+        c_array[0] = NULL
+        raise
+
+
+cdef void pycapsule_stream_deleter(object stream_capsule) noexcept:
+    # Destructor for 'arrow_array_stream' capsules: release the stream only if
+    # a consumer has not already taken ownership of it (release != NULL),
+    # then free the struct that was allocated for the capsule.
+    cdef ArrowArrayStream* c_stream = <ArrowArrayStream*>PyCapsule_GetPointer(
+        stream_capsule, 'arrow_array_stream'
+    )
+    if c_stream.release != NULL:
+        c_stream.release(c_stream)
+    free(c_stream)
+
+
+cdef object alloc_c_stream(ArrowArrayStream** c_stream):
+    # Heap-allocate an ArrowArrayStream, store its address in c_stream[0] and
+    # return an 'arrow_array_stream' PyCapsule that owns it (the capsule
+    # destructor releases the stream if needed and frees the struct).
+    # Raises MemoryError if the allocation fails.
+    c_stream[0] = <ArrowArrayStream*> malloc(sizeof(ArrowArrayStream))
+    if c_stream[0] == NULL:
+        raise MemoryError()
+    # Ensure the capsule destructor doesn't call a random release pointer
+    c_stream[0].release = NULL
+    try:
+        return PyCapsule_New(
+            c_stream[0], 'arrow_array_stream', &pycapsule_stream_deleter
+        )
+    except BaseException:
+        # No capsule was created, so nothing else will ever free the struct.
+        free(c_stream[0])
+        c_stream[0] = NULL
+        raise
+
+
+cdef void arrow_array_release(ArrowArray* array) noexcept with gil:
+    # Release callback installed on the shallow ArrowArray copies exported by
+    # Array.__arrow_c_array__. For those copies private_data holds a strong
+    # reference to the Python object that owns the original array (taken with
+    # Py_INCREF at export time). Buffers and children are not touched here
+    # because they still belong to that original array.
+    # "with gil" lets the DECREF run safely even if the consumer invokes
+    # release without holding the GIL.
+    Py_DECREF(<object>array.private_data)
+    array.private_data = NULL
+    # A NULL release callback marks the struct as released (the capsule
+    # deleters check for exactly this).
+    array.release = NULL
+
+
cdef class SchemaHolder:
"""Memory holder for an ArrowSchema
@@ -220,6 +287,22 @@ cdef class Schema:
<uintptr_t>PyCapsule_GetPointer(schema_capsule, 'arrow_schema')
)
+    def __arrow_c_schema__(self):
+        """
+        Export to an ArrowSchema PyCapsule.
+
+        The capsule owns an independent deep copy of this schema, so this
+        object remains valid after the capsule is consumed or released.
+        """
+        cdef ArrowSchema* exported
+        cdef int code
+
+        self._assert_valid()
+        capsule = alloc_c_schema(&exported)
+        code = ArrowSchemaDeepCopy(self._ptr, exported)
+        if code != NANOARROW_OK:
+            Error.raise_error("ArrowSchemaDeepCopy", code)
+        return capsule
+
def _addr(self):
return <uintptr_t>self._ptr
@@ -448,7 +531,7 @@ cdef class Array:
return Array(base, base._addr(), schema)
def __cinit__(self, object base, uintptr_t addr, Schema schema):
- self._base = base,
+ self._base = base
self._ptr = <ArrowArray*>addr
self._schema = schema
@@ -479,6 +562,46 @@ cdef class Array:
return out
+    def __arrow_c_array__(self, requested_schema=None):
+        """
+        Get a pair of PyCapsules containing a C ArrowArray representation of
+        the object.
+
+        The exported ArrowArray is a shallow copy of this array's struct. Its
+        buffers, children and dictionary are shared with this array. The copy
+        holds a reference to this array's base object until the consumer
+        calls its release callback.
+
+        Parameters
+        ----------
+        requested_schema : PyCapsule | None
+            A PyCapsule containing a C ArrowSchema representation of a
+            requested schema. Not supported; anything other than None raises
+            NotImplementedError.
+
+        Returns
+        -------
+        Tuple[PyCapsule, PyCapsule]
+            A pair of PyCapsules containing a C ArrowSchema and ArrowArray,
+            respectively.
+        """
+        self._assert_valid()
+        if requested_schema is not None:
+            raise NotImplementedError("requested_schema")
+
+        # TODO optimize this to export a version where children are reference
+        # counted and can be released separately.
+        # NOTE(review): the child structs are the original array's own, so a
+        # consumer that moves out and releases an individual child would
+        # release the original's child; confirm consumers never do this.
+
+        cdef:
+            ArrowArray* c_array_out
+
+        array_capsule = alloc_c_array(&c_array_out)
+
+        # Shallow copy of the top-level struct only.
+        memcpy(c_array_out, self._ptr, sizeof(ArrowArray))
+
+        # Replace the copied release callback, which belongs to the original
+        # array, with one that only drops the reference to the original's base.
+        # Nothing between the memcpy and these assignments can raise, so the
+        # capsule deleter never sees the original's release pointer.
+        Py_INCREF(self._base)
+        c_array_out.private_data = <void*>self._base
+        c_array_out.release = arrow_array_release
+
+        return self._schema.__arrow_c_schema__(), array_capsule
+
def _addr(self):
return <uintptr_t>self._ptr
@@ -928,6 +1051,35 @@ cdef class ArrayStream:
<uintptr_t>PyCapsule_GetPointer(stream_capsule,
'arrow_array_stream')
)
+    def __arrow_c_stream__(self, requested_schema=None):
+        """
+        Export the stream as an Arrow C stream PyCapsule.
+
+        The underlying ArrowArrayStream is moved into the returned capsule.
+        Afterwards this object is marked as released, so exporting it a
+        second time fails.
+
+        Parameters
+        ----------
+        requested_schema : PyCapsule | None
+            A PyCapsule containing a C ArrowSchema representation of a
+            requested schema. Not supported; anything other than None raises
+            NotImplementedError.
+
+        Returns
+        -------
+        PyCapsule
+            An 'arrow_array_stream' PyCapsule that owns the moved stream.
+        """
+        cdef ArrowArrayStream* moved_stream
+
+        self._assert_valid()
+        if requested_schema is not None:
+            raise NotImplementedError("requested_schema")
+
+        capsule = alloc_c_stream(&moved_stream)
+        # Transfer ownership: copy the struct wholesale, then mark the source
+        # as released so only the capsule will ever call release().
+        memcpy(moved_stream, self._ptr, sizeof(ArrowArrayStream))
+        self._ptr.release = NULL
+        return capsule
+
def _addr(self):
return <uintptr_t>self._ptr
diff --git a/python/tests/test_capsules.py b/python/tests/test_capsules.py
index e14dc8c..f124dec 100644
--- a/python/tests/test_capsules.py
+++ b/python/tests/test_capsules.py
@@ -14,8 +14,8 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-
import pyarrow as pa
+import pytest
import nanoarrow as na
@@ -44,7 +44,7 @@ class StreamWrapper:
return
self.stream.__arrow_c_stream__(requested_schema=requested_schema)
-def test_schema_import():
+def test_schema():
pa_schema = pa.schema([pa.field("some_name", pa.int32())])
for schema_obj in [pa_schema, SchemaWrapper(pa_schema)]:
@@ -54,8 +54,15 @@ def test_schema_import():
assert schema.format == "+s"
assert schema._to_string(recursive=True) == "struct<some_name: int32>"
+ # roundtrip
+ pa_schema2 = pa.schema(schema)
+ assert pa_schema2.equals(pa_schema)
+ # schemas stay valid because it exports a deep copy
+ del pa_schema2
+ assert schema.is_valid()
+
-def test_array_import():
+def test_array():
pa_arr = pa.array([1, 2, 3], pa.int32())
for arr_obj in [pa_arr, ArrayWrapper(pa_arr)]:
@@ -65,14 +72,17 @@ def test_array_import():
assert array.length == 3
assert array.schema._to_string(recursive=True) == "int32"
+ # roundtrip
+ pa_arr2 = pa.array(array)
+ assert pa_arr2.equals(pa_arr)
+ del pa_arr2
+ assert array.is_valid()
+
-def test_array_stream_import():
- def make_reader():
- pa_array_child = pa.array([1, 2, 3], pa.int32())
- pa_array = pa.record_batch([pa_array_child], names=["some_column"])
- return pa.RecordBatchReader.from_batches(pa_array.schema, [pa_array])
+def test_array_stream():
+ pa_table = pa.table({"some_column": pa.array([1, 2, 3], pa.int32())})
- for stream_obj in [make_reader(), StreamWrapper(make_reader())]:
+ for stream_obj in [pa_table, StreamWrapper(pa_table)]:
array_stream = na.array_stream(stream_obj)
# some basic validation
assert array_stream.is_valid()
@@ -82,3 +92,56 @@ def test_array_stream_import():
array_stream.get_schema()._to_string(recursive=True)
== "struct<some_column: int32>"
)
+
+ # roundtrip
+ array_stream = na.array_stream(stream_obj)
+ pa_table2 = pa.table(array_stream)
+ assert pa_table2.equals(pa_table)
+ # exporting a stream marks the original object as released (it is
moved)
+ assert not array_stream.is_valid()
+ # and thus exporting a second time doesn't work
+ with pytest.raises(RuntimeError):
+ pa.table(array_stream)
+
+
+def test_export_invalid():
+    # Each (factory, consumer, message) triple builds an unpopulated nanoarrow
+    # object and checks that exporting it to pyarrow raises a RuntimeError
+    # naming the released object.
+    cases = [
+        (na.Schema.allocate, pa.schema, "schema is released"),
+        (
+            lambda: na.Array.allocate(na.Schema.allocate()),
+            pa.array,
+            "Array is released",
+        ),
+        (na.ArrayStream.allocate, pa.table, "array stream is released"),
+    ]
+    for make_unpopulated, consume, message in cases:
+        obj = make_unpopulated()
+        assert obj.is_valid() is False
+        with pytest.raises(RuntimeError, match=message):
+            consume(obj)
+
+
+def test_import_from_c_errors():
+    # Every importer must reject both a non-capsule object and capsule input
+    # of the wrong kind with a ValueError.
+    pa_arr = pa.array([1, 2, 3], pa.int32())
+
+    bad_imports = [
+        lambda: na.Schema._import_from_c_capsule("wrong"),
+        lambda: na.Schema._import_from_c_capsule(pa_arr.__arrow_c_array__()),
+        lambda: na.Array._import_from_c_capsule("wrong", "wrong"),
+        lambda: na.Array._import_from_c_capsule(
+            pa_arr.__arrow_c_array__(), pa_arr.type.__arrow_c_schema__()
+        ),
+        lambda: na.ArrayStream._import_from_c_capsule("wrong"),
+        lambda: na.ArrayStream._import_from_c_capsule(pa_arr.__arrow_c_array__()),
+    ]
+    for bad_import in bad_imports:
+        with pytest.raises(ValueError):
+            bad_import()