This is an automated email from the ASF dual-hosted git repository.
paleolimbot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-nanoarrow.git
The following commit(s) were added to refs/heads/main by this push:
new e7c47fa7 fix(python): Make shallow CArray copies less shallow to
accommodate moving children (#451)
e7c47fa7 is described below
commit e7c47fa74e776c4cf7b1d09f0448b96bb60a6200
Author: Dewey Dunnington <[email protected]>
AuthorDate: Thu May 2 16:18:28 2024 -0300
fix(python): Make shallow CArray copies less shallow to accommodate moving
children (#451)
This PR updates the logic that creates a "shallow copy" of an
`ArrowArray`. Before, it simply made a shallow copy of the outer array,
which works in most cases. However, the spec allows for "moving" child
arrays, which means that the guarantee of a valid *outer* array does not
guarantee anything about the validity of the `ArrowArray*` child
pointers. It's difficult, but possible, to trigger this in Python
(below); however, I think this sort of code is common for importers
(because it allows the lifecycle of columns in an array to be
independent).
```python
import nanoarrow as na
import pyarrow as pa
# Given some array
array = na.c_array_from_buffers(
na.struct({"col1": na.int32()}),
3,
[None],
children=[na.c_array([1, 2, 3], na.int32())]
)
user_array = na.Array(array)
user_array
#> nanoarrow.Array<int32>[3]
#> {'col1': 1}
#> {'col1': 2}
#> {'col1': 3}
# totally valid shallow copy of this array
schema_capsule, array_capsule = array.__arrow_c_array__()
array2 = na.c_array(array_capsule, schema_capsule)
# A consumer is technically allowed to move a child array
x = pa.Array._import_from_c(array2.child(0)._addr(), pa.int32())
del array
del x
# With the previous shallow copy implementation, this could segfault or fail
list(user_array.iter_py())
# Instead of a segfault I tend to get:
#> NanoarrowException: ArrowBasicArrayStreamValidate() failed (22): Expected int32 array buffer 1 to have size >= 12 bytes but found buffer with 0 bytes
```
After this PR the original array remains valid:
```python
list(user_array.iter_py())
#> [{'col1': 1}, {'col1': 2}, {'col1': 3}]
```
This does, however, add a non-trivial performance cost when making a shallow
copy of an array with many children:
```python
import nanoarrow as na
import pyarrow as pa
n_col = int(1e3)
n_items = int(1e5)
array_wide = na.c_array_from_buffers(
na.struct({f"col{i}": na.int32() for i in range(n_col)}),
n_items,
[None],
children=[na.c_array(range(n_items), na.int32())] * n_col
)
%timeit pa.array(array_wide)
#> Before this PR
#> 595 µs ± 5.72 µs per loop (mean ± std. dev. of 7 runs, 1,000 loops each)
#> After this PR
#> 828 µs ± 2.83 µs per loop (mean ± std. dev. of 7 runs, 1,000 loops each)
```
---
python/src/nanoarrow/_lib.pyx | 156 +++++++++++++++++++++++++++++-------------
python/tests/test_c_array.py | 38 ++++++++++
2 files changed, 145 insertions(+), 49 deletions(-)
diff --git a/python/src/nanoarrow/_lib.pyx b/python/src/nanoarrow/_lib.pyx
index 2f193308..690afa2b 100644
--- a/python/src/nanoarrow/_lib.pyx
+++ b/python/src/nanoarrow/_lib.pyx
@@ -89,7 +89,7 @@ cdef void pycapsule_schema_deleter(object schema_capsule) noexcept:
     ArrowFree(schema)
 
 
-cdef object alloc_c_schema(ArrowSchema** c_schema) noexcept:
+cdef object alloc_c_schema(ArrowSchema** c_schema):
     c_schema[0] = <ArrowSchema*> ArrowMalloc(sizeof(ArrowSchema))
     # Ensure the capsule destructor doesn't call a random release pointer
     c_schema[0].release = NULL
@@ -107,7 +107,7 @@ cdef void pycapsule_array_deleter(object array_capsule) noexcept:
     ArrowFree(array)
 
 
-cdef object alloc_c_array(ArrowArray** c_array) noexcept:
+cdef object alloc_c_array(ArrowArray** c_array):
     c_array[0] = <ArrowArray*> ArrowMalloc(sizeof(ArrowArray))
     # Ensure the capsule destructor doesn't call a random release pointer
     c_array[0].release = NULL
@@ -125,7 +125,7 @@ cdef void pycapsule_array_stream_deleter(object stream_capsule) noexcept:
     ArrowFree(stream)
 
 
-cdef object alloc_c_array_stream(ArrowArrayStream** c_stream) noexcept:
+cdef object alloc_c_array_stream(ArrowArrayStream** c_stream):
     c_stream[0] = <ArrowArrayStream*> ArrowMalloc(sizeof(ArrowArrayStream))
     # Ensure the capsule destructor doesn't call a random release pointer
     c_stream[0].release = NULL
@@ -143,7 +143,7 @@ cdef void pycapsule_device_array_deleter(object device_array_capsule) noexcept:
     ArrowFree(device_array)
 
 
-cdef object alloc_c_device_array(ArrowDeviceArray** c_device_array) noexcept:
+cdef object alloc_c_device_array(ArrowDeviceArray** c_device_array):
     c_device_array[0] = <ArrowDeviceArray*> ArrowMalloc(sizeof(ArrowDeviceArray))
     # Ensure the capsule destructor doesn't call a random release pointer
     c_device_array[0].array.release = NULL
@@ -160,68 +160,119 @@ cdef void pycapsule_array_view_deleter(object array_capsule) noexcept:
     ArrowFree(array_view)
 
 
-cdef object alloc_c_array_view(ArrowArrayView** c_array_view) noexcept:
+cdef object alloc_c_array_view(ArrowArrayView** c_array_view):
     c_array_view[0] = <ArrowArrayView*> ArrowMalloc(sizeof(ArrowArrayView))
     ArrowArrayViewInitFromType(c_array_view[0], NANOARROW_TYPE_UNINITIALIZED)
     return PyCapsule_New(c_array_view[0], 'nanoarrow_array_view',
                          &pycapsule_array_view_deleter)
 
 
-cdef void arrow_array_release(ArrowArray* array) noexcept with gil:
-    Py_DECREF(<object>array.private_data)
-    array.private_data = NULL
-    array.release = NULL
+# Provide a way to validate that we release all references we create
+cdef int64_t pyobject_buffer_count = 0
+def get_pyobject_buffer_count():
+    global pyobject_buffer_count
+    return pyobject_buffer_count
 
 
-cdef void c_array_shallow_copy(object base, const ArrowArray* c_array,
-                               ArrowArray* c_array_out) noexcept:
-    # shallow copy
-    memcpy(c_array_out, c_array, sizeof(ArrowArray))
-    c_array_out.release = NULL
-    c_array_out.private_data = NULL
-    # track original base
-    c_array_out.private_data = <void*>base
+cdef void c_deallocate_pyobject_buffer(ArrowBufferAllocator* allocator, uint8_t* ptr, int64_t size) noexcept with gil:
+    Py_DECREF(<object>allocator.private_data)
+
+    global pyobject_buffer_count
+    pyobject_buffer_count -= 1
+
+
+cdef void c_pyobject_buffer(object base, const void* buf, int64_t size_bytes, ArrowBuffer* out):
+    out.data = <uint8_t*>buf
+    out.size_bytes = size_bytes
+    out.allocator = ArrowBufferDeallocator(
+        <ArrowBufferDeallocatorCallback>c_deallocate_pyobject_buffer,
+        <void*>base
+    )
     Py_INCREF(base)
-    c_array_out.release = arrow_array_release
+
+    global pyobject_buffer_count
+    pyobject_buffer_count += 1
 
 
-cdef object alloc_c_array_shallow_copy(object base, const ArrowArray* c_array):
-    """Make a shallow copy of an ArrowArray
+cdef void c_array_shallow_copy(object base, const ArrowArray* src, ArrowArray* dst):
+    """Make the shallowest (safe) copy possible
 
-    To more safely implement export of an ArrowArray whose address may be
-    depended on by some other Python object, we implement a shallow copy
-    whose constructor calls Py_INCREF() on a Python object responsible
-    for the ArrowArray's lifecycle and whose deleter calls Py_DECREF() on
-    the same object.
+    Once a CArray exists at the Python level, nanoarrow makes it very difficult
+    to perform an operation that might render the pointed-to ArrowArray invalid.
+    Performing a deep copy (i.e., copying buffer content) would be unexpected and
+    prohibitively expensive, and performing a truly shallow copy (i.e., adding
+    an ArrowArray implementation that simply calls Py_INCREF/Py_DECREF on the original array)
+    is not safe because the Arrow C Data interface specification allows children
+    to be "move"d. Even though nanoarrow's Python bindings do not do this unless
+    explicitly requested, when passed to some other library they are free to do so.
+
+    This implementation of a shallow copy creates a recursive copy of the original
+    array, including any children and dictionary (if present). It uses the
+    C library's ArrowArray implementation, which takes care of releasing children,
+    and allows us to use the ArrowBufferDeallocator mechanism to add/remove
+    references to the appropriate PyObject.
     """
-    cdef ArrowArray* c_array_out
-    array_capsule = alloc_c_array(&c_array_out)
-    c_array_shallow_copy(base, c_array, c_array_out)
-    return array_capsule
+    # Allocate an ArrowArray* that will definitely be cleaned up should an exception
+    # be raised in the process of shallow copying its contents
+    cdef ArrowArray* tmp
+    shelter = alloc_c_array(&tmp)
+    cdef int code
+    code = ArrowArrayInitFromType(tmp, NANOARROW_TYPE_UNINITIALIZED)
+    Error.raise_error_not_ok("ArrowArrayInitFromType()", code)
 
 
-cdef void c_device_array_shallow_copy(object base, const ArrowDeviceArray* c_array,
-                                      ArrowDeviceArray* c_array_out) noexcept:
-    # shallow copy
-    memcpy(c_array_out, c_array, sizeof(ArrowDeviceArray))
-    c_array_out.array.release = NULL
-    c_array_out.array.private_data = NULL
+    # Copy data for this array, adding a reference for each buffer
+    # This allows us to use the nanoarrow C library's ArrowArray
+    # implementation without writing our own release callbacks/private_data.
+    tmp.length = src.length
+    tmp.offset = src.offset
+    tmp.null_count = src.null_count
 
-    # track original base
-    c_array_out.array.private_data = <void*>base
-    Py_INCREF(base)
-    c_array_out.array.release = arrow_array_release
+    for i in range(src.n_buffers):
+        if src.buffers[i] != NULL:
+            # The purpose of this buffer is solely so that we can use the
+            # ArrowBufferDeallocator mechanism to add a reference to base.
+            # The ArrowArray release callback installed by
+            # ArrowArrayInitFromType() will call ArrowBufferReset()
+            # on any buffer that was injected in this way (and thus release the
+            # reference to base). We don't actually know the size of the buffer
+            # (and our release callback doesn't use it), so it is set to 0.
+            c_pyobject_buffer(base, src.buffers[i], 0, ArrowArrayBuffer(tmp, i))
+            # The actual pointer value is tracked separately from the ArrowBuffer
+            # (which is only concerned with object lifecycle).
+            tmp.buffers[i] = src.buffers[i]
 
 
-cdef object alloc_c_device_array_shallow_copy(object base, const ArrowDeviceArray* c_array):
-    """Make a shallow copy of an ArrowDeviceArray
+    tmp.n_buffers = src.n_buffers
 
-    See :func:`arrow_c_array_shallow_copy()`
-    """
-    cdef ArrowDeviceArray* c_array_out
-    array_capsule = alloc_c_device_array(&c_array_out)
-    c_device_array_shallow_copy(base, c_array, c_array_out)
-    return array_capsule
+    # Recursive shallow copy children
+    if src.n_children > 0:
+        code = ArrowArrayAllocateChildren(tmp, src.n_children)
+        Error.raise_error_not_ok("ArrowArrayAllocateChildren()", code)
+
+        for i in range(src.n_children):
+            c_array_shallow_copy(base, src.children[i], tmp.children[i])
+
+    # Recursive shallow copy dictionary
+    if src.dictionary != NULL:
+        code = ArrowArrayAllocateDictionary(tmp)
+        Error.raise_error_not_ok("ArrowArrayAllocateDictionary()", code)
+
+        c_array_shallow_copy(base, src.dictionary, tmp.dictionary)
+
+    # Move tmp into dst
+    ArrowArrayMove(tmp, dst)
+
+
+cdef void c_device_array_shallow_copy(object base, const ArrowDeviceArray* src,
+                                      ArrowDeviceArray* dst):
+    # Copy top-level information but leave the array marked as released
+    # TODO: Should the sync event be copied here too?
+    memcpy(dst, src, sizeof(ArrowDeviceArray))
+    dst.array.release = NULL
+
+    # Shallow copy the array (may raise; dst.array then stays released)
+    c_array_shallow_copy(base, &src.array, &dst.array)
 
 
 cdef void pycapsule_buffer_deleter(object stream_capsule) noexcept:
@@ -1356,9 +1407,13 @@ cdef class CArray:
 
         # Export a shallow copy pointing to the same data in a way
         # that ensures this object stays valid.
+
         # TODO optimize this to export a version where children are reference
         # counted and can be released separately
-        array_capsule = alloc_c_array_shallow_copy(self._base, self._ptr)
+        cdef ArrowArray* c_array_out
+        array_capsule = alloc_c_array(&c_array_out)
+        c_array_shallow_copy(self._base, self._ptr, c_array_out)
+
         return self._schema.__arrow_c_schema__(), array_capsule
 
     def _addr(self):
@@ -2954,7 +3009,10 @@ cdef class CDeviceArray:
 
         # TODO: evaluate whether we need to synchronize here or whether we should
         # move device arrays instead of shallow-copying them
-        device_array_capsule = alloc_c_device_array_shallow_copy(self._base, self._ptr)
+        cdef ArrowDeviceArray* c_array_out
+        device_array_capsule = alloc_c_device_array(&c_array_out)
+        c_device_array_shallow_copy(self._base, self._ptr, c_array_out)
+
         return self._schema.__arrow_c_schema__(), device_array_capsule
 
     @staticmethod
diff --git a/python/tests/test_c_array.py b/python/tests/test_c_array.py
index a7a4ecdd..04df7693 100644
--- a/python/tests/test_c_array.py
+++ b/python/tests/test_c_array.py
@@ -141,6 +141,44 @@ def test_c_array_slice_errors():
         array[1:0]
 
 
+def test_c_array_shallow_copy():
+    import gc
+
+    from nanoarrow._lib import get_pyobject_buffer_count
+
+    gc.collect()
+
+    initial_ref_count = get_pyobject_buffer_count()
+
+    # Create an array with children
+    array = na.c_array_from_buffers(
+        na.struct({"col1": na.int32(), "col2": na.int32()}),
+        3,
+        [None],
+        children=[na.c_array([1, 2, 3], na.int32()), na.c_array([4, 5, 6], na.int32())],
+        move=True,
+    )
+
+    # The move=True should have prevented a shallow copy of the children
+    # when constructing the array.
+    assert get_pyobject_buffer_count() == initial_ref_count
+
+    # Force a shallow copy via the array protocol and ensure we saved
+    # references to two additional buffers.
+    _, col1_capsule = array.child(0).__arrow_c_array__()
+    assert get_pyobject_buffer_count() == (initial_ref_count + 1)
+
+    _, col2_capsule = array.child(1).__arrow_c_array__()
+    assert get_pyobject_buffer_count() == (initial_ref_count + 2)
+
+    # Ensure that the references can be removed
+    del col1_capsule
+    assert get_pyobject_buffer_count() == (initial_ref_count + 1)
+
+    del col2_capsule
+    assert get_pyobject_buffer_count() == initial_ref_count
+
+
 def test_c_array_builder_init():
     builder = CArrayBuilder.allocate()