This is an automated email from the ASF dual-hosted git repository.
paleolimbot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-nanoarrow.git
The following commit(s) were added to refs/heads/main by this push:
new d1b9924f feat(python): Add column-wise buffer builder (#464)
d1b9924f is described below
commit d1b9924fe74fb68096cdc209d0341a1e163b570e
Author: Dewey Dunnington <[email protected]>
AuthorDate: Fri May 17 16:10:43 2024 -0300
feat(python): Add column-wise buffer builder (#464)
This PR implements building columns buffer-wise for the types where this
makes sense. It also implements a few other changes:
- `item_size` was renamed to `itemsize` to match the memoryview property
name.
- The visitor methods now live in the `ArrayViewVisitable` mixin so that
they are available on both `Array` and `ArrayStream` without
duplicating documentation.
Functionally, this means that `Array` and `ArrayStream` now have
`to_pysequence()` and `to_columns_pysequence()` methods whose output more
closely matches what a user would expect (a contiguous buffer where
possible, otherwise a list of Python objects).
A quick demo:
```python
import nanoarrow as na
import pyarrow as pa
batch = pa.record_batch({"col1": [1, 2, 3], "col2": ["a", "b", "c"]})
batch_with_nulls = pa.record_batch({"col1": [1, None, 3], "col2": ["a", "b", None]})
# Either builds a buffer or a list depending on column types
na.Array(batch).to_columns_pysequence()
#> (['col1', 'col2'],
#> [nanoarrow.c_lib.CBuffer(int64[24 b] 1 2 3), ['a', 'b', 'c']])
# One can inject a null handler (a few experimental ones provided)
na.Array(batch_with_nulls).to_columns_pysequence(handle_nulls=na.nulls_as_sentinel())
#> (['col1', 'col2'], [array([ 1., nan, 3.]), ['a', 'b', None]])
# ...by default, nulls raise an error unless you choose a null handler
na.Array(batch_with_nulls).to_columns_pysequence()
#> ValueError: Null present with null_handler=nulls_forbid()
```
This will basically get you data frame conversion:
```python
import nanoarrow as na
import pandas as pd
url = "https://github.com/apache/arrow-experiments/raw/main/data/arrow-commits/arrow-commits.arrows"
names, data = na.ArrayStream.from_url(url).to_columns_pysequence(handle_nulls=na.nulls_as_sentinel())
pd.DataFrame({k: v for k, v in zip(names, data)})
#> commit time \
#> 0 49cdb0fe4e98fda19031c864a18e6156c6edbf3c 2024-03-07 02:00:52+00:00
#> 1 1d966e98e41ce817d1f8c5159c0b9caa4de75816 2024-03-06 21:51:34+00:00
#> 2 96f26a89bd73997f7532643cdb27d04b70971530 2024-03-06 20:29:15+00:00
#> 3 ee1a8c39a55f3543a82fed900dadca791f6e9f88 2024-03-06 07:46:45+00:00
#> 4 3d467ac7bfae03cf2db09807054c5672e1959aec 2024-03-05 16:13:32+00:00
#> ... ... ...
#> 15482 23c4b08d154f8079806a1f0258d7e4af17bdf5fd 2016-02-17 12:39:03+00:00
#> 15483 16e44e3d456219c48595142d0a6814c9c950d30c 2016-02-17 12:38:39+00:00
#> 15484 fa5f0299f046c46e1b2f671e5e3b4f1956522711 2016-02-17 12:38:39+00:00
#> 15485 cbc56bf8ac423c585c782d5eda5c517ea8df8e3c 2016-02-17 12:38:39+00:00
#> 15486 d5aa7c46692474376a3c31704cfc4783c86338f2 2016-02-05 20:08:35+00:00
#>
#> files merge message
#> 0 2 False GH-40370: [C++] Define ARROW_FORCE_INLINE for ...
#> 1 1 False GH-40386: [Python] Fix except clauses (#40387)
#> 2 1 False GH-40227: [R] ensure executable files in `crea...
#> 3 1 False GH-40366: [C++] Remove const qualifier from Bu...
#> 4 1 False GH-20127: [Python][CI] Remove legacy hdfs test...
#> ... ... ... ...
#> 15482 73 False ARROW-4: This provides an partial C++11 implem...
#> 15483 8 False ARROW-3: This patch includes a WIP draft speci...
#> 15484 124 False ARROW-1: Initial Arrow Code Commit
#> 15485 2 False Update readme and add license in root.
#> 15486 1 False Initial Commit
#>
#> [15487 rows x 5 columns]
```
---------
Co-authored-by: Joris Van den Bossche <[email protected]>
---
python/src/nanoarrow/__init__.py | 4 +
python/src/nanoarrow/_lib.pyx | 38 +++-
python/src/nanoarrow/array.py | 42 +---
python/src/nanoarrow/array_stream.py | 43 +---
python/src/nanoarrow/iterator.py | 6 +-
python/src/nanoarrow/visitor.py | 400 +++++++++++++++++++++++++++++------
python/tests/test_array.py | 5 +-
python/tests/test_array_stream.py | 6 +-
python/tests/test_c_buffer.py | 14 +-
python/tests/test_nanoarrow.py | 42 ++++
python/tests/test_visitor.py | 150 ++++++++++++-
11 files changed, 583 insertions(+), 167 deletions(-)
diff --git a/python/src/nanoarrow/__init__.py b/python/src/nanoarrow/__init__.py
index 1dc5c6f2..b021bbe5 100644
--- a/python/src/nanoarrow/__init__.py
+++ b/python/src/nanoarrow/__init__.py
@@ -72,6 +72,7 @@ from nanoarrow.schema import (
)
from nanoarrow.array import array, Array
from nanoarrow.array_stream import ArrayStream
+from nanoarrow.visitor import nulls_as_sentinel, nulls_forbid, nulls_separate
from nanoarrow._version import __version__ # noqa: F401
# Helps Sphinx automatically populate an API reference section
@@ -113,6 +114,9 @@ __all__ = [
"large_list",
"list_",
"null",
+ "nulls_as_sentinel",
+ "nulls_forbid",
+ "nulls_separate",
"string",
"struct",
"schema",
diff --git a/python/src/nanoarrow/_lib.pyx b/python/src/nanoarrow/_lib.pyx
index 539135aa..0b2fd7b2 100644
--- a/python/src/nanoarrow/_lib.pyx
+++ b/python/src/nanoarrow/_lib.pyx
@@ -1009,6 +1009,11 @@ cdef class CSchemaView:
if self.extension_name or self._schema_view.type !=
self._schema_view.storage_type:
return None
+ # String/binary types do not have format strings as far as the Python
+ # buffer protocol is concerned
+ if self.layout.n_buffers != 2:
+ return None
+
cdef char out[128]
cdef int element_size_bits = 0
if self._schema_view.type == NANOARROW_TYPE_FIXED_SIZE_BINARY:
@@ -1632,6 +1637,22 @@ cdef class CArrayView:
@property
def null_count(self):
+ """The number of null elements in this array view
+
+ If the producer did not compute the null count (i.e., it is -1), it is
+ computed here and stored on the underlying array view. The result remains
+ -1 if it would have to be computed from a validity buffer that is not
+ on the CPU.
+ """
+ if self._ptr.null_count != -1:
+ return self._ptr.null_count
+
+ # Arrays of the null type have no validity buffer; every element is null
+ if self._ptr.storage_type == NANOARROW_TYPE_NA:
+ self._ptr.null_count = self._ptr.length
+ return self._ptr.null_count
+
+ cdef ArrowBufferType buffer_type = self._ptr.layout.buffer_type[0]
+ cdef uint8_t* validity_bits = self._ptr.buffer_views[0].data.as_uint8
+
+ if buffer_type != NANOARROW_BUFFER_TYPE_VALIDITY:
+ self._ptr.null_count = 0
+ elif validity_bits == NULL:
+ self._ptr.null_count = 0
+ elif self._device is DEVICE_CPU:
+ self._ptr.null_count = (
+ self._ptr.length -
+ ArrowBitCountSet(validity_bits, self.offset, self.length)
+ )
+
return self._ptr.null_count
@property
@@ -1869,7 +1890,7 @@ cdef class CBufferView:
return self._format.decode("UTF-8")
@property
- def item_size(self):
+ def itemsize(self):
return self._strides
def __len__(self):
@@ -1957,7 +1978,7 @@ cdef class CBufferView:
cdef int64_t c_offset = offset
cdef int64_t c_length = length
- cdef int64_t c_item_size = self.item_size
+ cdef int64_t c_item_size = self.itemsize
cdef int64_t c_dest_offset = dest_offset
self._check_copy_into_bounds(&buffer, c_offset, c_length, dest_offset,
c_item_size)
@@ -2010,7 +2031,7 @@ cdef class CBufferView:
if length is None:
length = self.n_elements
- cdef int64_t bytes_to_copy = length * self.item_size
+ cdef int64_t bytes_to_copy = length * self.itemsize
out = CBufferBuilder().set_data_type(self.data_type_id)
out.reserve_bytes(bytes_to_copy)
self.copy_into(out, offset, length)
@@ -2224,9 +2245,9 @@ cdef class CBuffer:
return self._element_size_bits
@property
- def item_size(self):
+ def itemsize(self):
self._assert_valid()
- return self._view.item_size
+ return self._view.itemsize
@property
def format(self):
@@ -2339,6 +2360,13 @@ cdef class CBufferBuilder:
"""The number of bytes that have been written to this buffer"""
return self._buffer.size_bytes
+ @property
+ def itemsize(self):
+ return self._buffer.itemsize
+
+ def __len__(self):
+ return self._buffer.size_bytes // self.itemsize
+
@property
def capacity_bytes(self):
"""The number of bytes allocated in the underlying buffer"""
diff --git a/python/src/nanoarrow/array.py b/python/src/nanoarrow/array.py
index 5cfa65b9..9a4361c7 100644
--- a/python/src/nanoarrow/array.py
+++ b/python/src/nanoarrow/array.py
@@ -17,7 +17,7 @@
import itertools
from functools import cached_property
-from typing import Iterable, List, Sequence, Tuple
+from typing import Iterable, Tuple
from nanoarrow._lib import (
DEVICE_CPU,
@@ -32,7 +32,7 @@ from nanoarrow.c_array_stream import c_array_stream
from nanoarrow.c_schema import c_schema
from nanoarrow.iterator import iter_array_views, iter_py, iter_tuples
from nanoarrow.schema import Schema, _schema_repr
-from nanoarrow.visitor import to_columns, to_pylist
+from nanoarrow.visitor import ArrayViewVisitable
from nanoarrow import _repr_utils
@@ -106,7 +106,7 @@ class Scalar:
return array.__arrow_c_array__(requested_schema=requested_schema)
-class Array:
+class Array(ArrayViewVisitable):
"""High-level in-memory Array representation
The Array is nanoarrow's high-level in-memory array representation whose
@@ -345,42 +345,6 @@ class Array:
"""
return iter_array_views(self)
- def to_pylist(self) -> List:
- """Convert this Array to a ``list()` of Python objects
-
- Computes an identical value to list(:meth:`iter_py`) but can be several
- times faster.
-
- Examples
- --------
-
- >>> import nanoarrow as na
- >>> array = na.Array([1, 2, 3], na.int32())
- >>> array.to_pylist()
- [1, 2, 3]
- """
- return to_pylist(self)
-
- def to_columns(self) -> Tuple[str, Sequence]:
- """Convert this Array to a ``list()` of sequences
-
- Converts a stream of struct arrays into its column-wise representation
- such that each column is either a contiguous buffer or a ``list()``.
-
- Examples
- --------
-
- >>> import nanoarrow as na
- >>> import pyarrow as pa
- >>> array = na.Array(pa.record_batch([pa.array([1, 2, 3])],
names=["col1"]))
- >>> names, columns = array.to_columns()
- >>> names
- ['col1']
- >>> columns
- [[1, 2, 3]]
- """
- return to_columns(self)
-
@property
def n_children(self) -> int:
"""Get the number of children for an Array of this type.
diff --git a/python/src/nanoarrow/array_stream.py
b/python/src/nanoarrow/array_stream.py
index ccd4669c..88f70211 100644
--- a/python/src/nanoarrow/array_stream.py
+++ b/python/src/nanoarrow/array_stream.py
@@ -16,7 +16,7 @@
# under the License.
from functools import cached_property
-from typing import Iterable, List, Sequence, Tuple
+from typing import Iterable, Tuple
from nanoarrow._lib import CMaterializedArrayStream
from nanoarrow._repr_utils import make_class_label
@@ -24,10 +24,10 @@ from nanoarrow.array import Array
from nanoarrow.c_array_stream import c_array_stream
from nanoarrow.iterator import iter_py, iter_tuples
from nanoarrow.schema import Schema, _schema_repr
-from nanoarrow.visitor import to_columns, to_pylist
+from nanoarrow.visitor import ArrayViewVisitable
-class ArrayStream:
+class ArrayStream(ArrayViewVisitable):
"""High-level ArrayStream representation
The ArrayStream is nanoarrow's high-level representation of zero
@@ -199,43 +199,6 @@ class ArrayStream:
"""
return iter_tuples(self)
- def to_pylist(self) -> List:
- """Convert this Array to a ``list()` of Python objects
-
- Computes an identical value to list(:meth:`iter_py`) but can be several
- times faster.
-
- Examples
- --------
-
- >>> import nanoarrow as na
- >>> stream = na.ArrayStream([1, 2, 3], na.int32())
- >>> stream.to_pylist()
- [1, 2, 3]
- """
- return to_pylist(self)
-
- def to_columns(self) -> Tuple[str, Sequence]:
- """Convert this Array to a ``list()` of sequences
-
- Converts a stream of struct arrays into its column-wise representation
- such that each column is either a contiguous buffer or a ``list()``.
-
- Examples
- --------
-
- >>> import nanoarrow as na
- >>> import pyarrow as pa
- >>> batch = pa.record_batch([pa.array([1, 2, 3])], names=["col1"])
- >>> stream = na.ArrayStream(batch)
- >>> names, columns = stream.to_columns()
- >>> names
- ['col1']
- >>> columns
- [[1, 2, 3]]
- """
- return to_columns(self)
-
def __repr__(self) -> str:
cls = make_class_label(self, "nanoarrow")
schema_repr = _schema_repr(self.schema, prefix="",
include_metadata=False)
diff --git a/python/src/nanoarrow/iterator.py b/python/src/nanoarrow/iterator.py
index 3ff1714f..c1cab2e0 100644
--- a/python/src/nanoarrow/iterator.py
+++ b/python/src/nanoarrow/iterator.py
@@ -156,11 +156,7 @@ class ArrayViewBaseIterator:
return f"<unnamed {self._schema_view.type}>"
def _contains_nulls(self):
- return (
- self._schema_view.nullable
- and len(self._array_view.buffer(0))
- and self._array_view.null_count != 0
- )
+ return self._schema_view.nullable and self._array_view.null_count != 0
def _set_array(self, array):
self._array_view._set_array(array)
diff --git a/python/src/nanoarrow/visitor.py b/python/src/nanoarrow/visitor.py
index 53bc0691..0b0d76d7 100644
--- a/python/src/nanoarrow/visitor.py
+++ b/python/src/nanoarrow/visitor.py
@@ -15,76 +15,201 @@
# specific language governing permissions and limitations
# under the License.
-from typing import Any, List, Sequence, Tuple, Union
+from typing import Any, Callable, List, Sequence, Tuple, Union
-from nanoarrow._lib import CArrayView
+from nanoarrow._lib import CArrayView, CArrowType, CBuffer, CBufferBuilder
from nanoarrow.c_array_stream import c_array_stream
+from nanoarrow.c_schema import c_schema_view
from nanoarrow.iterator import ArrayViewBaseIterator, PyIterator
from nanoarrow.schema import Type
-def to_pylist(obj, schema=None) -> List:
- """Convert ``obj`` to a ``list()` of Python objects
+class ArrayViewVisitable:
+ """Mixin class providing conversion methods based on visitors
- Computes an identical value to ``list(iterator.iter_py())`` but is several
- times faster.
+ Can be used with classes that implement ``__arrow_c_stream__()``
+ or ``__arrow_c_array__()``.
+ """
+
+ def to_pylist(self) -> List:
+ """Convert to a ``list`` of Python objects
+
+ Computes an identical value to ``list(iter_py())`` but can be much
+ faster.
+
+ Examples
+ --------
+ >>> import nanoarrow as na
+ >>> array = na.Array([1, 2, 3], na.int32())
+ >>> array.to_pylist()
+ [1, 2, 3]
+ """
+ return ToPyListConverter.visit(self)
+
+ def to_columns_pysequence(
+ self, *, handle_nulls=None
+ ) -> Tuple[List[str], List[Sequence]]:
+ """Convert to a ``list`` of contiguous sequences
+
+ Experimentally converts a stream of struct arrays into a list of contiguous
+ sequences (one per child column) using the same logic as
+ :meth:`to_pysequence`.
+
+ Parameters
+ ----------
+ handle_nulls : callable
+ A function returning a sequence based on a validity bytemap and a
+ contiguous buffer of values. If the array contains no nulls, the
+ validity bytemap will be ``None``. Built-in handlers include
+ :func:`nulls_as_sentinel`, :func:`nulls_forbid`, and
+ :func:`nulls_separate`. The default value is :func:`nulls_forbid`.
+
+ Returns
+ -------
+ Tuple[List[str], List[Sequence]]
+ The child column names and the converted child columns.
+
+ Raises
+ ------
+ ValueError
+ If the input is not a struct array or if the top-level struct
+ array itself contains nulls.
+
+ Examples
+ --------
+ >>> import nanoarrow as na
+ >>> import pyarrow as pa
+ >>> batch = pa.record_batch({"col1": [1, 2, 3], "col2": ["a", "b", "c"]})
+ >>> names, columns = na.Array(batch).to_columns_pysequence()
+ >>> names
+ ['col1', 'col2']
+ >>> columns
+ [nanoarrow.c_lib.CBuffer(int64[24 b] 1 2 3), ['a', 'b', 'c']]
+ """
+ return ToColumnsPysequenceConverter.visit(self, handle_nulls=handle_nulls)
+
+ def to_pysequence(self, *, handle_nulls=None) -> Sequence:
+ """Convert to a contiguous sequence
+
+ Experimentally converts a stream of arrays into a single sequence that
+ is either a contiguous buffer or a ``list``.
+ Boolean, integer, float, and interval arrays are currently converted to
+ their contiguous buffer representation; other types are returned as a
+ list of Python objects. The sequences returned by :meth:`to_pysequence`
+ are designed to work as input to ``pandas.Series`` and/or ``numpy.array()``.
+ The default conversions are subject to change based on initial user
+ feedback.
+
+ Parameters
+ ----------
+ handle_nulls : callable
+ A function returning a sequence based on a validity bytemap and a
+ contiguous buffer of values. If the array contains no nulls, the
+ validity bytemap will be ``None``. Built-in handlers include
+ :func:`nulls_as_sentinel`, :func:`nulls_forbid`, and
+ :func:`nulls_separate`. The default value is :func:`nulls_forbid`.
+
+ Examples
+ --------
+ >>> import nanoarrow as na
+ >>> na.Array([1, 2, 3], na.int32()).to_pysequence()
+ nanoarrow.c_lib.CBuffer(int32[12 b] 1 2 3)
+ """
+ return ToPySequenceConverter.visit(self, handle_nulls=handle_nulls)
+
+
+def nulls_forbid() -> Callable[[CBuffer, Sequence], Sequence]:
+ """Erroring null handler
+
+ A null handler that errors when it encounters nulls.
+
+ Examples
+ --------
+
+ >>> import nanoarrow as na
+ >>> na.Array([1, 2, 3],
na.int32()).to_pysequence(handle_nulls=na.nulls_forbid())
+ nanoarrow.c_lib.CBuffer(int32[12 b] 1 2 3)
+ >>> na.Array([1, None, 3],
na.int32()).to_pysequence(handle_nulls=na.nulls_forbid())
+ Traceback (most recent call last):
+ ...
+ ValueError: Null present with null_handler=nulls_forbid()
+ """
+
+ def handle(is_valid, data):
+ # the is_valid bytemap is only created if there was at least one null
+ if is_valid is not None:
+ raise ValueError("Null present with null_handler=nulls_forbid()")
+
+ return data
+
+ return handle
+
+
+def nulls_as_sentinel(sentinel=None):
+ """Sentinel null handler
+
+ A null handler that assigns a sentinel to null values. This is
+ implemented with numpy using the expression ``data[~is_valid] = sentinel``.
+ The default sentinel value of ``None`` will result in float output and ``nan``
+ assigned to null values for numeric and boolean inputs. This
+ corresponds to numpy's handling of ``None`` in ``np.result_type()``
+ and ``result[~is_valid] = None``.
- Paramters
- ---------
- obj : array stream-like
- An array-like or array stream-like object as sanitized by
- :func:`c_array_stream`.
- schema : schema-like, optional
- An optional schema, passed to :func:`c_array_stream`.
+ Parameters
+ ----------
+ sentinel : scalar, optional
+ The value with which nulls should be replaced.
Examples
--------
>>> import nanoarrow as na
- >>> from nanoarrow import visitor
- >>> array = na.c_array([1, 2, 3], na.int32())
- >>> visitor.to_pylist(array)
- [1, 2, 3]
+ >>> na_array = na.Array([1, 2, 3], na.int32())
+ >>> na_array.to_pysequence(handle_nulls=na.nulls_as_sentinel())
+ array([1, 2, 3], dtype=int32)
+ >>> na_array = na.Array([1, None, 3], na.int32())
+ >>> na_array.to_pysequence(handle_nulls=na.nulls_as_sentinel())
+ array([ 1., nan, 3.])
+ >>> na_array.to_pysequence(handle_nulls=na.nulls_as_sentinel(-999))
+ array([ 1, -999, 3], dtype=int32)
"""
- return ListBuilder.visit(obj, schema)
+ import numpy as np
+ def handle(is_valid, data):
+ # np.asarray() avoids a copy when possible; unlike np.array(copy=False)
+ # it does not raise under NumPy >= 2.0 when a copy is required.
+ data = np.asarray(data)
-def to_columns(obj, schema=None) -> Tuple[List[str], List[Sequence]]:
- """Convert ``obj`` to a ``list()` of sequences
+ if is_valid is not None:
+ is_valid = np.asarray(is_valid)
+ out_type = np.result_type(data, sentinel)
+ # Copy so that the caller's buffer is never modified in place
+ data = np.array(data, dtype=out_type, copy=True)
+ data[~is_valid] = sentinel
+ return data
+ else:
+ return data
- Converts a stream of struct arrays into its column-wise representation
- such that each column is either a contiguous buffer or a ``list()``.
+ return handle
- Paramters
- ---------
- obj : array stream-like
- An array-like or array stream-like object as sanitized by
- :func:`c_array_stream`.
- schema : schema-like, optional
- An optional schema, passed to :func:`c_array_stream`.
+
+def nulls_separate() -> Callable[[CBuffer, Sequence], Tuple[CBuffer,
Sequence]]:
+ """Return nulls as a tuple of is_valid, data
+
+ A null handler that returns its components.
Examples
--------
>>> import nanoarrow as na
- >>> from nanoarrow import visitor
- >>> import pyarrow as pa
- >>> array = pa.record_batch([pa.array([1, 2, 3])], names=["col1"])
- >>> names, columns = visitor.to_columns(array)
- >>> names
- ['col1']
- >>> columns
- [[1, 2, 3]]
+ >>> na_array = na.Array([1, 2, 3], na.int32())
+ >>> na_array.to_pysequence(handle_nulls=na.nulls_separate())
+ (None, nanoarrow.c_lib.CBuffer(int32[12 b] 1 2 3))
+ >>> na_array = na.Array([1, None, 3], na.int32())
+ >>> result = na_array.to_pysequence(handle_nulls=na.nulls_separate())
+ >>> result[0]
+ nanoarrow.c_lib.CBuffer(uint8[3 b] True False True)
+ >>> result[1]
+ nanoarrow.c_lib.CBuffer(int32[12 b] 1 0 3)
"""
- return ColumnsBuilder.visit(obj, schema)
+ def handle(is_valid, data):
+ return is_valid, data
+
+ return handle
-class ArrayStreamVisitor(ArrayViewBaseIterator):
- """Compute a value from one or more arrays in an ArrowArrayStream
+
+class ArrayViewVisitor(ArrayViewBaseIterator):
+ """Compute a value from one or more arrays as ArrowArrayViews
This class supports a (currently internal) pattern for building
output from a zero or more arrays in a stream.
-
"""
@classmethod
@@ -124,31 +249,28 @@ class ArrayStreamVisitor(ArrayViewBaseIterator):
return None
-class ListBuilder(ArrayStreamVisitor):
- def __init__(self, schema, *, iterator_cls=PyIterator, array_view=None):
+class ToPySequenceConverter(ArrayViewVisitor):
+ def __init__(self, schema, handle_nulls=None, *, array_view=None):
super().__init__(schema, array_view=array_view)
-
- # Ensure that self._iterator._array_view is self._array_view
- self._iterator = iterator_cls(schema, array_view=self._array_view)
+ cls, kwargs = _resolve_converter_cls(self._schema,
handle_nulls=handle_nulls)
+ self._visitor = cls(schema, **kwargs, array_view=self._array_view)
def begin(self, total_elements: Union[int, None] = None):
- self._lst = []
+ self._visitor.begin(total_elements)
- def visit_chunk_view(self, array_view: CArrayView):
- # The constructor here ensured that self._iterator._array_view
- # is populated when self._set_array() is called.
- self._lst.extend(self._iterator)
+ def visit_chunk_view(self, array_view: CArrayView) -> None:
+ self._visitor.visit_chunk_view(array_view)
- def finish(self) -> List:
- return self._lst
+ def finish(self) -> Any:
+ return self._visitor.finish()
-class ColumnsBuilder(ArrayStreamVisitor):
- def __init__(self, schema, *, array_view=None):
+class ToColumnsPysequenceConverter(ArrayViewVisitor):
+ """Convert struct arrays into (child names, child sequences)
+
+ Each child column is converted by the visitor chosen by
+ ``_resolve_converter_cls()`` for that child's schema.
+ """
+
+ def __init__(self, schema, handle_nulls=None, *, array_view=None):
super().__init__(schema, array_view=array_view)
if self.schema.type != Type.STRUCT:
- raise ValueError("ColumnsBuilder can only be used on a struct array")
+ raise ValueError(
+ "ToColumnsPysequenceConverter can only be used on a struct array"
+ )
# Resolve the appropriate visitor for each column
self._child_visitors = []
@@ -156,24 +278,180 @@ class ColumnsBuilder(ArrayStreamVisitor):
self._schema.children, self._array_view.children
):
self._child_visitors.append(
- self._resolve_child_visitor(child_schema, child_array_view)
+ self._resolve_child_visitor(
+ child_schema, child_array_view, handle_nulls
+ )
)
- def _resolve_child_visitor(self, child_schema, child_array_view):
- # TODO: Resolve more efficient column builders for single-buffer types
- return ListBuilder(child_schema, array_view=child_array_view)
+ def _resolve_child_visitor(self, child_schema, child_array_view, handle_nulls):
+ cls, kwargs = _resolve_converter_cls(child_schema, handle_nulls)
+ return cls(child_schema, **kwargs, array_view=child_array_view)
def begin(self, total_elements: Union[int, None] = None) -> None:
for child_visitor in self._child_visitors:
child_visitor.begin(total_elements)
def visit_chunk_view(self, array_view: CArrayView) -> Any:
+ # This visitor does not handle nulls because it has no way to propagate these
+ # into the child columns. It is designed to be used on top-level record batch
+ # arrays which typically are marked as non-nullable or do not contain nulls.
+ if array_view.null_count > 0:
+ raise ValueError(
+ "null_count > 0 encountered in ToColumnsPysequenceConverter"
+ )
+
for child_visitor, child_array_view in zip(
self._child_visitors, array_view.children
):
child_visitor.visit_chunk_view(child_array_view)
def finish(self) -> Tuple[List[str], List[Sequence]]:
- return [v.schema.name for v in self._child_visitors], [
+ return [child.name for child in self._schema.children], [
v.finish() for v in self._child_visitors
]
+
+
+class ToPyListConverter(ArrayViewVisitor):
+ def __init__(self, schema, *, iterator_cls=PyIterator, array_view=None):
+ super().__init__(schema, array_view=array_view)
+
+ # Ensure that self._iterator._array_view is self._array_view
+ self._iterator = iterator_cls(schema, array_view=self._array_view)
+
+ def begin(self, total_elements: Union[int, None] = None):
+ self._lst = []
+
+ def visit_chunk_view(self, array_view: CArrayView):
+ # The constructor here ensured that self._iterator._array_view
+ # is populated when self._set_array() is called.
+ self._lst.extend(self._iterator)
+
+ def finish(self) -> List:
+ return self._lst
+
+
+class ToPyBufferConverter(ArrayViewVisitor):
+ def begin(self, total_elements: Union[int, None]):
+ self._builder = CBufferBuilder()
+ self._builder.set_format(self._schema_view.buffer_format)
+
+ if total_elements is not None:
+ element_size_bits = self._schema_view.layout.element_size_bits[1]
+ element_size_bytes = element_size_bits // 8
+ self._builder.reserve_bytes(total_elements * element_size_bytes)
+
+ def visit_chunk_view(self, array_view: CArrayView) -> None:
+ converter = self._builder
+ offset, length = array_view.offset, array_view.length
+ dst_bytes = length * converter.itemsize
+
+ converter.reserve_bytes(dst_bytes)
+ array_view.buffer(1).copy_into(converter, offset, length,
len(converter))
+ converter.advance(dst_bytes)
+
+ def finish(self) -> Any:
+ return self._builder.finish()
+
+
+class ToBooleanBufferConverter(ArrayViewVisitor):
+ def begin(self, total_elements: Union[int, None]):
+ self._builder = CBufferBuilder()
+ self._builder.set_format("?")
+
+ if total_elements is not None:
+ self._builder.reserve_bytes(total_elements)
+
+ def visit_chunk_view(self, array_view: CArrayView) -> None:
+ converter = self._builder
+ offset, length = array_view.offset, array_view.length
+ converter.reserve_bytes(length)
+ array_view.buffer(1).unpack_bits_into(converter, offset, length,
len(converter))
+ converter.advance(length)
+
+ def finish(self) -> Any:
+ return self._builder.finish()
+
+
+class ToNullableSequenceConverter(ArrayViewVisitor):
+ def __init__(
+ self,
+ schema,
+ converter_cls=ToPyBufferConverter,
+ handle_nulls: Union[Callable[[CBuffer, Sequence], Any], None] = None,
+ *,
+ array_view=None
+ ):
+ super().__init__(schema, array_view=array_view)
+ self._converter = converter_cls(schema, array_view=self._array_view)
+
+ if handle_nulls is None:
+ self._handle_nulls = nulls_forbid()
+ else:
+ self._handle_nulls = handle_nulls
+
+ def begin(self, total_elements: Union[int, None]):
+ self._builder = CBufferBuilder()
+ self._builder.set_format("?")
+ self._length = 0
+
+ self._converter.begin(total_elements)
+
+ def visit_chunk_view(self, array_view: CArrayView) -> None:
+ offset, length = array_view.offset, array_view.length
+
+ builder = self._builder
+ chunk_contains_nulls = array_view.null_count != 0
+ bitmap_allocated = len(builder) > 0
+
+ if chunk_contains_nulls:
+ current_length = self._length
+ if not bitmap_allocated:
+ self._fill_valid(current_length)
+
+ builder.reserve_bytes(length)
+ array_view.buffer(0).unpack_bits_into(
+ builder, offset, length, current_length
+ )
+ builder.advance(length)
+
+ elif bitmap_allocated:
+ self._fill_valid(length)
+
+ self._length += length
+ self._converter.visit_chunk_view(array_view)
+
+ def finish(self) -> Any:
+ is_valid = self._builder.finish()
+ data = self._converter.finish()
+ return self._handle_nulls(is_valid if len(is_valid) > 0 else None,
data)
+
+ def _fill_valid(self, length):
+ builder = self._builder
+ builder.reserve_bytes(length)
+ out_start = len(builder)
+ memoryview(builder)[out_start : out_start + length] = b"\x01" * length
+ builder.advance(length)
+
+
+def _resolve_converter_cls(schema, handle_nulls=None):
+ schema_view = c_schema_view(schema)
+
+ if schema_view.nullable:
+ if schema_view.type_id == CArrowType.BOOL:
+ return ToNullableSequenceConverter, {
+ "converter_cls": ToBooleanBufferConverter,
+ "handle_nulls": handle_nulls,
+ }
+ elif schema_view.buffer_format is not None:
+ return ToNullableSequenceConverter, {
+ "converter_cls": ToPyBufferConverter,
+ "handle_nulls": handle_nulls,
+ }
+ else:
+ return ToPyListConverter, {}
+ else:
+
+ if schema_view.type_id == CArrowType.BOOL:
+ return ToBooleanBufferConverter, {}
+ elif schema_view.buffer_format is not None:
+ return ToPyBufferConverter, {}
+ else:
+ return ToPyListConverter, {}
diff --git a/python/tests/test_array.py b/python/tests/test_array.py
index 55ffe114..8eb1e96a 100644
--- a/python/tests/test_array.py
+++ b/python/tests/test_array.py
@@ -200,6 +200,9 @@ def test_array_chunked():
# Python objects by to_pylist()
assert array.to_pylist() == list(array.iter_py())
+ # Sequence via to_pysequence()
+ assert list(array.to_pysequence()) == [1, 2, 3, 4, 5, 6]
+
with na.c_array_stream(array) as stream:
arrays = list(stream)
assert len(arrays) == 2
@@ -231,7 +234,7 @@ def test_array_children():
assert len(tuples) == 2
assert len(tuples[0]) == 100
- names, columns = array.to_columns()
+ names, columns = array.to_columns_pysequence()
assert names == [f"col{i}" for i in range(100)]
assert all(len(col) == len(array) for col in columns)
diff --git a/python/tests/test_array_stream.py
b/python/tests/test_array_stream.py
index 8a98f60f..5ce0b551 100644
--- a/python/tests/test_array_stream.py
+++ b/python/tests/test_array_stream.py
@@ -80,10 +80,10 @@ def test_array_stream_to_columns():
)
stream = na.ArrayStream(c_array)
- names, columns = stream.to_columns()
+ names, columns = stream.to_columns_pysequence()
assert names == ["col1", "col2"]
- assert columns[0] == [1, 2, 3]
- assert columns[1] == ["a", "b", "c"]
+ assert list(columns[0]) == [1, 2, 3]
+ assert list(columns[1]) == ["a", "b", "c"]
def test_array_stream_read_all():
diff --git a/python/tests/test_c_buffer.py b/python/tests/test_c_buffer.py
index d18435cc..c84d866e 100644
--- a/python/tests/test_c_buffer.py
+++ b/python/tests/test_c_buffer.py
@@ -252,7 +252,7 @@ def test_c_buffer_from_iterable():
assert buffer.size_bytes == 12
assert buffer.data_type == "int32"
assert buffer.element_size_bits == 32
- assert buffer.item_size == 4
+ assert buffer.itemsize == 4
assert list(buffer) == [1, 2, 3]
# An Arrow type that does not make sense as a buffer type will error
@@ -273,7 +273,7 @@ def test_c_buffer_from_fixed_size_binary_iterable():
buffer = na.c_buffer(items, na.fixed_size_binary(4))
assert buffer.data_type == "binary"
assert buffer.element_size_bits == 32
- assert buffer.item_size == 4
+ assert buffer.itemsize == 4
assert bytes(buffer) == b"".join(items)
assert list(buffer) == items
@@ -282,7 +282,7 @@ def test_c_buffer_from_day_time_iterable():
buffer = na.c_buffer([(1, 2), (3, 4), (5, 6)], na.interval_day_time())
assert buffer.data_type == "interval_day_time"
assert buffer.element_size_bits == 64
- assert buffer.item_size == 8
+ assert buffer.itemsize == 8
assert list(buffer) == [(1, 2), (3, 4), (5, 6)]
@@ -290,7 +290,7 @@ def test_c_buffer_from_month_day_nano_iterable():
buffer = na.c_buffer([(1, 2, 3), (4, 5, 6)], na.interval_month_day_nano())
assert buffer.data_type == "interval_month_day_nano"
assert buffer.element_size_bits == 128
- assert buffer.item_size == 16
+ assert buffer.itemsize == 16
assert list(buffer) == [(1, 2, 3), (4, 5, 6)]
@@ -302,7 +302,7 @@ def test_c_buffer_from_decimal128_iterable():
)
assert buffer.data_type == "decimal128"
assert buffer.element_size_bits == 128
- assert buffer.item_size == 16
+ assert buffer.itemsize == 16
assert list(buffer) == [
bytes64[0:16],
bytes64[16:32],
@@ -316,7 +316,7 @@ def test_c_buffer_from_decimal256_iterable():
buffer = na.c_buffer([bytes64[0:32], bytes64[32:64]], na.decimal256(10, 3))
assert buffer.data_type == "decimal256"
assert buffer.element_size_bits == 256
- assert buffer.item_size == 32
+ assert buffer.itemsize == 32
assert list(buffer) == [bytes64[0:32], bytes64[32:64]]
@@ -326,7 +326,7 @@ def test_c_buffer_bitmap_from_iterable():
assert "10010000" in repr(buffer)
assert buffer.size_bytes == 1
assert buffer.data_type == "bool"
- assert buffer.item_size == 1
+ assert buffer.itemsize == 1
assert buffer.element_size_bits == 1
assert list(buffer.elements()) == [
True,
diff --git a/python/tests/test_nanoarrow.py b/python/tests/test_nanoarrow.py
index 2fb805bd..41f2e1bd 100644
--- a/python/tests/test_nanoarrow.py
+++ b/python/tests/test_nanoarrow.py
@@ -183,6 +183,48 @@ def test_c_array_view_dictionary():
assert "- dictionary: <nanoarrow.c_array.CArrayView>" in repr(view)
+def test_c_array_view_null_count():
+ """CArrayView.null_count uses an explicit count, or computes it when it is -1."""
+ # With explicit null count == 0
+ array = na.c_array_from_buffers(
+ na.int32(), 3, (None, na.c_buffer([1, 2, 3], na.int32())), null_count=0
+ )
+ assert array.view().null_count == 0
+
+ # Infer null count == 0 because the validity buffer is absent (None) when
+ # the null count has not yet been computed by the producer (-1).
+ array = na.c_array_from_buffers(
+ na.int32(), 3, (None, na.c_buffer([1, 2, 3], na.int32())), null_count=-1
+ )
+ assert array.view().null_count == 0
+
+ # Compute null count == 0 by counting validity bits when the null count
+ # has not yet been computed by the producer.
+ array = na.c_array_from_buffers(
+ na.int32(),
+ 3,
+ (
+ na.c_buffer([True, True, True], na.bool_()),
+ na.c_buffer([1, 2, 3], na.int32()),
+ ),
+ null_count=-1,
+ )
+
+ assert array.view().null_count == 0
+
+ # Check computed null count with actual nulls when the null count
+ # has not yet been computed by the producer.
+ array = na.c_array_from_buffers(
+ na.int32(),
+ 3,
+ (
+ na.c_buffer([True, False, True], na.bool_()),
+ na.c_buffer([1, 2, 3], na.int32()),
+ ),
+ null_count=-1,
+ )
+ assert array.view().null_count == 1
+
+
def test_buffers_integer():
data_types = [
(pa.uint8(), np.uint8()),
diff --git a/python/tests/test_visitor.py b/python/tests/test_visitor.py
index 39c3c4d0..eb1c1f65 100644
--- a/python/tests/test_visitor.py
+++ b/python/tests/test_visitor.py
@@ -15,6 +15,7 @@
# specific language governing permissions and limitations
# under the License.
import pytest
+from nanoarrow.c_buffer import CBuffer
import nanoarrow as na
from nanoarrow import visitor
@@ -22,10 +23,50 @@ from nanoarrow import visitor
def test_to_pylist():
+ """ToPyListConverter.visit() materializes an int32 array as a Python list."""
array = na.c_array([1, 2, 3], na.int32())
- assert visitor.to_pylist(array) == [1, 2, 3]
+ assert visitor.ToPyListConverter.visit(array) == [1, 2, 3]
-def test_to_columms():
+def test_convert():
+ """ToPySequenceConverter yields CBuffers for int32/bool and a list for strings."""
+ ints = na.c_array([1, 2, 3], na.int32())
+ bools = na.c_array([1, 0, 1], na.bool_())
+ strings = na.c_array(["abc", "def", "ghi"], na.string())
+
+ # Fixed-width types convert to a CBuffer exposing a buffer-protocol format
+ ints_col = visitor.ToPySequenceConverter.visit(ints)
+ assert isinstance(ints_col, CBuffer)
+ assert ints_col.format == "i"
+ assert list(ints_col) == [1, 2, 3]
+
+ bools_col = visitor.ToPySequenceConverter.visit(bools)
+ assert isinstance(bools_col, CBuffer)
+ assert bools_col.format == "?"
+ assert list(bools_col) == [True, False, True]
+
+ # Strings have no buffer representation here and fall back to a list
+ strings_col = visitor.ToPySequenceConverter.visit(strings)
+ assert isinstance(strings_col, list)
+ assert strings_col == ["abc", "def", "ghi"]
+
+
+def test_convert_non_nullable():
+ """Non-nullable types produce the same sequence types as in test_convert."""
+ ints = na.c_array([1, 2, 3], na.int32(nullable=False))
+ bools = na.c_array([1, 0, 1], na.bool_(nullable=False))
+ strings = na.c_array(["abc", "def", "ghi"], na.string(nullable=False))
+
+ ints_col = visitor.ToPySequenceConverter.visit(ints)
+ assert isinstance(ints_col, CBuffer)
+ assert ints_col.format == "i"
+ assert list(ints_col) == [1, 2, 3]
+
+ bools_col = visitor.ToPySequenceConverter.visit(bools)
+ assert isinstance(bools_col, CBuffer)
+ assert bools_col.format == "?"
+ assert list(bools_col) == [True, False, True]
+
+ strings_col = visitor.ToPySequenceConverter.visit(strings)
+ assert isinstance(strings_col, list)
+ assert strings_col == ["abc", "def", "ghi"]
+
+
+def test_convert_columns():
+ """ToColumnsPysequenceConverter splits a struct array into (names, columns)."""
array = na.c_array_from_buffers(
na.struct({"col1": na.int32(), "col2": na.bool_(), "col3": na.string()}),
length=3,
@@ -37,11 +78,108 @@ def test_to_columms():
],
)
- names, columns = visitor.to_columns(array)
+ names, columns = visitor.ToColumnsPysequenceConverter.visit(array)
assert names == ["col1", "col2", "col3"]
- assert columns[0] == [1, 2, 3]
- assert columns[1] == [True, False, True]
+ assert list(columns[0]) == [1, 2, 3]
+ assert list(columns[1]) == [True, False, True]
assert columns[2] == ["abc", "def", "ghi"]
with pytest.raises(ValueError, match="can only be used on a struct array"):
- visitor.to_columns([], na.int32())
+ visitor.ToColumnsPysequenceConverter.visit([], na.int32())
+
+ # Ensure that the columns converter errors for top-level nulls
+ array_with_nulls = na.c_array_from_buffers(
+ array.schema,
+ array.length,
+ [na.c_buffer([True, False, True], na.bool_())],
+ children=array.children,
+ )
+ with pytest.raises(ValueError, match="null_count > 0"):
+ visitor.ToColumnsPysequenceConverter.visit(array_with_nulls)
+
+
+def test_contiguous_buffer_converter():
+ """ToPyBufferConverter concatenates every chunk's values into one buffer."""
+ array = na.Array.from_chunks([[1, 2, 3], [4, 5, 6]], na.int32())
+ buffer = visitor.ToPyBufferConverter.visit(array)
+ assert list(buffer) == [1, 2, 3, 4, 5, 6]
+
+
+def test_contiguous_buffer_converter_with_offsets():
+ """Sliced chunks contribute only the elements visible after their offset."""
+ src = [na.c_array([1, 2, 3], na.int32())[1:], na.c_array([4, 5, 6], na.int32())[2:]]
+ array = na.Array.from_chunks(src)
+ buffer = visitor.ToPyBufferConverter.visit(array)
+ assert list(buffer) == [2, 3, 6]
+
+
+def test_boolean_bytes_converter():
+ """ToBooleanBufferConverter collects all boolean chunks into one buffer."""
+ array = na.Array.from_chunks([[0, 1, 1], [1, 0, 0]], na.bool_())
+ buffer = visitor.ToBooleanBufferConverter.visit(array)
+ assert list(buffer) == [False, True, True, True, False, False]
+
+
+def test_boolean_bytes_converter_with_offsets():
+ """Sliced boolean chunks contribute only the values after their offset."""
+ src = [na.c_array([0, 1, 1], na.bool_())[1:], na.c_array([1, 0, 0], na.bool_())[2:]]
+ array = na.Array.from_chunks(src)
+ buffer = visitor.ToBooleanBufferConverter.visit(array)
+ assert list(buffer) == [True, True, False]
+
+
+def test_nullable_converter():
+ """ToNullableSequenceConverter with nulls_separate() returns (is_valid, data).
+
+ is_valid is None when no chunk has nulls; otherwise it spans all chunks.
+ In these cases the data value at each null slot reads back as 0.
+ """
+ # All valid
+ array = na.Array.from_chunks([[1, 2, 3], [4, 5, 6]], na.int32())
+ is_valid, column = visitor.ToNullableSequenceConverter.visit(
+ array, handle_nulls=na.nulls_separate()
+ )
+ assert is_valid is None
+ assert list(column) == [1, 2, 3, 4, 5, 6]
+
+ # Only nulls in the first chunk
+ array = na.Array.from_chunks([[1, None, 3], [4, 5, 6]], na.int32())
+ is_valid, column = visitor.ToNullableSequenceConverter.visit(
+ array, handle_nulls=na.nulls_separate()
+ )
+ assert list(is_valid) == [True, False, True, True, True, True]
+ assert list(column) == [1, 0, 3, 4, 5, 6]
+
+ # Only nulls in the second chunk
+ array = na.Array.from_chunks([[1, 2, 3], [4, None, 6]], na.int32())
+ is_valid, column = visitor.ToNullableSequenceConverter.visit(
+ array, handle_nulls=na.nulls_separate()
+ )
+ assert list(is_valid) == [True, True, True, True, False, True]
+ assert list(column) == [1, 2, 3, 4, 0, 6]
+
+ # Nulls in both chunks
+ array = na.Array.from_chunks([[1, None, 3], [4, None, 6]], na.int32())
+ is_valid, column = visitor.ToNullableSequenceConverter.visit(
+ array, handle_nulls=na.nulls_separate()
+ )
+ assert list(is_valid) == [True, False, True, True, False, True]
+ assert list(column) == [1, 0, 3, 4, 0, 6]
+
+
+def test_nulls_forbid():
+ """nulls_forbid() returns the data object itself when is_valid is None and
+ raises ValueError when a validity buffer is supplied."""
+ is_valid_non_empty = na.c_buffer([1, 0, 1], na.uint8())
+ data = na.c_buffer([1, 2, 3], na.int32())
+
+ forbid_nulls = visitor.nulls_forbid()
+ assert forbid_nulls(None, data) is data
+ with pytest.raises(ValueError):
+ forbid_nulls(is_valid_non_empty, data)
+
+
+def test_numpy_null_handling():
+ np = pytest.importorskip("numpy")
+
+ is_valid_non_empty = memoryview(na.c_buffer([1, 0, 1],
na.uint8())).cast("?")
+ data = na.c_buffer([1, 2, 3], na.int32())
+
+ # Check nulls as sentinel
+ nulls_as_sentinel = visitor.nulls_as_sentinel()
+ np.testing.assert_array_equal(
+ nulls_as_sentinel(None, data), np.array([1, 2, 3], np.int32)
+ )
+ np.testing.assert_array_equal(
+ nulls_as_sentinel(is_valid_non_empty, data),
+ np.array([1, np.nan, 3], dtype=np.float64),
+ )