This is an automated email from the ASF dual-hosted git repository.
paleolimbot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-nanoarrow.git
The following commit(s) were added to refs/heads/main by this push:
new 490b9809 feat(python): Add visitor pattern + builders for column
sequences (#454)
490b9809 is described below
commit 490b9809016941947401a47533f2a2333c42dee4
Author: Dewey Dunnington <[email protected]>
AuthorDate: Mon May 13 11:59:24 2024 -0300
feat(python): Add visitor pattern + builders for column sequences (#454)
Assembling columns from chunked things is rather difficult to do and is
a valid thing that somebody might want to assemble from Arrow data. This
PR adds a "visitor" pattern that can be extended to build "column"s,
which are currently just `list()`s. Before trimming down this PR to a
manageable set of changes, I also implemented the visitor that
concatenates data buffers for single data buffer types (
https://gist.github.com/paleolimbot/17263e38b5d97c770e44d33b11181eaf ),
which will be needed for `to_columns()` to be used in any kind of
serious way.
To support the "visitor" pattern, I moved some of the
`PyIterator`-specific pieces into the `PyIterator` so that the visitor
can re-use the relevant pieces of `ArrayViewBaseIterator`. This pattern
also solves one of the problems I had when attempting a "repr" iterator,
which is that I was trying to build something rather than iterate over
it.
```python
import nanoarrow as na
import pandas as pd
from nanoarrow import visitor
url =
"https://github.com/apache/arrow-experiments/raw/main/data/arrow-commits/arrow-commits.arrows"
array = na.ArrayStream.from_url(url).read_all()
# to_columns() doesn't (and won't) produce anything numpy or pandas-related
names, columns = visitor.to_columns(array)
# ..but lets data frames be built rather compactly
pd.DataFrame({k: v for k, v in zip(names, columns)})
```
---
python/src/nanoarrow/array.py | 39 +++++++-
python/src/nanoarrow/array_stream.py | 40 +++++++-
python/src/nanoarrow/iterator.py | 78 ++++++++-------
python/src/nanoarrow/visitor.py | 179 +++++++++++++++++++++++++++++++++++
python/tests/test_array.py | 20 +++-
python/tests/test_array_stream.py | 54 ++++++++++-
python/tests/test_visitor.py | 47 +++++++++
7 files changed, 414 insertions(+), 43 deletions(-)
diff --git a/python/src/nanoarrow/array.py b/python/src/nanoarrow/array.py
index 43ff0dc2..5cfa65b9 100644
--- a/python/src/nanoarrow/array.py
+++ b/python/src/nanoarrow/array.py
@@ -17,7 +17,7 @@
import itertools
from functools import cached_property
-from typing import Iterable, Tuple
+from typing import Iterable, List, Sequence, Tuple
from nanoarrow._lib import (
DEVICE_CPU,
@@ -32,6 +32,7 @@ from nanoarrow.c_array_stream import c_array_stream
from nanoarrow.c_schema import c_schema
from nanoarrow.iterator import iter_array_views, iter_py, iter_tuples
from nanoarrow.schema import Schema, _schema_repr
+from nanoarrow.visitor import to_columns, to_pylist
from nanoarrow import _repr_utils
@@ -344,6 +345,42 @@ class Array:
"""
return iter_array_views(self)
+ def to_pylist(self) -> List:
+ """Convert this Array to a ``list()`` of Python objects
+
+ Computes an identical value to list(:meth:`iter_py`) but can be several
+ times faster.
+
+ Examples
+ --------
+
+ >>> import nanoarrow as na
+ >>> array = na.Array([1, 2, 3], na.int32())
+ >>> array.to_pylist()
+ [1, 2, 3]
+ """
+ return to_pylist(self)
+
+ def to_columns(self) -> Tuple[str, Sequence]:
+ """Convert this Array to a ``list()`` of sequences
+
+ Converts a stream of struct arrays into its column-wise representation
+ such that each column is either a contiguous buffer or a ``list()``.
+
+ Examples
+ --------
+
+ >>> import nanoarrow as na
+ >>> import pyarrow as pa
+ >>> array = na.Array(pa.record_batch([pa.array([1, 2, 3])],
names=["col1"]))
+ >>> names, columns = array.to_columns()
+ >>> names
+ ['col1']
+ >>> columns
+ [[1, 2, 3]]
+ """
+ return to_columns(self)
+
@property
def n_children(self) -> int:
"""Get the number of children for an Array of this type.
diff --git a/python/src/nanoarrow/array_stream.py
b/python/src/nanoarrow/array_stream.py
index deaaece2..ccd4669c 100644
--- a/python/src/nanoarrow/array_stream.py
+++ b/python/src/nanoarrow/array_stream.py
@@ -16,7 +16,7 @@
# under the License.
from functools import cached_property
-from typing import Iterable, Tuple
+from typing import Iterable, List, Sequence, Tuple
from nanoarrow._lib import CMaterializedArrayStream
from nanoarrow._repr_utils import make_class_label
@@ -24,6 +24,7 @@ from nanoarrow.array import Array
from nanoarrow.c_array_stream import c_array_stream
from nanoarrow.iterator import iter_py, iter_tuples
from nanoarrow.schema import Schema, _schema_repr
+from nanoarrow.visitor import to_columns, to_pylist
class ArrayStream:
@@ -198,6 +199,43 @@ class ArrayStream:
"""
return iter_tuples(self)
+ def to_pylist(self) -> List:
+ """Convert this Array to a ``list()`` of Python objects
+
+ Computes an identical value to list(:meth:`iter_py`) but can be several
+ times faster.
+
+ Examples
+ --------
+
+ >>> import nanoarrow as na
+ >>> stream = na.ArrayStream([1, 2, 3], na.int32())
+ >>> stream.to_pylist()
+ [1, 2, 3]
+ """
+ return to_pylist(self)
+
+ def to_columns(self) -> Tuple[str, Sequence]:
+ """Convert this Array to a ``list()`` of sequences
+
+ Converts a stream of struct arrays into its column-wise representation
+ such that each column is either a contiguous buffer or a ``list()``.
+
+ Examples
+ --------
+
+ >>> import nanoarrow as na
+ >>> import pyarrow as pa
+ >>> batch = pa.record_batch([pa.array([1, 2, 3])], names=["col1"])
+ >>> stream = na.ArrayStream(batch)
+ >>> names, columns = stream.to_columns()
+ >>> names
+ ['col1']
+ >>> columns
+ [[1, 2, 3]]
+ """
+ return to_columns(self)
+
def __repr__(self) -> str:
cls = make_class_label(self, "nanoarrow")
schema_repr = _schema_repr(self.schema, prefix="",
include_metadata=False)
diff --git a/python/src/nanoarrow/iterator.py b/python/src/nanoarrow/iterator.py
index 5f85724d..2364ea82 100644
--- a/python/src/nanoarrow/iterator.py
+++ b/python/src/nanoarrow/iterator.py
@@ -23,6 +23,7 @@ from typing import Iterable, Tuple
from nanoarrow._lib import CArrayView, CArrowType
from nanoarrow.c_array_stream import c_array_stream
from nanoarrow.c_schema import c_schema, c_schema_view
+from nanoarrow.schema import Schema
def iter_py(obj, schema=None) -> Iterable:
@@ -130,47 +131,22 @@ class UnregisteredExtensionWarning(UserWarning):
class ArrayViewBaseIterator:
- """Base class for iterators that use an internal ArrowArrayView
+ """Base class for iterators and visitors that use an internal
ArrowArrayView
as the basis for conversion to Python objects. Intended for internal use.
"""
- @classmethod
- def get_iterator(cls, obj, schema=None):
- with c_array_stream(obj, schema=schema) as stream:
- iterator = cls(stream._get_cached_schema())
- for array in stream:
- iterator._set_array(array)
- yield from iterator._iter_chunk(0, len(array))
-
- def __init__(self, schema, *, _array_view=None):
+ def __init__(self, schema, *, array_view=None):
self._schema = c_schema(schema)
self._schema_view = c_schema_view(schema)
- if _array_view is None:
+ if array_view is None:
self._array_view = CArrayView.from_schema(self._schema)
else:
- self._array_view = _array_view
-
- self._children = list(
- map(self._make_child, self._schema.children,
self._array_view.children)
- )
-
- if self._schema.dictionary is None:
- self._dictionary = None
- else:
- self._dictionary = self._make_child(
- self._schema.dictionary, self._array_view.dictionary
- )
-
- def _make_child(self, schema, array_view):
- return type(self)(schema, _array_view=array_view)
-
- def _iter_chunk(self, offset, length) -> Iterable:
- yield self._array_view
+ self._array_view = array_view
@cached_property
- def _child_names(self):
- return [child.name for child in self._schema.children]
+ def schema(self) -> Schema:
+ return Schema(self._schema)
@cached_property
def _object_label(self):
@@ -199,7 +175,41 @@ class PyIterator(ArrayViewBaseIterator):
Intended for internal use.
"""
+ @classmethod
+ def get_iterator(cls, obj, schema=None):
+ with c_array_stream(obj, schema=schema) as stream:
+ iterator = cls(stream._get_cached_schema())
+ for array in stream:
+ iterator._set_array(array)
+ yield from iterator
+
+ def __init__(self, schema, *, array_view=None):
+ super().__init__(schema, array_view=array_view)
+
+ self._children = list(
+ map(self._make_child, self._schema.children,
self._array_view.children)
+ )
+
+ if self._schema.dictionary is None:
+ self._dictionary = None
+ else:
+ self._dictionary = self._make_child(
+ self._schema.dictionary, self._array_view.dictionary
+ )
+
+ def _make_child(self, schema, array_view):
+ return type(self)(schema, array_view=array_view)
+
+ @cached_property
+ def _child_names(self):
+ return [child.name for child in self._schema.children]
+
+ def __iter__(self):
+ """Iterate over all elements in the current chunk"""
+ return self._iter_chunk(0, len(self._array_view))
+
def _iter_chunk(self, offset, length):
+ """Iterate over all elements in a slice of the current chunk"""
# Check for an extension type first since this isn't reflected by
# self._schema_view.type_id. Currently we just return the storage
# iterator with a warning for extension types.
@@ -480,8 +490,8 @@ class RowTupleIterator(PyIterator):
Intended for internal use.
"""
- def __init__(self, schema, *, _array_view=None):
- super().__init__(schema, _array_view=_array_view)
+ def __init__(self, schema, *, array_view=None):
+ super().__init__(schema, array_view=array_view)
if self._schema_view.type != "struct":
raise TypeError(
"RowTupleIterator can only iterate over struct arrays "
@@ -489,7 +499,7 @@ class RowTupleIterator(PyIterator):
)
def _make_child(self, schema, array_view):
- return PyIterator(schema, _array_view=array_view)
+ return PyIterator(schema, array_view=array_view)
def _iter_chunk(self, offset, length):
return self._struct_tuple_iter(offset, length)
diff --git a/python/src/nanoarrow/visitor.py b/python/src/nanoarrow/visitor.py
new file mode 100644
index 00000000..53bc0691
--- /dev/null
+++ b/python/src/nanoarrow/visitor.py
@@ -0,0 +1,179 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from typing import Any, List, Sequence, Tuple, Union
+
+from nanoarrow._lib import CArrayView
+from nanoarrow.c_array_stream import c_array_stream
+from nanoarrow.iterator import ArrayViewBaseIterator, PyIterator
+from nanoarrow.schema import Type
+
+
+def to_pylist(obj, schema=None) -> List:
+ """Convert ``obj`` to a ``list()`` of Python objects
+
+ Computes an identical value to ``list(iterator.iter_py())`` but is several
+ times faster.
+
+ Parameters
+ ----------
+ obj : array stream-like
+ An array-like or array stream-like object as sanitized by
+ :func:`c_array_stream`.
+ schema : schema-like, optional
+ An optional schema, passed to :func:`c_array_stream`.
+
+ Examples
+ --------
+
+ >>> import nanoarrow as na
+ >>> from nanoarrow import visitor
+ >>> array = na.c_array([1, 2, 3], na.int32())
+ >>> visitor.to_pylist(array)
+ [1, 2, 3]
+ """
+ return ListBuilder.visit(obj, schema)
+
+
+def to_columns(obj, schema=None) -> Tuple[List[str], List[Sequence]]:
+ """Convert ``obj`` to a ``list()`` of sequences
+
+ Converts a stream of struct arrays into its column-wise representation
+ such that each column is either a contiguous buffer or a ``list()``.
+
+ Parameters
+ ----------
+ obj : array stream-like
+ An array-like or array stream-like object as sanitized by
+ :func:`c_array_stream`.
+ schema : schema-like, optional
+ An optional schema, passed to :func:`c_array_stream`.
+
+ Examples
+ --------
+
+ >>> import nanoarrow as na
+ >>> from nanoarrow import visitor
+ >>> import pyarrow as pa
+ >>> array = pa.record_batch([pa.array([1, 2, 3])], names=["col1"])
+ >>> names, columns = visitor.to_columns(array)
+ >>> names
+ ['col1']
+ >>> columns
+ [[1, 2, 3]]
+ """
+ return ColumnsBuilder.visit(obj, schema)
+
+
+class ArrayStreamVisitor(ArrayViewBaseIterator):
+ """Compute a value from one or more arrays in an ArrowArrayStream
+
+ This class supports a (currently internal) pattern for building
+ output from zero or more arrays in a stream.
+
+ """
+
+ @classmethod
+ def visit(cls, obj, schema=None, total_elements=None, **kwargs):
+ """Visit all chunks in ``obj`` as a :func:`c_array_stream`."""
+
+ if total_elements is None and hasattr(obj, "__len__"):
+ total_elements = len(obj)
+
+ with c_array_stream(obj, schema=schema) as stream:
+ visitor = cls(stream._get_cached_schema(), **kwargs)
+ visitor.begin(total_elements)
+
+ visitor_set_array = visitor._set_array
+ visit_chunk_view = visitor.visit_chunk_view
+ array_view = visitor._array_view
+
+ for array in stream:
+ visitor_set_array(array)
+ visit_chunk_view(array_view)
+
+ return visitor.finish()
+
+ def begin(self, total_elements: Union[int, None] = None):
+ """Called after the schema has been resolved but before any
+ chunks have been visited. If the total number of elements
+ (i.e., the sum of all chunk lengths) is known, it is provided here.
+ """
+ pass
+
+ def visit_chunk_view(self, array_view: CArrayView) -> None:
+ """Called exactly once for each chunk seen."""
+ pass
+
+ def finish(self) -> Any:
+ """Called exactly once after all chunks have been visited."""
+ return None
+
+
+class ListBuilder(ArrayStreamVisitor):
+ def __init__(self, schema, *, iterator_cls=PyIterator, array_view=None):
+ super().__init__(schema, array_view=array_view)
+
+ # Ensure that self._iterator._array_view is self._array_view
+ self._iterator = iterator_cls(schema, array_view=self._array_view)
+
+ def begin(self, total_elements: Union[int, None] = None):
+ self._lst = []
+
+ def visit_chunk_view(self, array_view: CArrayView):
+ # The constructor here ensured that self._iterator._array_view
+ # is populated when self._set_array() is called.
+ self._lst.extend(self._iterator)
+
+ def finish(self) -> List:
+ return self._lst
+
+
+class ColumnsBuilder(ArrayStreamVisitor):
+ def __init__(self, schema, *, array_view=None):
+ super().__init__(schema, array_view=array_view)
+
+ if self.schema.type != Type.STRUCT:
+ raise ValueError("ColumnsBuilder can only be used on a struct
array")
+
+ # Resolve the appropriate visitor for each column
+ self._child_visitors = []
+ for child_schema, child_array_view in zip(
+ self._schema.children, self._array_view.children
+ ):
+ self._child_visitors.append(
+ self._resolve_child_visitor(child_schema, child_array_view)
+ )
+
+ def _resolve_child_visitor(self, child_schema, child_array_view):
+ # TODO: Resolve more efficient column builders for single-buffer types
+ return ListBuilder(child_schema, array_view=child_array_view)
+
+ def begin(self, total_elements: Union[int, None] = None) -> None:
+ for child_visitor in self._child_visitors:
+ child_visitor.begin(total_elements)
+
+ def visit_chunk_view(self, array_view: CArrayView) -> Any:
+ for child_visitor, child_array_view in zip(
+ self._child_visitors, array_view.children
+ ):
+ child_visitor.visit_chunk_view(child_array_view)
+
+ def finish(self) -> Tuple[List[str], List[Sequence]]:
+ return [v.schema.name for v in self._child_visitors], [
+ v.finish() for v in self._child_visitors
+ ]
diff --git a/python/tests/test_array.py b/python/tests/test_array.py
index a6a79eec..55ffe114 100644
--- a/python/tests/test_array.py
+++ b/python/tests/test_array.py
@@ -48,13 +48,13 @@ def test_array_from_chunks():
array = na.Array.from_chunks([[1, 2, 3], [4, 5, 6]], na.int32())
assert array.schema.type == na.Type.INT32
assert array.n_chunks == 2
- assert list(array.iter_py()) == [1, 2, 3, 4, 5, 6]
+ assert array.to_pylist() == [1, 2, 3, 4, 5, 6]
# Check with schema inferred from first chunk
array = na.Array.from_chunks(array.iter_chunks())
assert array.schema.type == na.Type.INT32
assert array.n_chunks == 2
- assert list(array.iter_py()) == [1, 2, 3, 4, 5, 6]
+ assert array.to_pylist() == [1, 2, 3, 4, 5, 6]
# Check empty
array = na.Array.from_chunks([], na.int32())
@@ -75,7 +75,7 @@ def test_array_from_chunks_validate():
# ...but that one can opt out
array = na.Array.from_chunks(chunks, validate=False)
- assert list(array.iter_py()) == [1, 2, 3, 1, 2, 3]
+ assert array.to_pylist() == [1, 2, 3, 1, 2, 3]
def test_array_empty():
@@ -96,7 +96,7 @@ def test_array_empty():
with pytest.raises(IndexError):
array.chunk(0)
- assert list(array.iter_py()) == []
+ assert array.to_pylist() == []
assert list(array.iter_scalar()) == []
with pytest.raises(IndexError):
array[0]
@@ -148,6 +148,9 @@ def test_array_contiguous():
for py_item, item in zip([1, 2, 3], array.iter_py()):
assert item == py_item
+ # Python objects by to_pylist()
+ assert array.to_pylist() == list(array.iter_py())
+
with na.c_array_stream(array) as stream:
arrays = list(stream)
assert len(arrays) == 1
@@ -194,6 +197,9 @@ def test_array_chunked():
for py_item, item in zip([1, 2, 3], array.iter_py()):
assert item == py_item
+ # Python objects by to_pylist()
+ assert array.to_pylist() == list(array.iter_py())
+
with na.c_array_stream(array) as stream:
arrays = list(stream)
assert len(arrays) == 2
@@ -216,7 +222,7 @@ def test_array_children():
assert array.n_children == 100
assert array.child(0).schema.type == na.Type.INT32
assert array.child(0).n_chunks == 2
- assert list(array.child(0).iter_py()) == [123456, 123456]
+ assert array.child(0).to_pylist() == [123456, 123456]
children = list(array.iter_children())
assert len(children) == array.n_children
@@ -225,6 +231,10 @@ def test_array_children():
assert len(tuples) == 2
assert len(tuples[0]) == 100
+ names, columns = array.to_columns()
+ assert names == [f"col{i}" for i in range(100)]
+ assert all(len(col) == len(array) for col in columns)
+
def test_scalar_to_array():
array = na.Array([123456, 7890], na.int32())
diff --git a/python/tests/test_array_stream.py
b/python/tests/test_array_stream.py
index 035949f0..8a98f60f 100644
--- a/python/tests/test_array_stream.py
+++ b/python/tests/test_array_stream.py
@@ -36,18 +36,68 @@ def test_array_stream_iter():
next(stream_iter)
+def test_array_stream_iter_chunks():
+ stream = na.ArrayStream([1, 2, 3], na.int32())
+ chunks = list(stream.iter_chunks())
+ assert len(chunks) == 1
+ assert chunks[0].to_pylist() == [1, 2, 3]
+
+
+def test_array_stream_iter_py():
+ stream = na.ArrayStream([1, 2, 3], na.int32())
+ assert list(stream.iter_py()) == [1, 2, 3]
+
+
+def test_array_stream_iter_tuples():
+ c_array = na.c_array_from_buffers(
+ na.struct({"col1": na.int32(), "col2": na.string()}),
+ length=3,
+ buffers=[None],
+ children=[
+ na.c_array([1, 2, 3], na.int32()),
+ na.c_array(["a", "b", "c"], na.string()),
+ ],
+ )
+
+ stream = na.ArrayStream(c_array)
+ assert list(stream.iter_tuples()) == [(1, "a"), (2, "b"), (3, "c")]
+
+
+def test_array_stream_to_pylist():
+ stream = na.ArrayStream([1, 2, 3], na.int32())
+ assert stream.to_pylist() == [1, 2, 3]
+
+
+def test_array_stream_to_columns():
+ c_array = na.c_array_from_buffers(
+ na.struct({"col1": na.int32(), "col2": na.string()}),
+ length=3,
+ buffers=[None],
+ children=[
+ na.c_array([1, 2, 3], na.int32()),
+ na.c_array(["a", "b", "c"], na.string()),
+ ],
+ )
+
+ stream = na.ArrayStream(c_array)
+ names, columns = stream.to_columns()
+ assert names == ["col1", "col2"]
+ assert columns[0] == [1, 2, 3]
+ assert columns[1] == ["a", "b", "c"]
+
+
def test_array_stream_read_all():
stream = na.ArrayStream([1, 2, 3], na.int32())
array = stream.read_all()
assert array.schema.type == na.Type.INT32
- assert list(array.iter_py()) == [1, 2, 3]
+ assert array.to_pylist() == [1, 2, 3]
def test_array_stream_read_next():
stream = na.ArrayStream([1, 2, 3], na.int32())
array = stream.read_next()
assert array.schema.type == na.Type.INT32
- assert list(array.iter_py()) == [1, 2, 3]
+ assert array.to_pylist() == [1, 2, 3]
with pytest.raises(StopIteration):
stream.read_next()
diff --git a/python/tests/test_visitor.py b/python/tests/test_visitor.py
new file mode 100644
index 00000000..39c3c4d0
--- /dev/null
+++ b/python/tests/test_visitor.py
@@ -0,0 +1,47 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import pytest
+
+import nanoarrow as na
+from nanoarrow import visitor
+
+
+def test_to_pylist():
+ array = na.c_array([1, 2, 3], na.int32())
+ assert visitor.to_pylist(array) == [1, 2, 3]
+
+
+def test_to_columms():
+ array = na.c_array_from_buffers(
+ na.struct({"col1": na.int32(), "col2": na.bool_(), "col3":
na.string()}),
+ length=3,
+ buffers=[None],
+ children=[
+ na.c_array([1, 2, 3], na.int32()),
+ na.c_array([1, 0, 1], na.bool_()),
+ na.c_array(["abc", "def", "ghi"], na.string()),
+ ],
+ )
+
+ names, columns = visitor.to_columns(array)
+ assert names == ["col1", "col2", "col3"]
+ assert columns[0] == [1, 2, 3]
+ assert columns[1] == [True, False, True]
+ assert columns[2] == ["abc", "def", "ghi"]
+
+ with pytest.raises(ValueError, match="can only be used on a struct array"):
+ visitor.to_columns([], na.int32())