This is an automated email from the ASF dual-hosted git repository.

paleolimbot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-nanoarrow.git


The following commit(s) were added to refs/heads/main by this push:
     new e7c47fa7 fix(python): Make shallow CArray copies less shallow to 
accommodate moving children (#451)
e7c47fa7 is described below

commit e7c47fa74e776c4cf7b1d09f0448b96bb60a6200
Author: Dewey Dunnington <[email protected]>
AuthorDate: Thu May 2 16:18:28 2024 -0300

    fix(python): Make shallow CArray copies less shallow to accommodate moving 
children (#451)
    
    This PR updates the logic that creates a "shallow copy" of an
    `ArrowArray`. Before, it simply made a shallow copy of the outer array,
    which works in most cases. However, the spec allows for "moving" child
    arrays, which means that the guarantee of a valid *outer* array does not
    guarantee anything about the validity of the `ArrowArray*` child
    pointers. It's difficult, but possible, to trigger this in Python
    (below); however, I think this sort of code is common for importers
    (because it allows the lifecycle of columns in an array to be
    independent).
    
    ```python
    import nanoarrow as na
    import pyarrow as pa
    
    # Given some array
    array = na.c_array_from_buffers(
        na.struct({"col1": na.int32()}),
        3,
        [None],
        children=[na.c_array([1, 2, 3], na.int32())]
    )
    
    user_array = na.Array(array)
    user_array
    #> nanoarrow.Array<int32>[3]
    #> {'col1': 1}
    #> {'col1': 2}
    #> {'col1': 3}
    
    # totally valid shallow copy of this array
    schema_capsule, array_capsule = array.__arrow_c_array__()
    array2 = na.c_array(array_capsule, schema_capsule)
    
    # A consumer is technically allowed to move a child array
    x = pa.Array._import_from_c(array2.child(0)._addr(), pa.int32())
    del array
    del x
    
    # With the previous shallow copy implementation, this could segfault or fail
    list(user_array.iter_py())
    # Instead of a segfault I tend to get:
    #> NanoarrowException: ArrowBasicArrayStreamValidate() failed (22):
    #> Expected int32 array buffer 1 to have size >= 12 bytes but found buffer
    #> with 0 bytes
    ```
    
    After this PR the original array remains valid:
    
    ```python
    list(user_array.iter_py())
    #> [{'col1': 1}, {'col1': 2}, {'col1': 3}]
    ```
    
    This does, however, have a non-trivial performance cost when making a
    shallow copy of an array that has a lot of children:
    
    ```python
    import nanoarrow as na
    import pyarrow as pa
    
    n_col = int(1e3)
    n_items = int(1e5)
    
    array_wide = na.c_array_from_buffers(
        na.struct({f"col{i}": na.int32() for i in range(n_col)}),
        n_items,
        [None],
        children=[na.c_array(range(n_items), na.int32())] * n_col
    )
    
    %timeit pa.array(array_wide)
    #> Before this PR
    #> 595 µs ± 5.72 µs per loop (mean ± std. dev. of 7 runs, 1,000 loops each)
    #> After this PR
    #> 828 µs ± 2.83 µs per loop (mean ± std. dev. of 7 runs, 1,000 loops each)
    ```
---
 python/src/nanoarrow/_lib.pyx | 156 +++++++++++++++++++++++++++++-------------
 python/tests/test_c_array.py  |  38 ++++++++++
 2 files changed, 145 insertions(+), 49 deletions(-)

diff --git a/python/src/nanoarrow/_lib.pyx b/python/src/nanoarrow/_lib.pyx
index 2f193308..690afa2b 100644
--- a/python/src/nanoarrow/_lib.pyx
+++ b/python/src/nanoarrow/_lib.pyx
@@ -89,7 +89,7 @@ cdef void pycapsule_schema_deleter(object schema_capsule) 
noexcept:
     ArrowFree(schema)
 
 
-cdef object alloc_c_schema(ArrowSchema** c_schema) noexcept:
+cdef object alloc_c_schema(ArrowSchema** c_schema):
     c_schema[0] = <ArrowSchema*> ArrowMalloc(sizeof(ArrowSchema))
     # Ensure the capsule destructor doesn't call a random release pointer
     c_schema[0].release = NULL
@@ -107,7 +107,7 @@ cdef void pycapsule_array_deleter(object array_capsule) 
noexcept:
     ArrowFree(array)
 
 
-cdef object alloc_c_array(ArrowArray** c_array) noexcept:
+cdef object alloc_c_array(ArrowArray** c_array):
     c_array[0] = <ArrowArray*> ArrowMalloc(sizeof(ArrowArray))
     # Ensure the capsule destructor doesn't call a random release pointer
     c_array[0].release = NULL
@@ -125,7 +125,7 @@ cdef void pycapsule_array_stream_deleter(object 
stream_capsule) noexcept:
     ArrowFree(stream)
 
 
-cdef object alloc_c_array_stream(ArrowArrayStream** c_stream) noexcept:
+cdef object alloc_c_array_stream(ArrowArrayStream** c_stream):
     c_stream[0] = <ArrowArrayStream*> ArrowMalloc(sizeof(ArrowArrayStream))
     # Ensure the capsule destructor doesn't call a random release pointer
     c_stream[0].release = NULL
@@ -143,7 +143,7 @@ cdef void pycapsule_device_array_deleter(object 
device_array_capsule) noexcept:
     ArrowFree(device_array)
 
 
-cdef object alloc_c_device_array(ArrowDeviceArray** c_device_array) noexcept:
+cdef object alloc_c_device_array(ArrowDeviceArray** c_device_array):
     c_device_array[0] = <ArrowDeviceArray*> 
ArrowMalloc(sizeof(ArrowDeviceArray))
     # Ensure the capsule destructor doesn't call a random release pointer
     c_device_array[0].array.release = NULL
@@ -160,68 +160,119 @@ cdef void pycapsule_array_view_deleter(object 
array_capsule) noexcept:
     ArrowFree(array_view)
 
 
-cdef object alloc_c_array_view(ArrowArrayView** c_array_view) noexcept:
+cdef object alloc_c_array_view(ArrowArrayView** c_array_view):
     c_array_view[0] = <ArrowArrayView*> ArrowMalloc(sizeof(ArrowArrayView))
     ArrowArrayViewInitFromType(c_array_view[0], NANOARROW_TYPE_UNINITIALIZED)
     return PyCapsule_New(c_array_view[0], 'nanoarrow_array_view', 
&pycapsule_array_view_deleter)
 
 
-cdef void arrow_array_release(ArrowArray* array) noexcept with gil:
-    Py_DECREF(<object>array.private_data)
-    array.private_data = NULL
-    array.release = NULL
+# Provide a way to validate that we release all references we create
+cdef int64_t pyobject_buffer_count = 0
 
+def get_pyobject_buffer_count():
+    global pyobject_buffer_count
+    return pyobject_buffer_count
 
-cdef void c_array_shallow_copy(object base, const ArrowArray* c_array,
-                                 ArrowArray* c_array_out) noexcept:
-    # shallow copy
-    memcpy(c_array_out, c_array, sizeof(ArrowArray))
-    c_array_out.release = NULL
-    c_array_out.private_data = NULL
 
-    # track original base
-    c_array_out.private_data = <void*>base
+cdef void c_deallocate_pyobject_buffer(ArrowBufferAllocator* allocator, 
uint8_t* ptr, int64_t size) noexcept with gil:
+    Py_DECREF(<object>allocator.private_data)
+
+    global pyobject_buffer_count
+    pyobject_buffer_count -= 1
+
+
+cdef void c_pyobject_buffer(object base, const void* buf, int64_t size_bytes, 
ArrowBuffer* out):
+    out.data = <uint8_t*>buf
+    out.size_bytes = size_bytes
+    out.allocator = ArrowBufferDeallocator(
+        <ArrowBufferDeallocatorCallback>c_deallocate_pyobject_buffer,
+        <void*>base
+    )
     Py_INCREF(base)
-    c_array_out.release = arrow_array_release
+
+    global pyobject_buffer_count
+    pyobject_buffer_count += 1
 
 
-cdef object alloc_c_array_shallow_copy(object base, const ArrowArray* c_array):
-    """Make a shallow copy of an ArrowArray
+cdef void c_array_shallow_copy(object base, const ArrowArray* src, ArrowArray* 
dst):
+    """Make the shallowest (safe) copy possible
 
-    To more safely implement export of an ArrowArray whose address may be
-    depended on by some other Python object, we implement a shallow copy
-    whose constructor calls Py_INCREF() on a Python object responsible
-    for the ArrowArray's lifecycle and whose deleter calls Py_DECREF() on
-    the same object.
+    Once a CArray exists at the Python level, nanoarrow makes it very difficult
+    to perform an operation that might render the pointed-to ArrowArray 
invalid.
+    Performing a deep copy (i.e., copying buffer content) would be unexpected 
and
+    prohibitively expensive, and performing a truly shallow copy (i.e., adding
+    an ArrowArray implementation that simply PyINCREF/pyDECREFs the original 
array)
+    is not safe because the Arrow C Data interface specification allows 
children
+    to be "move"d. Even though nanoarrow's Python bindings do not do this 
unless
+    explicitly requested, when passed to some other library they are free to 
do so.
+
+    This implementation of a shallow copy creates a recursive copy of the 
original
+    array, including any children and dictionary (if present). It uses the
+    C library's ArrowArray implementation, which takes care of releasing 
children,
+    and allows us to use the ArrowBufferDeallocator mechanism to add/remove
+    references to the appropriate PyObject.
     """
-    cdef ArrowArray* c_array_out
-    array_capsule = alloc_c_array(&c_array_out)
-    c_array_shallow_copy(base, c_array, c_array_out)
-    return array_capsule
+    # Allocate an ArrowArray* that will definitely be cleaned up should an 
exception
+    # be raised in the process of shallow copying its contents
+    cdef ArrowArray* tmp
+    shelter = alloc_c_array(&tmp)
+    cdef int code
 
+    code = ArrowArrayInitFromType(tmp, NANOARROW_TYPE_UNINITIALIZED)
+    Error.raise_error_not_ok("ArrowArrayInitFromType()", code)
 
-cdef void c_device_array_shallow_copy(object base, const ArrowDeviceArray* 
c_array,
-                                      ArrowDeviceArray* c_array_out) noexcept:
-    # shallow copy
-    memcpy(c_array_out, c_array, sizeof(ArrowDeviceArray))
-    c_array_out.array.release = NULL
-    c_array_out.array.private_data = NULL
+    # Copy data for this array, adding a reference for each buffer
+    # This allows us to use the nanoarrow C library's ArrowArray
+    # implementation without writing our own release callbacks/private_data.
+    tmp.length = src.length
+    tmp.offset = src.offset
+    tmp.null_count = src.null_count
 
-    # track original base
-    c_array_out.array.private_data = <void*>base
-    Py_INCREF(base)
-    c_array_out.array.release = arrow_array_release
+    for i in range(src.n_buffers):
+        if src.buffers[i] != NULL:
+            # The purpose of this buffer is soley so that we can use the
+            # ArrowBufferDeallocator mechanism to add a reference to base.
+            # The ArrowArray release callback that exists here after
+            # because of ArrowArrayInitFromType() will call ArrowBufferReset()
+            # on any buffer that was injected in this way (and thus release the
+            # reference to base). We don't actually know the size of the buffer
+            # (and our release callback doesn't use it), so it is set to 0.
+            c_pyobject_buffer(base, src.buffers[i], 0, ArrowArrayBuffer(tmp, 
i))
 
+        # The actual pointer value is tracked separately from the ArrowBuffer
+        # (which is only concerned with object lifecycle).
+        tmp.buffers[i] = src.buffers[i]
 
-cdef object alloc_c_device_array_shallow_copy(object base, const 
ArrowDeviceArray* c_array):
-    """Make a shallow copy of an ArrowDeviceArray
+    tmp.n_buffers = src.n_buffers
 
-    See :func:`arrow_c_array_shallow_copy()`
-    """
-    cdef ArrowDeviceArray* c_array_out
-    array_capsule = alloc_c_device_array(&c_array_out)
-    c_device_array_shallow_copy(base, c_array, c_array_out)
-    return array_capsule
+    # Recursive shallow copy children
+    if src.n_children > 0:
+        code = ArrowArrayAllocateChildren(tmp, src.n_children)
+        Error.raise_error_not_ok("ArrowArrayAllocateChildren()", code)
+
+        for i in range(src.n_children):
+            c_array_shallow_copy(base, src.children[i], tmp.children[i])
+
+    # Recursive shallow copy dictionary
+    if src.dictionary != NULL:
+        code = ArrowArrayAllocateDictionary(tmp)
+        Error.raise_error_not_ok("ArrowArrayAllocateDictionary()", code)
+
+        c_array_shallow_copy(base, src.dictionary, tmp.dictionary)
+
+    # Move tmp into dst
+    ArrowArrayMove(tmp, dst)
+
+
+cdef void c_device_array_shallow_copy(object base, const ArrowDeviceArray* src,
+                                      ArrowDeviceArray* dst) noexcept:
+    # Copy top-level information but leave the array marked as released
+    # TODO: Should the sync event be copied here too?
+    memcpy(dst, src, sizeof(ArrowDeviceArray))
+    dst.array.release = NULL
+
+    # Shallow copy the array
+    c_array_shallow_copy(base, &src.array, &dst.array)
 
 
 cdef void pycapsule_buffer_deleter(object stream_capsule) noexcept:
@@ -1356,9 +1407,13 @@ cdef class CArray:
 
         # Export a shallow copy pointing to the same data in a way
         # that ensures this object stays valid.
+
         # TODO optimize this to export a version where children are reference
         # counted and can be released separately
-        array_capsule = alloc_c_array_shallow_copy(self._base, self._ptr)
+        cdef ArrowArray* c_array_out
+        array_capsule = alloc_c_array(&c_array_out)
+        c_array_shallow_copy(self._base, self._ptr, c_array_out)
+
         return self._schema.__arrow_c_schema__(), array_capsule
 
     def _addr(self):
@@ -2954,7 +3009,10 @@ cdef class CDeviceArray:
 
         # TODO: evaluate whether we need to synchronize here or whether we 
should
         # move device arrays instead of shallow-copying them
-        device_array_capsule = alloc_c_device_array_shallow_copy(self._base, 
self._ptr)
+        cdef ArrowDeviceArray* c_array_out
+        device_array_capsule = alloc_c_device_array(&c_array_out)
+        c_device_array_shallow_copy(self._base, self._ptr, c_array_out)
+
         return self._schema.__arrow_c_schema__(), device_array_capsule
 
     @staticmethod
diff --git a/python/tests/test_c_array.py b/python/tests/test_c_array.py
index a7a4ecdd..04df7693 100644
--- a/python/tests/test_c_array.py
+++ b/python/tests/test_c_array.py
@@ -141,6 +141,44 @@ def test_c_array_slice_errors():
         array[1:0]
 
 
+def test_c_array_shallow_copy():
+    import gc
+
+    from nanoarrow._lib import get_pyobject_buffer_count
+
+    gc.collect()
+
+    initial_ref_count = get_pyobject_buffer_count()
+
+    # Create an array with children
+    array = na.c_array_from_buffers(
+        na.struct({"col1": na.int32(), "col2": na.int64()}),
+        3,
+        [None],
+        children=[na.c_array([1, 2, 3], na.int32()), na.c_array([4, 5, 6], 
na.int32())],
+        move=True,
+    )
+
+    # The move=True should have prevented a shallow copy of the children
+    # when constructing the array.
+    assert get_pyobject_buffer_count() == initial_ref_count
+
+    # Force a shallow copy via the array protocol and ensure we saved
+    # references to two additional buffers.
+    _, col1_capsule = array.child(0).__arrow_c_array__()
+    assert get_pyobject_buffer_count() == (initial_ref_count + 1)
+
+    _, col2_capsule = array.child(1).__arrow_c_array__()
+    assert get_pyobject_buffer_count() == (initial_ref_count + 2)
+
+    # Ensure that the references can be removed
+    del col1_capsule
+    assert get_pyobject_buffer_count() == (initial_ref_count + 1)
+
+    del col2_capsule
+    assert get_pyobject_buffer_count() == initial_ref_count
+
+
 def test_c_array_builder_init():
     builder = CArrayBuilder.allocate()
 

Reply via email to