danepitkin commented on code in PR #439: URL: https://github.com/apache/arrow-nanoarrow/pull/439#discussion_r1576870991
########## python/src/nanoarrow/array_stream.py: ########## @@ -0,0 +1,279 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from functools import cached_property +from typing import Iterable, Tuple + +from nanoarrow._lib import CMaterializedArrayStream +from nanoarrow._repr_utils import make_class_label +from nanoarrow.array import Array +from nanoarrow.c_lib import c_array_stream +from nanoarrow.iterator import iter_py, iter_tuples +from nanoarrow.schema import Schema + + +class ArrayStream: + """High-level ArrayStream representation + + The ArrayStream is nanoarrow's high-level representation of zero + or more contiguous arrays that have not necessarily been materialized. + This is in contrast to the nanoarrow :class:`Array`, which consists + of zero or more contiguous arrays but is always fully-materialized. + + The :class:`ArrayStream` is similar to pyarrow's ``RecordBatchReader`` + except it can also represent streams of non-struct arrays. Its scope + maps to that of an ``ArrowArrayStream`` as represented by the Arrow C + Stream interface. + + Parameters + ---------- + obj : array or array stream-like + An array-like or array stream-like object as sanitized by + :func:`c_array_stream`. 
+ schema : schema-like, optional + An optional schema, passed to :func:`c_array_stream`. + + Examples + -------- + + >>> import nanoarrow as na + >>> na.ArrayStream([1, 2, 3], na.int32()) + <nanoarrow.ArrayStream: Schema(INT32)> + """ + + def __init__(self, obj, schema=None) -> None: + self._c_array_stream = c_array_stream(obj, schema) + + @cached_property + def schema(self): + """The :class:`Schema` associated with this stream + + >>> import nanoarrow as na + >>> stream = na.ArrayStream([1, 2, 3], na.int32()) + >>> stream.schema + Schema(INT32) + """ + return Schema(self._c_array_stream._get_cached_schema()) + + def __arrow_c_stream__(self, requested_schema=None): + return self._c_array_stream.__arrow_c_stream__( + requested_schema=requested_schema + ) + + def read_all(self) -> Array: + """Materialize the entire stream into an :class:`Array` + + >>> import nanoarrow as na + >>> stream = na.ArrayStream([1, 2, 3], na.int32()) + >>> stream.read_all() + nanoarrow.Array<int32>[3] + 1 + 2 + 3 + """ + return Array(self._c_array_stream) + + def read_next(self) -> Array: + """Materialize the next contiguous :class:`Array` in this stream + + This method raises ``StopIteration`` if there are no more arrays + in this stream. + + >>> import nanoarrow as na + >>> stream = na.ArrayStream([1, 2, 3], na.int32()) + >>> stream.read_next() + nanoarrow.Array<int32>[3] + 1 + 2 + 3 + """ + c_array = self._c_array_stream.get_next() + return Array(CMaterializedArrayStream.from_c_array(c_array)) + + def __enter__(self): + return self + + def __exit__(self, *args, **kwargs): + self.close() + + def close(self) -> None: + """Release resources associated with this stream + + Note that it is usually preferred to use the context manager to ensure + prompt release of resources (e.g., open files) associated with + this stream. + + Examples + -------- + >>> import nanoarrow as na + >>> stream = na.ArrayStream([1, 2, 3], na.int32()) + >>> with stream: + ... 
pass + >>> stream.read_all() + Traceback (most recent call last): + ... + RuntimeError: array stream is released + + >>> stream = na.ArrayStream([1, 2, 3], na.int32()) + >>> stream.close() + >>> stream.read_all() + Traceback (most recent call last): + ... + RuntimeError: array stream is released + """ + self._c_array_stream.release() + + def __iter__(self) -> Iterable[Array]: + for c_array in self._c_array_stream: + yield Array(CMaterializedArrayStream.from_c_array(c_array)) + + def iter_chunks(self) -> Iterable[Array]: + """Iterate over contiguous Arrays in this stream + + For the :class:`ArrayStream`, this is the same as iterating over + the stream itself. + + Examples + -------- + + >>> import nanoarrow as na + >>> stream = na.ArrayStream([1, 2, 3], na.int32()) + >>> for chunk in stream: + ... print(chunk) + nanoarrow.Array<int32>[3] + 1 + 2 + 3 + """ + return iter(self) + + def iter_py(self) -> Iterable: + """Iterate over the default Python representation of each element. + + Examples + -------- + + >>> import nanoarrow as na + >>> stream = na.ArrayStream([1, 2, 3], na.int32()) + >>> for item in stream.iter_py(): + ... print(item) + 1 + 2 + 3 + """ + return iter_py(self) + + def iter_tuples(self) -> Iterable[Tuple]: + """Iterate over rows of a struct stream as tuples + + Examples + -------- + + >>> import nanoarrow as na + >>> import pyarrow as pa + >>> batch = pa.record_batch( + ... [pa.array([1, 2, 3]), pa.array(["a", "b", "c"])], + ... names=["col1", "col2"] + ... ) + >>> stream = na.ArrayStream(batch) + >>> for item in stream.iter_tuples(): + ... 
print(item) + (1, 'a') + (2, 'b') + (3, 'c') + """ + return iter_tuples(self) + + def __repr__(self) -> str: + cls = make_class_label(self, "nanoarrow") + return f"<{cls}: {self.schema}>" + + @staticmethod + def from_readable(obj): + """Create an ArrayStream from an IPC stream in a readable file or buffer + + Examples + -------- + >>> import nanoarrow as na + >>> from nanoarrow.ipc import Stream + >>> with na.ArrayStream.from_readable(Stream.example_bytes()) as stream: + ... stream.read_all() + nanoarrow.Array<struct<some_col: int32>>[3] + {'some_col': 1} + {'some_col': 2} + {'some_col': 3} + """ + from nanoarrow.ipc import Stream + + with Stream.from_readable(obj) as ipc_stream: + return ArrayStream(ipc_stream) + + @staticmethod + def from_path(obj, *args, **kwargs): + """Create an ArrayStream from an IPC stream at a local file path + + Examples + -------- + >>> import tempfile + >>> import os + >>> import nanoarrow as na + >>> from nanoarrow.ipc import Stream + >>> with tempfile.TemporaryDirectory() as td: + ... path = os.path.join(td, "test.arrows") + ... with open(path, "wb") as f: + ... nbytes = f.write(Stream.example_bytes()) + ... + ... with na.ArrayStream.from_path(path) as stream: + ... stream.read_all() + nanoarrow.Array<struct<some_col: int32>>[3] + {'some_col': 1} + {'some_col': 2} + {'some_col': 3} + """ + from nanoarrow.ipc import Stream + + with Stream.from_path(obj, *args, **kwargs) as ipc_stream: + return ArrayStream(ipc_stream) + + @staticmethod + def from_url(obj, *args, **kwargs): + """Create an ArrayStream from an IPC stream at a local file path Review Comment: ```suggestion """Create an ArrayStream from an IPC stream at a URL ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
