This is an automated email from the ASF dual-hosted git repository. paleolimbot pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/arrow-nanoarrow.git
The following commit(s) were added to refs/heads/main by this push: new 8e8e38d3 chore(python): Restructure buffer packing to support nulls and improve performance (#426) 8e8e38d3 is described below commit 8e8e38d3890a16348c82d1c83a27d8af56085b25 Author: Dewey Dunnington <de...@dunnington.ca> AuthorDate: Mon Apr 15 11:34:13 2024 -0300 chore(python): Restructure buffer packing to support nulls and improve performance (#426) First, this PR fixes the rather uninformative error that occurs on any error while building an Array (closes #423). The error is now: ```python import nanoarrow as na na.Array([1, 2, 3]) #> ValueError #> ... #> An error occurred whilst converting object of type list to nanoarrow.c_array_stream or nanoarrow.c_array: #> schema is required for CArray import from iterable ``` Second, this PR adds support for `None` in iterables. This makes it much more convenient to create arrays with nulls (closes #424). ```python import nanoarrow as na na.Array([1, 2, None, 4], na.int32()) #> nanoarrow.Array<int32>[4] #> 1 #> 2 #> None #> 4 ``` Finally, this PR tweaks the implementation of packing an iterable into a buffer to avoid the very bad performance that existed previously. The optimizations added were: - The `CBufferBuilder` now implements the buffer protocol (so that we can use `pack_into`) - The `__len__` attribute is checked to preallocate where possible Those optimizations resulted in a ~2x improvement over the previous code; however, the types that can use the `array` constructor have the biggest wins (5-6x improvement). An example with the biggest gain: ```python import numpy as np import nanoarrow as na import pyarrow as pa floats = np.random.random(int(1e6)) floats_lst = list(floats) %timeit pa.array(floats, pa.float64()) #> 1.79 µs ± 9.27 ns per loop (mean ± std. dev. of 7 runs, 1,000,000 loops each) %timeit pa.array(floats_lst, pa.float64()) #> 13.8 ms ± 35.5 µs per loop (mean ± std. dev. 
of 7 runs, 100 loops each) %timeit pa.array(iter(floats_lst), pa.float64()) #> 17.9 ms ± 37.8 µs per loop (mean ± std. dev. of 7 runs, 100 loops each) %timeit na.c_array(floats, na.float64()) #> 5.51 µs ± 25.1 ns per loop (mean ± std. dev. of 7 runs, 100,000 loops each) %timeit na.c_array(floats_lst, na.float64(nullable=False)) #> 16.5 ms ± 41.3 µs per loop (mean ± std. dev. of 7 runs, 100 loops each) %timeit na.c_array(iter(floats_lst), na.float64(nullable=False)) #> 29.1 ms ± 254 µs per loop (mean ± std. dev. of 7 runs, 10 loops each) %timeit na.c_array(floats_lst, na.float64()) #> 43.6 ms ± 484 µs per loop (mean ± std. dev. of 7 runs, 10 loops each) %timeit na.c_array(iter(floats_lst), na.float64()) #> 43 ms ± 227 µs per loop (mean ± std. dev. of 7 runs, 10 loops each) ``` Before this PR: ```python %timeit na.c_array(floats, na.float64()) #> 5.66 µs ± 44.4 ns per loop (mean ± std. dev. of 7 runs, 100,000 loops each) %timeit na.c_array(floats_lst, na.float64()) #> 104 ms ± 187 µs per loop (mean ± std. dev. of 7 runs, 10 loops each) %timeit na.c_array(iter(floats_lst), na.float64()) #> 107 ms ± 202 µs per loop (mean ± std. dev. of 7 runs, 10 loops each) ``` It should be noted that there is probably one more PR on top of this to support building variable-length string/binary arrays (and possibly move some of the building code out of `c_lib.py` since it is getting a little crowded there). 
--- python/src/nanoarrow/_lib.pyx | 231 ++++++++++++++++++++++++++++++++++++++++-- python/src/nanoarrow/c_lib.py | 61 +++++++++-- python/tests/test_c_array.py | 73 ++++++++++++- python/tests/test_c_buffer.py | 21 +++- python/tests/test_iterator.py | 9 +- 5 files changed, 363 insertions(+), 32 deletions(-) diff --git a/python/src/nanoarrow/_lib.pyx b/python/src/nanoarrow/_lib.pyx index d9bda9d9..463f07ce 100644 --- a/python/src/nanoarrow/_lib.pyx +++ b/python/src/nanoarrow/_lib.pyx @@ -42,6 +42,7 @@ from cpython cimport ( PyObject_GetBuffer, PyBuffer_Release, PyBuffer_ToContiguous, + PyBuffer_FillInfo, PyBUF_ANY_CONTIGUOUS, PyBUF_FORMAT, PyBUF_WRITABLE @@ -1856,34 +1857,103 @@ cdef class CBuffer: cdef class CBufferBuilder: - """Wrapper around writable owned buffer CPU content""" + """Wrapper around writable CPU buffer content + + This class provides a growable type-aware buffer that can be used + to create a typed buffer from a Python iterable. This method of + creating a buffer is usually slower than constructors like + ``array.array()`` or ``numpy.array()``; however, this class supports + all Arrow types with a single data buffer (e.g., boolean bitmaps, + float16, intervals, fixed-size binary), some of which are not supported + by other constructors. + """ cdef CBuffer _buffer + cdef bint _locked def __cinit__(self): self._buffer = CBuffer.empty() + self._locked = False + + cdef _assert_unlocked(self): + if self._locked: + raise BufferError("CBufferBuilder is locked") + + # Implement the buffer protocol so that this object can be used as + # the argument to Struct.readinto() (or perhaps written to by + # an independent library). 
+ def __getbuffer__(self, Py_buffer* buffer, int flags): + self._assert_unlocked() + PyBuffer_FillInfo( + buffer, + self, + self._buffer._ptr.data, + self._buffer._ptr.capacity_bytes, + 0, + flags + ) + self._locked = True + + def __releasebuffer__(self, Py_buffer* buffer): + self._locked = False + + def set_data_type(self, ArrowType type_id, int element_size_bits=0): + """Set the data type used to interpret elements in :meth:`write_elements`.""" + self._buffer._set_data_type(type_id, element_size_bits) + return self + + @property + def format(self): + """The ``struct`` format code of the underlying buffer""" + return self._buffer._format.decode() @property def size_bytes(self): + """The number of bytes that have been written to this buffer""" return self._buffer.size_bytes @property def capacity_bytes(self): + """The number of bytes allocated in the underlying buffer""" return self._buffer._ptr.capacity_bytes - def set_data_type(self, ArrowType type_id, int element_size_bits=0): - self._buffer._set_data_type(type_id, element_size_bits) - return self - def reserve_bytes(self, int64_t additional_bytes): + """Ensure that the underlying buffer has space for ``additional_bytes`` + more bytes to be written""" + self._assert_unlocked() cdef int code = ArrowBufferReserve(self._buffer._ptr, additional_bytes) Error.raise_error_not_ok("ArrowBufferReserve()", code) return self + def advance(self, int64_t additional_bytes): + """Manually increase :attr:`size_bytes` by ``additional_bytes`` + + This can be used after writing to the buffer using the buffer protocol + to ensure that :attr:`size_bytes` accurately reflects the number of + bytes written to the buffer. 
+ """ + cdef int64_t new_size = self._buffer._ptr.size_bytes + additional_bytes + if new_size < 0 or new_size > self._buffer._ptr.capacity_bytes: + raise IndexError(f"Can't advance {additional_bytes} from {self.size_bytes}") + + self._buffer._ptr.size_bytes = new_size + return self + def write(self, content): + """Write bytes to this buffer + + Writes the bytes of ``content`` without considering the element type of + ``content`` or the element type of this buffer. + + This method returns the number of bytes that were written. + """ + self._assert_unlocked() + cdef Py_buffer buffer cdef int64_t out PyObject_GetBuffer(content, &buffer, PyBUF_ANY_CONTIGUOUS) + # TODO: Check for single dimension? + cdef int code = ArrowBufferReserve(self._buffer._ptr, buffer.len) if code != NANOARROW_OK: PyBuffer_Release(&buffer) @@ -1904,23 +1974,58 @@ cdef class CBufferBuilder: return out def write_elements(self, obj): + """Write an iterable of elements to this buffer + + Writes the elements of iterable ``obj`` according to the binary + representation specified by :attr:`format`. This is currently + powered by ``struct.pack_into()`` except when building bitmaps + where an internal implementation is used. + + This method returns the number of elements that were written. 
+ """ + self._assert_unlocked() + + # Boolean arrays need their own writer since Python provides + # no helpers for bitpacking if self._buffer._data_type == NANOARROW_TYPE_BOOL: return self._write_bits(obj) cdef int64_t n_values = 0 + cdef int64_t bytes_per_element = calcsize(self._buffer.format) + cdef int code + struct_obj = Struct(self._buffer._format) - pack = struct_obj.pack - write = self.write + pack_into = struct_obj.pack_into + # If an object has a length, we can avoid extra allocations + if hasattr(obj, "__len__"): + code = ArrowBufferReserve(self._buffer._ptr, bytes_per_element * len(obj)) + Error.raise_error_not_ok("ArrowBufferReserve()", code) + + # Types whose Python representation is a tuple need a slightly different + # pack_into() call. if code != NANOARROW_OK is used instead of + # Error.raise_error_not_ok() Cython can avoid the extra function call + # and this is a very tight loop. if self._buffer._data_type in (NANOARROW_TYPE_INTERVAL_DAY_TIME, - NANOARROW_TYPE_INTERVAL_MONTH_DAY_NANO): + NANOARROW_TYPE_INTERVAL_MONTH_DAY_NANO): for item in obj: + code = ArrowBufferReserve(self._buffer._ptr, bytes_per_element) + if code != NANOARROW_OK: + Error.raise_error("ArrowBufferReserve()", code) + + pack_into(self, self._buffer._ptr.size_bytes, *item) + self._buffer._ptr.size_bytes += bytes_per_element n_values += 1 - write(pack(*item)) + else: for item in obj: + code = ArrowBufferReserve(self._buffer._ptr, bytes_per_element) + if code != NANOARROW_OK: + Error.raise_error("ArrowBufferReserve()", code) + + pack_into(self, self._buffer._ptr.size_bytes, item) + self._buffer._ptr.size_bytes += bytes_per_element n_values += 1 - write(pack(item)) return n_values @@ -1951,6 +2056,15 @@ cdef class CBufferBuilder: return n_values def finish(self): + """Finish building this buffer + + Performs any steps required to finish building this buffer and + returns the result. 
Any behaviour resulting from calling methods + on this object after it has been finished is not currently + defined (but should not crash). + """ + self._assert_unlocked() + cdef CBuffer out = self._buffer out._populate_view() @@ -1962,6 +2076,103 @@ cdef class CBufferBuilder: return f"{class_label}({self.size_bytes}/{self.capacity_bytes})" +cdef class NoneAwareWrapperIterator: + """Nullable iterator wrapper + + This class wraps an iterable ``obj`` that might contain ``None`` values + such that the iterable provided by this class contains "empty" (but valid) + values. After ``obj`` has been completely consumed, one can call + ``finish()`` to obtain the resulting bitmap. This is useful for passing + iterables that might contain None to tools that cannot handle them + (e.g., struct.pack(), array.array()). + """ + cdef ArrowBitmap _bitmap + cdef object _obj + cdef object _value_if_none + cdef int64_t _valid_count + cdef int64_t _item_count + + def __cinit__(self, obj, type_id, item_size_bytes=0): + ArrowBitmapInit(&self._bitmap) + self._obj = iter(obj) + + self._value_if_none = self._get_value_if_none(type_id, item_size_bytes) + self._valid_count = 0 + self._item_count = 0 + + def __dealloc__(self): + ArrowBitmapReset(&self._bitmap) + + def reserve(self, int64_t additional_elements): + cdef int code = ArrowBitmapReserve(&self._bitmap, additional_elements) + Error.raise_error_not_ok(self, code) + + def _get_value_if_none(self, type_id, item_size_bytes=0): + if type_id == NANOARROW_TYPE_INTERVAL_DAY_TIME: + return (0, 0) + elif type_id == NANOARROW_TYPE_INTERVAL_MONTH_DAY_NANO: + return (0, 0, 0) + elif type_id == NANOARROW_TYPE_BOOL: + return False + elif type_id in (NANOARROW_TYPE_BINARY, NANOARROW_TYPE_FIXED_SIZE_BINARY): + return b"\x00" * item_size_bytes + elif type_id in (NANOARROW_TYPE_HALF_FLOAT, NANOARROW_TYPE_FLOAT, NANOARROW_TYPE_DOUBLE): + return 0.0 + else: + return 0 + + cdef _append_to_validity(self, int is_valid): + self._valid_count += is_valid + 
self._item_count += 1 + + # Avoid allocating a bitmap if all values seen so far are valid + if self._valid_count == self._item_count: + return + + # If the bitmap hasn't been allocated yet, allocate it now and + # fill with 1s for all previous elements. + cdef int code + if self._bitmap.size_bits == 0 and self._item_count > 1: + code = ArrowBitmapAppend(&self._bitmap, 1, self._item_count - 1) + if code != NANOARROW_OK: + Error.raise_error("ArrowBitmapAppend()", code) + + # Append this element to the bitmap + code = ArrowBitmapAppend(&self._bitmap, is_valid, 1) + if code != NANOARROW_OK: + Error.raise_error("ArrowBitmapAppend()", code) + + def __iter__(self): + for item in self._obj: + if item is None: + self._append_to_validity(0) + yield self._value_if_none + else: + self._append_to_validity(1) + yield item + + def finish(self): + """Obtain the total count, null count, and validity bitmap after + consuming this iterable.""" + cdef CBuffer validity + null_count = self._item_count - self._valid_count + + # If we did allocate a bitmap, make sure the last few bits are zeroed + if null_count > 0 and self._bitmap.size_bits % 8 != 0: + ArrowBitmapAppendUnsafe(&self._bitmap, 0, self._bitmap.size_bits % 8) + + if null_count > 0: + validity = CBuffer.empty() + ArrowBufferMove(&self._bitmap.buffer, validity._ptr) + validity._set_data_type(NANOARROW_TYPE_BOOL) + + return ( + self._item_count, + null_count, + validity if null_count > 0 else None + ) + + cdef class CArrayBuilder: cdef CArray c_array cdef ArrowArray* _ptr diff --git a/python/src/nanoarrow/c_lib.py b/python/src/nanoarrow/c_lib.py index 0acc0a9d..3dd85d29 100644 --- a/python/src/nanoarrow/c_lib.py +++ b/python/src/nanoarrow/c_lib.py @@ -38,6 +38,7 @@ from nanoarrow._lib import ( CSchema, CSchemaBuilder, CSchemaView, + NoneAwareWrapperIterator, _obj_is_buffer, _obj_is_capsule, ) @@ -364,8 +365,8 @@ def c_array_stream(obj=None, schema=None) -> CArrayStream: return CArrayStream.from_array_list([array], array.schema, 
validate=False) except Exception as e: raise TypeError( - f"Can't convert object of type {type(obj).__name__} " - "to nanoarrow.c_array_stream or nanoarrow.c_array" + f"An error occurred whilst converting {type(obj).__name__} " + f"to nanoarrow.c_array_stream or nanoarrow.c_array: \n {e}" ) from e @@ -602,7 +603,7 @@ def _c_array_from_pybuffer(obj) -> CArray: return builder.finish() -def _c_array_from_iterable(obj, schema=None): +def _c_array_from_iterable(obj, schema=None) -> CArray: if schema is None: raise ValueError("schema is required for CArray import from iterable") @@ -618,28 +619,66 @@ def _c_array_from_iterable(obj, schema=None): builder.start_appending() return builder.finish() - # Use buffer create for crude support of array from iterable - buffer, n_values = _c_buffer_from_iterable(obj, schema) + # We need to know a few things about the data type to choose the appropriate + # strategy for building the array. + schema_view = c_schema_view(schema) + + if schema_view.storage_type_id != schema_view.type_id: + raise ValueError( + f"Can't create array from iterable for type {schema_view.type}" + ) + + # Creating a buffer from an iterable does not handle None values, + # but we can do so here with the NoneAwareWrapperIterator() wrapper. + # This approach is quite a bit slower, so only do it for a nullable + # type. 
+ if schema_view.nullable: + obj_wrapper = NoneAwareWrapperIterator( + obj, schema_view.storage_type_id, schema_view.fixed_size + ) + + if obj_len > 0: + obj_wrapper.reserve(obj_len) + + data, _ = _c_buffer_from_iterable(obj_wrapper, schema_view) + n_values, null_count, validity = obj_wrapper.finish() + else: + data, n_values = _c_buffer_from_iterable(obj, schema_view) + null_count = 0 + validity = None return c_array_from_buffers( - schema, n_values, buffers=(None, buffer), null_count=0, move=True + schema, n_values, buffers=(validity, data), null_count=null_count, move=True ) def _c_buffer_from_iterable(obj, schema=None) -> CBuffer: + import array + if schema is None: raise ValueError("CBuffer from iterable requires schema") - builder = CBufferBuilder() - schema_view = c_schema_view(schema) if schema_view.storage_type_id != schema_view.type_id: - raise ValueError(f"Can't create buffer from type {schema}") + raise ValueError( + f"Can't create buffer from iterable for type {schema_view.type}" + ) + + builder = CBufferBuilder() if schema_view.storage_type_id == CArrowType.FIXED_SIZE_BINARY: builder.set_data_type(CArrowType.BINARY, schema_view.fixed_size * 8) else: builder.set_data_type(schema_view.storage_type_id) - n_values_written = builder.write_elements(obj) - return builder.finish(), n_values_written + # If we are using a typecode supported by the array module, it has much + # faster implementations of safely building buffers from iterables + if ( + builder.format in array.typecodes + and schema_view.storage_type_id != CArrowType.BOOL + ): + buf = array.array(builder.format, obj) + return CBuffer.from_pybuffer(buf), len(buf) + + n_values = builder.write_elements(obj) + return builder.finish(), n_values diff --git a/python/tests/test_c_array.py b/python/tests/test_c_array.py index 1536d159..1511196f 100644 --- a/python/tests/test_c_array.py +++ b/python/tests/test_c_array.py @@ -219,15 +219,86 @@ def test_c_array_from_iterable_empty(): assert 
len(array_view.buffer(2)) == 0 -def test_c_array_from_iterable_non_empty(): +def test_c_array_from_iterable_non_empty_nullable_without_nulls(): c_array = na.c_array([1, 2, 3], na.int32()) assert c_array.length == 3 assert c_array.null_count == 0 view = na.c_array_view(c_array) + assert list(view.buffer(0)) == [] assert list(view.buffer(1)) == [1, 2, 3] +def test_c_array_from_iterable_non_empty_non_nullable(): + c_array = na.c_array([1, 2, 3], na.int32(nullable=False)) + assert c_array.length == 3 + assert c_array.null_count == 0 + + view = na.c_array_view(c_array) + assert list(view.buffer(0)) == [] + assert list(view.buffer(1)) == [1, 2, 3] + + +def test_c_array_from_iterable_int_with_nulls(): + c_array = na.c_array([1, None, 3], na.int32()) + assert c_array.length == 3 + assert c_array.null_count == 1 + + view = na.c_array_view(c_array) + assert list(view.buffer(0).elements()) == [True, False, True] + [False] * 5 + assert list(view.buffer(1)) == [1, 0, 3] + + +def test_c_array_from_iterable_float_with_nulls(): + c_array = na.c_array([1, None, 3], na.float64()) + assert c_array.length == 3 + assert c_array.null_count == 1 + + view = na.c_array_view(c_array) + assert list(view.buffer(0).elements()) == [True, False, True] + [False] * 5 + assert list(view.buffer(1)) == [1.0, 0.0, 3.0] + + +def test_c_array_from_iterable_bool_with_nulls(): + c_array = na.c_array([True, None, False], na.bool()) + assert c_array.length == 3 + assert c_array.null_count == 1 + + view = na.c_array_view(c_array) + assert list(view.buffer(0).elements()) == [True, False, True] + [False] * 5 + assert list(view.buffer(1).elements()) == [True, False, False] + [False] * 5 + + +def test_c_array_from_iterable_fixed_size_binary_with_nulls(): + c_array = na.c_array([b"1234", None, b"5678"], na.fixed_size_binary(4)) + assert c_array.length == 3 + assert c_array.null_count == 1 + + view = na.c_array_view(c_array) + assert list(view.buffer(0).elements()) == [True, False, True] + [False] * 5 + assert 
list(view.buffer(1)) == [b"1234", b"\x00\x00\x00\x00", b"5678"] + + +def test_c_array_from_iterable_day_time_interval_with_nulls(): + c_array = na.c_array([(1, 2), None, (3, 4)], na.interval_day_time()) + assert c_array.length == 3 + assert c_array.null_count == 1 + + view = na.c_array_view(c_array) + assert list(view.buffer(0).elements()) == [True, False, True] + [False] * 5 + assert list(view.buffer(1)) == [(1, 2), (0, 0), (3, 4)] + + +def test_c_array_from_iterable_month_day_nano_interval_with_nulls(): + c_array = na.c_array([(1, 2, 3), None, (4, 5, 6)], na.interval_month_day_nano()) + assert c_array.length == 3 + assert c_array.null_count == 1 + + view = na.c_array_view(c_array) + assert list(view.buffer(0).elements()) == [True, False, True] + [False] * 5 + assert list(view.buffer(1)) == [(1, 2, 3), (0, 0, 0), (4, 5, 6)] + + def test_c_array_from_iterable_error(): with pytest.raises(ValueError, match="schema is required"): na.c_array([1, 2, 3]) diff --git a/python/tests/test_c_buffer.py b/python/tests/test_c_buffer.py index 51944723..38bb0c62 100644 --- a/python/tests/test_c_buffer.py +++ b/python/tests/test_c_buffer.py @@ -214,7 +214,24 @@ def test_c_buffer_builder(): assert builder.size_bytes == 10 assert builder.capacity_bytes == 123 - assert bytes(builder.finish()) == b"abcdefghij" + with pytest.raises(IndexError): + builder.advance(-11) + + with pytest.raises(IndexError): + builder.advance(114) + + mv = memoryview(builder) + with pytest.raises(BufferError, match="CBufferBuilder is locked"): + memoryview(builder) + + with pytest.raises(BufferError, match="CBufferBuilder is locked"): + assert bytes(builder.finish()) == b"abcdefghij" + + mv[builder.size_bytes] = ord("k") + builder.advance(1) + + del mv + assert bytes(builder.finish()) == b"abcdefghijk" def test_c_buffer_from_iterable(): @@ -231,7 +248,7 @@ def test_c_buffer_from_iterable(): # An Arrow type whose storage type is not the same as its top-level # type will error. 
- with pytest.raises(ValueError, match="Can't create buffer from type"): + with pytest.raises(ValueError, match="Can't create buffer"): na.c_buffer([1, 2, 3], na.date32()) diff --git a/python/tests/test_iterator.py b/python/tests/test_iterator.py index 8cbd66de..5b9c8103 100644 --- a/python/tests/test_iterator.py +++ b/python/tests/test_iterator.py @@ -50,14 +50,7 @@ def test_iterator_primitive(): def test_iterator_nullable_primitive(): - array = na.c_array_from_buffers( - na.int32(), - 4, - buffers=[ - na.c_buffer([1, 1, 1, 0], na.bool()), - na.c_buffer([1, 2, 3, 0], na.int32()), - ], - ) + array = na.c_array([1, 2, 3, None], na.int32()) assert list(iter_py(array)) == [1, 2, 3, None] sliced = array[1:]