This is an automated email from the ASF dual-hosted git repository.
paleolimbot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-nanoarrow.git
The following commit(s) were added to refs/heads/main by this push:
new 3aa0ec12 feat(python): Implement bindings to IPC writer (#586)
3aa0ec12 is described below
commit 3aa0ec122ab59138984a74c42f7d17c70ea8ece1
Author: Dewey Dunnington <[email protected]>
AuthorDate: Fri Sep 13 23:12:48 2024 -0500
feat(python): Implement bindings to IPC writer (#586)
This PR implements bindings to the IPC writer in the nanoarrow C
library. This adds:
- An `ipc.StreamWriter()` class roughly mirroring pyarrow's
`ipc.Stream()`
- `Schema.serialize()` and `Array.serialize()` to match pyarrow's
`serialize()` methods.
```python
import io
import nanoarrow as na
from nanoarrow.ipc import StreamWriter, InputStream
out = io.BytesIO()
writer = StreamWriter.from_writable(out)
writer.write_stream(InputStream.example())
na.Array(InputStream.from_readable(out.getvalue()))
#> nanoarrow.Array<non-nullable struct<some_col: int32>>[3]
#> {'some_col': 1}
#> {'some_col': 2}
#> {'some_col': 3}
```
---------
Co-authored-by: Joris Van den Bossche <[email protected]>
---
dev/benchmarks/python/ipc.py | 4 +-
python/src/nanoarrow/_ipc_lib.pyx | 195 ++++++++++++++++++++--
python/src/nanoarrow/array.py | 25 ++-
python/src/nanoarrow/array_stream.py | 24 +--
python/src/nanoarrow/ipc.py | 305 +++++++++++++++++++++++++++++++----
python/src/nanoarrow/schema.py | 27 ++++
python/tests/test_array.py | 26 +++
python/tests/test_array_stream.py | 8 +-
python/tests/test_ipc.py | 145 +++++++++++++++--
python/tests/test_schema.py | 15 ++
10 files changed, 699 insertions(+), 75 deletions(-)
diff --git a/dev/benchmarks/python/ipc.py b/dev/benchmarks/python/ipc.py
index b841a1d6..f717eb8f 100644
--- a/dev/benchmarks/python/ipc.py
+++ b/dev/benchmarks/python/ipc.py
@@ -43,12 +43,12 @@ class IpcReaderSuite:
return os.path.join(self.fixtures_dir, name)
def read_fixture_file(self, name):
- with ipc.Stream.from_path(self.fixture_path(name)) as in_stream:
+ with ipc.InputStream.from_path(self.fixture_path(name)) as in_stream:
list(na.c_array_stream(in_stream))
def read_fixture_buffer(self, name):
f = io.BytesIO(self.fixture_buffer[name])
- with ipc.Stream.from_readable(f) as in_stream:
+ with ipc.InputStream.from_readable(f) as in_stream:
list(na.c_array_stream(in_stream))
def time_read_float64_basic_file(self):
diff --git a/python/src/nanoarrow/_ipc_lib.pyx
b/python/src/nanoarrow/_ipc_lib.pyx
index 2db8e003..fcb6530a 100644
--- a/python/src/nanoarrow/_ipc_lib.pyx
+++ b/python/src/nanoarrow/_ipc_lib.pyx
@@ -19,18 +19,24 @@
# cython: linetrace=True
from libc.stdint cimport uint8_t, int64_t, uintptr_t
-from libc.errno cimport EIO
+from libc.errno cimport EIO, EAGAIN
from libc.stdio cimport snprintf
from cpython.ref cimport PyObject, Py_INCREF, Py_DECREF
from cpython cimport Py_buffer, PyBuffer_FillInfo
from nanoarrow_c cimport (
+ ArrowArrayStream,
+ ArrowArrayView,
+ ArrowSchema,
ArrowErrorCode,
ArrowError,
NANOARROW_OK,
- ArrowArrayStream,
)
+from nanoarrow._schema cimport CSchema
+from nanoarrow._array cimport CArrayView
+from nanoarrow._utils cimport Error
+
cdef extern from "nanoarrow_ipc.h" nogil:
struct ArrowIpcInputStream:
@@ -48,18 +54,43 @@ cdef extern from "nanoarrow_ipc.h" nogil:
ArrowArrayStream* out, ArrowIpcInputStream* input_stream,
ArrowIpcArrayStreamReaderOptions* options)
+ struct ArrowIpcOutputStream:
+ ArrowErrorCode (*write)(ArrowIpcOutputStream* stream, const void* buf,
+ int64_t buf_size_bytes, int64_t*
size_written_out,
+ ArrowError* error)
+ void (*release)(ArrowIpcOutputStream* stream)
+ void* private_data
-cdef class PyInputStreamPrivate:
+ struct ArrowIpcWriter:
+ void* private_data
+
+ ArrowErrorCode ArrowIpcWriterInit(ArrowIpcWriter* writer,
+ ArrowIpcOutputStream* output_stream)
+ void ArrowIpcWriterReset(ArrowIpcWriter* writer)
+ ArrowErrorCode ArrowIpcWriterWriteSchema(ArrowIpcWriter* writer,
+ const ArrowSchema* in_,
+ ArrowError* error)
+ ArrowErrorCode ArrowIpcWriterWriteArrayView(ArrowIpcWriter* writer,
+ const ArrowArrayView* in_,
+ ArrowError* error)
+
+ ArrowErrorCode ArrowIpcWriterWriteArrayStream(ArrowIpcWriter* writer,
+ ArrowArrayStream* in_,
+ ArrowError* error)
+
+cdef class PyStreamPrivate:
cdef object _obj
cdef bint _close_obj
cdef void* _addr
cdef Py_ssize_t _size_bytes
+ cdef bint _buffer_readonly
- def __cinit__(self, obj, close_obj=False):
+ def __cinit__(self, obj, bint buffer_readonly, bint close_obj=False):
self._obj = obj
self._close_obj = close_obj
self._addr = NULL
self._size_bytes = 0
+ self._buffer_readonly = buffer_readonly
@property
def obj(self):
@@ -78,14 +109,16 @@ cdef class PyInputStreamPrivate:
return self._size_bytes
# Implement the buffer protocol so that this object can be used as
- # the argument to xxx.readinto(). This ensures that no extra copies
- # (beyond any buffering done by the upstream file-like object) are held
- # since the upstream object has access to the preallocated output buffer.
- # In this case, the preallocation is done by the ArrowArrayStream
+ # the argument to xxx.readinto() or xxx.write(). This ensures that
+ # no extra copies (beyond any buffering done by the upstream file-like
object)
+ # are held since the upstream object has access to the preallocated output
buffer.
+ # In the read case, the preallocation is done by the ArrowArrayStream
# implementation before issuing each read call (two per message, with
# an extra call for a RecordBatch message to get the actual buffer data).
+ # In the write case, this will be a view of whatever information was
provided to
+ # the write callback.
def __getbuffer__(self, Py_buffer* buffer, int flags):
- PyBuffer_FillInfo(buffer, self, self._addr, self._size_bytes, 0, flags)
+ PyBuffer_FillInfo(buffer, self, self._addr, self._size_bytes,
self._buffer_readonly, flags)
def __releasebuffer__(self, Py_buffer* buffer):
pass
@@ -100,8 +133,16 @@ cdef ArrowErrorCode
py_input_stream_read(ArrowIpcInputStream* stream, uint8_t* b
stream_private.set_buffer(<uintptr_t>buf, buf_size_bytes)
try:
- size_read_out[0] = stream_private.obj.readinto(stream_private)
- return NANOARROW_OK
+ # Non-blocking streams may return None here, or buffered
+ # wrappers of them may raise BufferedIOError
+ read_result = stream_private.obj.readinto(stream_private)
+
+ if read_result is None:
+ size_read_out[0] = 0
+ return EAGAIN
+ else:
+ size_read_out[0] = read_result
+ return NANOARROW_OK
except Exception as e:
cls = type(e).__name__.encode()
msg = str(e).encode()
@@ -126,6 +167,51 @@ cdef void py_input_stream_release(ArrowIpcInputStream*
stream) noexcept nogil:
stream.release = NULL
+
+cdef ArrowErrorCode py_output_stream_write(ArrowIpcOutputStream* stream, const
void* buf,
+ int64_t buf_size_bytes, int64_t*
size_written_out,
+ ArrowError* error) noexcept nogil:
+
+ with gil:
+ stream_private = <object>stream.private_data
+ stream_private.set_buffer(<uintptr_t>buf, buf_size_bytes)
+
+ try:
+ # Non-blocking streams may return None here, or buffered
+ # wrappers of them may raise BufferedIOError
+ write_result = stream_private.obj.write(stream_private)
+
+ # Non-blocking streams may return None here
+ if write_result is None:
+ size_written_out[0] = 0
+ return EAGAIN
+ else:
+ size_written_out[0] = write_result
+ return NANOARROW_OK
+ except Exception as e:
+ cls = type(e).__name__.encode()
+ msg = str(e).encode()
+ snprintf(
+ error.message,
+ sizeof(error.message),
+ "%s: %s",
+ <const char*>cls,
+ <const char*>msg
+ )
+ return EIO
+
+cdef void py_output_stream_release(ArrowIpcOutputStream* stream) noexcept
nogil:
+ with gil:
+ stream_private = <object>stream.private_data
+ if stream_private.close_obj:
+ stream_private.obj.close()
+
+ Py_DECREF(stream_private)
+
+ stream.private_data = NULL
+ stream.release = NULL
+
+
cdef class CIpcInputStream:
cdef ArrowIpcInputStream _stream
@@ -150,7 +236,11 @@ cdef class CIpcInputStream:
@staticmethod
def from_readable(obj, close_obj=False):
cdef CIpcInputStream stream = CIpcInputStream()
- cdef PyInputStreamPrivate private_data = PyInputStreamPrivate(obj,
close_obj)
+ cdef PyStreamPrivate private_data = PyStreamPrivate(
+ obj,
+ buffer_readonly=False,
+ close_obj=close_obj
+ )
stream._stream.private_data = <PyObject*>private_data
Py_INCREF(private_data)
@@ -166,3 +256,84 @@ def init_array_stream(CIpcInputStream input_stream,
uintptr_t out):
cdef int code = ArrowIpcArrayStreamReaderInit(out_ptr,
&input_stream._stream, NULL)
if code != NANOARROW_OK:
raise RuntimeError(f"ArrowIpcArrayStreamReaderInit() failed with code
[{code}]")
+
+
+cdef class CIpcOutputStream:
+ cdef ArrowIpcOutputStream _stream
+
+ def __cinit__(self):
+ self._stream.release = NULL
+
+ def is_valid(self):
+ return self._stream.release != NULL
+
+ def __dealloc__(self):
+ # Duplicating release() to avoid Python API calls in the deallocator
+ if self._stream.release != NULL:
+ self._stream.release(&self._stream)
+
+ def release(self):
+ if self._stream.release != NULL:
+ self._stream.release(&self._stream)
+ return True
+ else:
+ return False
+
+ @staticmethod
+ def from_writable(obj, close_obj=False):
+ cdef CIpcOutputStream stream = CIpcOutputStream()
+ cdef PyStreamPrivate private_data = PyStreamPrivate(
+ obj,
+ buffer_readonly=True,
+ close_obj=close_obj
+ )
+
+ stream._stream.private_data = <PyObject*>private_data
+ Py_INCREF(private_data)
+ stream._stream.write = &py_output_stream_write
+ stream._stream.release = &py_output_stream_release
+ return stream
+
+
+cdef class CIpcWriter:
+ cdef ArrowIpcWriter _writer
+
+ def __cinit__(self, CIpcOutputStream stream):
+ self._writer.private_data = NULL
+ if not stream.is_valid():
+ raise ValueError("Can't create writer from released stream")
+
+ cdef int code = ArrowIpcWriterInit(&self._writer, &stream._stream)
+ Error.raise_error_not_ok("ArrowIpcWriterInit()", code)
+
+ def is_valid(self):
+ return self._writer.private_data != NULL
+
+ def __dealloc__(self):
+ if self._writer.private_data != NULL:
+ ArrowIpcWriterReset(&self._writer)
+
+ def release(self):
+ if self._writer.private_data != NULL:
+ ArrowIpcWriterReset(&self._writer)
+
+ def write_schema(self, CSchema schema):
+ cdef Error error = Error()
+ cdef int code = ArrowIpcWriterWriteSchema(&self._writer, schema._ptr,
&error.c_error)
+ error.raise_message_not_ok("ArrowIpcWriterWriteSchema()", code)
+
+ def write_array_view(self, CArrayView array_view):
+ cdef Error error = Error()
+ cdef int code = ArrowIpcWriterWriteArrayView(&self._writer,
array_view._ptr, &error.c_error)
+ error.raise_message_not_ok("ArrowIpcWriterWriteArrayView()", code)
+
+ def write_array_stream(self, uintptr_t stream_addr):
+ cdef ArrowArrayStream* array_stream = <ArrowArrayStream*>stream_addr
+ cdef Error error = Error()
+ cdef int code = ArrowIpcWriterWriteArrayStream(&self._writer,
array_stream, &error.c_error)
+ error.raise_message_not_ok("ArrowIpcWriterWriteArrayStream()", code)
+
+ def write_end_of_stream(self):
+ cdef Error error = Error()
+ cdef int code = ArrowIpcWriterWriteArrayView(&self._writer, NULL,
&error.c_error)
+ error.raise_message_not_ok("ArrowIpcWriterWriteArrayView()", code)
diff --git a/python/src/nanoarrow/array.py b/python/src/nanoarrow/array.py
index 4a7210fb..35ca7c68 100644
--- a/python/src/nanoarrow/array.py
+++ b/python/src/nanoarrow/array.py
@@ -17,7 +17,7 @@
import itertools
from functools import cached_property
-from typing import Iterable, Tuple
+from typing import Iterable, Tuple, Union
from nanoarrow._array import CArray, CArrayView
from nanoarrow._array_stream import CMaterializedArrayStream
@@ -542,6 +542,29 @@ class Array(ArrayViewVisitable):
"to iterate over elements of this Array"
)
+ def serialize(self, dst=None) -> Union[bytes, None]:
+ """Write this Array into dst as zero or more encapsulated IPC messages
+
+ Parameters
+ ----------
+ dst : file-like, optional
+ If present, a file-like object into which the chunks of this array
+ should be serialized. If omitted, this will create a
``io.BytesIO()``
+ and return the serialized result.
+ """
+ from nanoarrow.ipc import StreamWriter
+
+ if dst is None:
+ import io
+
+ with io.BytesIO() as dst:
+ writer = StreamWriter.from_writable(dst)
+ writer.write_stream(self, write_schema=False)
+ return dst.getvalue()
+ else:
+ writer = StreamWriter.from_writable(dst)
+ writer.write_stream(self, write_schema=False)
+
def to_string(self, width_hint=80, items_hint=10) -> str:
cls_name = _repr_utils.make_class_label(self, module="nanoarrow")
len_text = f"[{len(self)}]"
diff --git a/python/src/nanoarrow/array_stream.py
b/python/src/nanoarrow/array_stream.py
index e7282721..d169b966 100644
--- a/python/src/nanoarrow/array_stream.py
+++ b/python/src/nanoarrow/array_stream.py
@@ -211,17 +211,17 @@ class ArrayStream(ArrayViewVisitable):
Examples
--------
>>> import nanoarrow as na
- >>> from nanoarrow.ipc import Stream
- >>> with na.ArrayStream.from_readable(Stream.example_bytes()) as
stream:
+ >>> from nanoarrow.ipc import InputStream
+ >>> with na.ArrayStream.from_readable(InputStream.example_bytes()) as
stream:
... stream.read_all()
nanoarrow.Array<non-nullable struct<some_col: int32>>[3]
{'some_col': 1}
{'some_col': 2}
{'some_col': 3}
"""
- from nanoarrow.ipc import Stream
+ from nanoarrow.ipc import InputStream
- with Stream.from_readable(obj) as ipc_stream:
+ with InputStream.from_readable(obj) as ipc_stream:
return ArrayStream(ipc_stream)
@staticmethod
@@ -233,11 +233,11 @@ class ArrayStream(ArrayViewVisitable):
>>> import tempfile
>>> import os
>>> import nanoarrow as na
- >>> from nanoarrow.ipc import Stream
+ >>> from nanoarrow.ipc import InputStream
>>> with tempfile.TemporaryDirectory() as td:
... path = os.path.join(td, "test.arrows")
... with open(path, "wb") as f:
- ... nbytes = f.write(Stream.example_bytes())
+ ... nbytes = f.write(InputStream.example_bytes())
...
... with na.ArrayStream.from_path(path) as stream:
... stream.read_all()
@@ -246,9 +246,9 @@ class ArrayStream(ArrayViewVisitable):
{'some_col': 2}
{'some_col': 3}
"""
- from nanoarrow.ipc import Stream
+ from nanoarrow.ipc import InputStream
- with Stream.from_path(obj, *args, **kwargs) as ipc_stream:
+ with InputStream.from_path(obj, *args, **kwargs) as ipc_stream:
return ArrayStream(ipc_stream)
@staticmethod
@@ -261,11 +261,11 @@ class ArrayStream(ArrayViewVisitable):
>>> import tempfile
>>> import os
>>> import nanoarrow as na
- >>> from nanoarrow.ipc import Stream
+ >>> from nanoarrow.ipc import InputStream
>>> with tempfile.TemporaryDirectory() as td:
... path = os.path.join(td, "test.arrows")
... with open(path, "wb") as f:
- ... nbytes = f.write(Stream.example_bytes())
+ ... nbytes = f.write(InputStream.example_bytes())
...
... uri = pathlib.Path(path).as_uri()
... with na.ArrayStream.from_url(uri) as stream:
@@ -275,7 +275,7 @@ class ArrayStream(ArrayViewVisitable):
{'some_col': 2}
{'some_col': 3}
"""
- from nanoarrow.ipc import Stream
+ from nanoarrow.ipc import InputStream
- with Stream.from_url(obj, *args, **kwargs) as ipc_stream:
+ with InputStream.from_url(obj, *args, **kwargs) as ipc_stream:
return ArrayStream(ipc_stream)
diff --git a/python/src/nanoarrow/ipc.py b/python/src/nanoarrow/ipc.py
index 91916904..ef02a420 100644
--- a/python/src/nanoarrow/ipc.py
+++ b/python/src/nanoarrow/ipc.py
@@ -18,13 +18,21 @@
import io
from nanoarrow._array_stream import CArrayStream
-from nanoarrow._ipc_lib import CIpcInputStream, init_array_stream
+from nanoarrow._ipc_lib import (
+ CIpcInputStream,
+ CIpcOutputStream,
+ CIpcWriter,
+ init_array_stream,
+)
from nanoarrow._utils import obj_is_buffer
+from nanoarrow.array import c_array
+from nanoarrow.array_stream import c_array_stream
+from nanoarrow.iterator import ArrayViewBaseIterator
from nanoarrow import _repr_utils
-class Stream:
+class InputStream:
"""Stream of serialized Arrow data
Reads file paths or otherwise readable file objects that contain
@@ -41,8 +49,8 @@ class Stream:
--------
>>> import nanoarrow as na
- >>> from nanoarrow.ipc import Stream
- >>> with Stream.example() as inp, na.c_array_stream(inp) as stream:
+ >>> from nanoarrow.ipc import InputStream
+ >>> with InputStream.example() as inp, na.c_array_stream(inp) as stream:
... stream
<nanoarrow.c_array_stream.CArrayStream>
- get_schema(): struct<some_col: int32>
@@ -62,7 +70,7 @@ class Stream:
input stream to an ArrowArrayStream wrapped by a PyCapsule.
"""
if not self._is_valid():
- raise RuntimeError("nanoarrow.ipc.Stream is no longer valid")
+ raise RuntimeError("nanoarrow.ipc.InputStream is no longer valid")
with CArrayStream.allocate() as array_stream:
init_array_stream(self._stream, array_stream._addr())
@@ -81,7 +89,7 @@ class Stream:
"""Wrap an open readable file or buffer as an Arrow IPC stream
Wraps a readable object (specifically, an object that implements a
- ``readinto()`` method) as a non-owning Stream. Closing ``obj`` remains
+ ``readinto()`` method) as a non-owning InputStream. Closing ``obj``
remains
the caller's responsibility: neither this stream nor the resulting
array
stream will call ``obj.close()``.
@@ -94,8 +102,8 @@ class Stream:
--------
>>> import nanoarrow as na
- >>> from nanoarrow.ipc import Stream
- >>> ipc_stream = Stream.from_readable(Stream.example_bytes())
+ >>> from nanoarrow.ipc import InputStream
+ >>> ipc_stream = InputStream.from_readable(InputStream.example_bytes())
>>> na.c_array_stream(ipc_stream)
<nanoarrow.c_array_stream.CArrayStream>
- get_schema(): struct<some_col: int32>
@@ -106,7 +114,7 @@ class Stream:
else:
close_obj = False
- out = Stream()
+ out = InputStream()
out._stream = CIpcInputStream.from_readable(obj, close_obj=close_obj)
out._desc = repr(obj)
return out
@@ -116,8 +124,8 @@ class Stream:
"""Wrap a local file as an IPC stream
Wraps a pathlike object (specificially, one that can be passed to ``open()``)
``open()``)
- as an owning Stream. The file will be opened in binary mode and will
be closed
- when this stream or the resulting array stream is released.
+ as an owning InputStream. The file will be opened in binary mode and
will be
+ closed when this stream or the resulting array stream is released.
Parameters
----------
@@ -130,18 +138,18 @@ class Stream:
>>> import tempfile
>>> import os
>>> import nanoarrow as na
- >>> from nanoarrow.ipc import Stream
+ >>> from nanoarrow.ipc import InputStream
>>> with tempfile.TemporaryDirectory() as td:
... path = os.path.join(td, "test.arrows")
... with open(path, "wb") as f:
- ... nbytes = f.write(Stream.example_bytes())
+ ... nbytes = f.write(InputStream.example_bytes())
...
- ... with Stream.from_path(path) as inp, na.c_array_stream(inp) as
stream:
- ... stream
+ ... with InputStream.from_path(path) as inp,
na.c_array_stream(inp) as s:
+ ... s
<nanoarrow.c_array_stream.CArrayStream>
- get_schema(): struct<some_col: int32>
"""
- out = Stream()
+ out = InputStream()
out._stream = CIpcInputStream.from_readable(
open(obj, "rb", *args, **kwargs), close_obj=True
)
@@ -153,7 +161,7 @@ class Stream:
"""Wrap a URL as an IPC stream
Wraps a URL (specifically, one that can be passed to
- ``urllib.request.urlopen()``) as an owning Stream. The URL will be
+ ``urllib.request.urlopen()``) as an owning InputStream. The URL will be
closed when this stream or the resulting array stream is released.
Parameters
@@ -168,21 +176,21 @@ class Stream:
>>> import tempfile
>>> import os
>>> import nanoarrow as na
- >>> from nanoarrow.ipc import Stream
+ >>> from nanoarrow.ipc import InputStream
>>> with tempfile.TemporaryDirectory() as td:
... path = os.path.join(td, "test.arrows")
... with open(path, "wb") as f:
- ... nbytes = f.write(Stream.example_bytes())
+ ... nbytes = f.write(InputStream.example_bytes())
...
... uri = pathlib.Path(path).as_uri()
- ... with Stream.from_url(uri) as inp, na.c_array_stream(inp) as
stream:
+ ... with InputStream.from_url(uri) as inp, na.c_array_stream(inp)
as stream:
... stream
<nanoarrow.c_array_stream.CArrayStream>
- get_schema(): struct<some_col: int32>
"""
import urllib.request
- out = Stream()
+ out = InputStream()
out._stream = CIpcInputStream.from_readable(
urllib.request.urlopen(obj, *args, **kwargs), close_obj=True
)
@@ -191,7 +199,7 @@ class Stream:
@staticmethod
def example():
- """Example IPC Stream
+ """Example IPC InputStream
A self-contained example whose value is the serialized version of
``DataFrame({"some_col": [1, 2, 3]})``. This may be used for testing
@@ -201,17 +209,17 @@ class Stream:
Examples
--------
- >>> from nanoarrow.ipc import Stream
- >>> Stream.example()
- <nanoarrow.ipc.Stream <_io.BytesIO object at ...>>
+ >>> from nanoarrow.ipc import InputStream
+ >>> InputStream.example()
+ <nanoarrow.ipc.InputStream <_io.BytesIO object at ...>>
"""
- return Stream.from_readable(Stream.example_bytes())
+ return InputStream.from_readable(InputStream.example_bytes())
@staticmethod
def example_bytes():
"""Example stream bytes
- The underlying bytes of the :staticmethod:`example` Stream. This is
useful
+ The underlying bytes of the :staticmethod:`example` InputStream. This
is useful
for writing files or creating other types of test input.
Examples
@@ -219,11 +227,11 @@ class Stream:
>>> import os
>>> import tempfile
- >>> from nanoarrow.ipc import Stream
+ >>> from nanoarrow.ipc import InputStream
>>> with tempfile.TemporaryDirectory() as td:
... path = os.path.join(td, "test.arrows")
... with open(path, "wb") as f:
- ... f.write(Stream.example_bytes())
+ ... f.write(InputStream.example_bytes())
440
"""
return _EXAMPLE_IPC_SCHEMA + _EXAMPLE_IPC_BATCH
@@ -236,9 +244,246 @@ class Stream:
return f"<{class_label} <invalid>>"
+class StreamWriter:
+ """Write streams of serialized Arrow data
+
+ Provides various ways of writing Arrow schemas and record batches as
+ binary data serialized using the Arrow IPC streaming format.
+
+ Use :staticmethod:`from_writable` or :staticmethod:`from_path`
+ to construct a writer.
+
+ Examples
+ --------
+
+ >>> import io
+ >>> import nanoarrow as na
+ >>> from nanoarrow.ipc import StreamWriter
+ >>>
+ >>> out = io.BytesIO()
+ >>> array = na.c_array_from_buffers(
+ ... na.struct({"some_col": na.int32()}),
+ ... length=3,
+ ... buffers=[],
+ ... children=[na.c_array([1, 2, 3], na.int32())]
+ ... )
+ >>>
+ >>> with StreamWriter.from_writable(out) as writer:
+ ... writer.write_stream(array)
+ >>>
+ >>> na.ArrayStream.from_readable(out.getvalue()).read_all()
+ nanoarrow.Array<non-nullable struct<some_col: int32>>[3]
+ {'some_col': 1}
+ {'some_col': 2}
+ {'some_col': 3}
+ """
+
+ def __init__(self):
+ self._writer = None
+ self._desc = None
+ self._iterator = None
+
+ def _is_valid(self) -> bool:
+ return self._writer is not None and self._writer.is_valid()
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, *args, **kwargs):
+ self.close()
+
+ def release(self):
+ """Close stream without writing the end-of-stream marker"""
+ if not self._is_valid():
+ return
+
+ self._writer.release()
+ self._writer = None
+
+ def close(self):
+ """Close stream and write end-of-stream marker"""
+ if not self._is_valid():
+ return
+
+ self._writer.write_end_of_stream()
+ self.release()
+
+ def write_array(self, obj, schema=None, *, write_schema=None):
+ """Interpret obj as an array and write to stream
+
+ Parameters
+ ----------
+ obj : array-like
+ An array-like object as sanitized by :func:`c_array`.
+ schema : schema-like, optional
+ An optional schema, passed to :func:`c_array`.
+ write_schema : bool, optional
+ See :meth:`write_stream`.
+ """
+ obj = c_array(obj)
+ return self.write_stream(obj, schema, write_schema=write_schema)
+
+ def write_stream(self, obj, schema=None, *, write_schema=None):
+ """Interpret obj as a stream of arrays and write to stream
+
+ Writes all arrays from obj to the output stream.
+
+ Parameters
+ ----------
+ obj : array stream-like
+ An array-like or array stream-like object as sanitized by
+ :func:`c_array_stream`.
+ schema : schema-like, optional
+ An optional schema, passed to :func:`c_array_stream`.
+ write_schema : bool, optional
+ If True, the schema will always be written to the output stream;
if False,
+ the schema will never be written to the output stream. If omitted,
the
+ schema will be written if nothing has yet been written to the
output.
+ """
+ if not self._is_valid():
+ raise ValueError("Can't write to released StreamWriter")
+
+ with c_array_stream(obj, schema=schema) as stream:
+ if self._iterator is None:
+ self._iterator =
ArrayViewBaseIterator(stream._get_cached_schema())
+ if write_schema is None:
+ write_schema = True
+
+ if write_schema:
+ self._writer.write_schema(self._iterator._schema)
+
+ for array in stream:
+ self._iterator._set_array(array)
+ self._writer.write_array_view(self._iterator._array_view)
+
+ def serialize_stream(self, obj, schema=None):
+ """Interpret obj as a stream of arrays, write to stream, and close
+
+ Like :meth:`write_stream` except always writes a schema message and
+ always appends the end-of-stream marker to the output. This method
+ also takes a potentially more efficient path that uses fewer Python
+ calls at the expense of less flexibility. After calling this method,
+ the writer is released and subsequent calls to methods will error.
+
+ Parameters
+ ----------
+ obj : array stream-like
+ An array-like or array stream-like object as sanitized by
+ :func:`c_array_stream`.
+ schema : schema-like, optional
+ An optional schema, passed to :func:`c_array_stream`.
+ """
+ if not self._is_valid():
+ raise ValueError("Can't write to released StreamWriter")
+
+ # If we've already written a schema or we've explicitly been asked
+ # to write one, we can't write using the stream writer because it
+ # automatically appends a schema. We can, however, write the rest
+ # of the stream and EOS using write().
+ if self._iterator is not None:
+ raise ValueError("Can't serialize_stream() into a non-empty
writer")
+
+ # Write the entire stream and release the writer. We can't
+ # use close() because that would write the EOS and the stream
+ # writer has already appended this.
+ with self, c_array_stream(obj, schema=schema) as stream:
+ self._writer.write_array_stream(stream._addr())
+ self.release()
+
+ @staticmethod
+ def from_writable(obj):
+ """Write an Arrow IPC stream to a writable file
+
+ Wraps a writable object (specifically, an object that implements a
+ ``write()`` method) as a non-owning StreamWriter. Closing ``obj``
remains
+ the caller's responsibility (i.e., closing this object will not call
+ ``obj.close()``).
+
+ Parameters
+ ----------
+ obj : A writable file-like object supporting ``write()``.
+
+ Examples
+ --------
+
+ >>> import io
+ >>> import nanoarrow as na
+ >>> from nanoarrow.ipc import StreamWriter
+ >>>
+ >>> out = io.BytesIO()
+ >>> array = na.c_array_from_buffers(
+ ... na.struct({"some_col": na.int32()}),
+ ... length=3,
+ ... buffers=[],
+ ... children=[na.c_array([1, 2, 3], na.int32())]
+ ... )
+ >>>
+ >>> with StreamWriter.from_writable(out) as writer:
+ ... writer.write_stream(array)
+ >>>
+ >>> na.ArrayStream.from_readable(out.getvalue()).read_all()
+ nanoarrow.Array<non-nullable struct<some_col: int32>>[3]
+ {'some_col': 1}
+ {'some_col': 2}
+ {'some_col': 3}
+ """
+ out = StreamWriter()
+ stream = CIpcOutputStream.from_writable(obj, close_obj=False)
+ out._desc = repr(obj)
+
+ out._writer = CIpcWriter(stream)
+ return out
+
+ @staticmethod
+ def from_path(obj, *args, **kwargs):
+ """Wrap a local file as an IPC stream
+
+ Wraps a pathlike object (specifically, one that can be passed to
``open()``)
+ as an owning StreamWriter. The file will be opened in (writable)
binary mode and
+ will be closed when the returned writer is closed.
+
+ Parameters
+ ----------
+ obj : path-like
+ A string or path-like object that can be passed to ``open()``
+
+ Examples
+ --------
+ >>> import os
+ >>> import tempfile
+ >>> import nanoarrow as na
+ >>> from nanoarrow.ipc import StreamWriter
+ >>>
+ >>> array = na.c_array_from_buffers(
+ ... na.struct({"some_col": na.int32()}),
+ ... length=3,
+ ... buffers=[],
+ ... children=[na.c_array([1, 2, 3], na.int32())]
+ ... )
+ >>>
+ >>> with tempfile.TemporaryDirectory() as td:
+ ... path = os.path.join(td, "test.arrows")
+ ... with StreamWriter.from_path(path) as writer:
+ ... writer.write_stream(array)
+ ...
+ ... with na.ArrayStream.from_path(path) as stream:
+ ... stream.read_all()
+ nanoarrow.Array<non-nullable struct<some_col: int32>>[3]
+ {'some_col': 1}
+ {'some_col': 2}
+ {'some_col': 3}
+ """
+ out = StreamWriter()
+ stream = CIpcOutputStream.from_writable(
+ open(obj, "wb", *args, **kwargs), close_obj=True
+ )
+ out._writer = CIpcWriter(stream)
+ return out
+
+
# A self-contained example whose value is the serialized version of
# DataFrame({"some_col": [1, 2, 3]}). Used to make the tests and documentation
-# self-contained since we don't have an IPC writer.
+# self-contained.
_EXAMPLE_IPC_SCHEMA = (
b"\xff\xff\xff\xff\x10\x01\x00\x00\x10\x00\x00\x00\x00\x00\x0a\x00\x0e\x00\x06"
b"\x00\x05\x00\x08\x00\x0a\x00\x00\x00\x00\x01\x04\x00\x10\x00\x00\x00\x00\x00"
diff --git a/python/src/nanoarrow/schema.py b/python/src/nanoarrow/schema.py
index 7d886067..8adae418 100644
--- a/python/src/nanoarrow/schema.py
+++ b/python/src/nanoarrow/schema.py
@@ -470,6 +470,33 @@ class Schema:
"""
return [self.field(i) for i in range(self.n_fields)]
+ def serialize(self, dst=None) -> Union[bytes, None]:
+ """Write this Schema into dst as an encapsulated IPC message
+
+ Parameters
+ ----------
+ dst : file-like, optional
+ If present, a file-like object into which the schema should be
+ serialized. If omitted, this will create a ``io.BytesIO()`` and
+ return the serialized result.
+ """
+ from nanoarrow.c_array_stream import CArrayStream
+
+ from nanoarrow.ipc import StreamWriter
+
+ empty = CArrayStream.from_c_arrays([], self._c_schema, validate=False)
+
+ if dst is None:
+ import io
+
+ with io.BytesIO() as dst:
+ writer = StreamWriter.from_writable(dst)
+ writer.write_stream(empty)
+ return dst.getvalue()
+ else:
+ writer = StreamWriter.from_writable(dst)
+ writer.write_stream(empty)
+
def __repr__(self) -> str:
return _schema_repr(self)
diff --git a/python/tests/test_array.py b/python/tests/test_array.py
index 2dfd83c8..4baae94e 100644
--- a/python/tests/test_array.py
+++ b/python/tests/test_array.py
@@ -357,6 +357,32 @@ def test_array_inspect(capsys):
assert captured.out.startswith("<ArrowArray struct<col0: int32")
+def test_array_serialize():
+ import io
+
+ c_array = na.c_array_from_buffers(
+ na.struct({"some_col": na.int32()}, nullable=False),
+ length=3,
+ buffers=[],
+ children=[na.c_array([1, 2, 3], na.int32())],
+ )
+ array = na.Array(c_array)
+ schema_serialized = array.schema.serialize()
+
+ serialized = array.serialize()
+ array_roundtrip = na.ArrayStream.from_readable(
+ schema_serialized + serialized
+ ).read_all()
+ assert repr(array_roundtrip) == repr(array)
+
+ out = io.BytesIO()
+ array.serialize(out)
+ array_roundtrip = na.ArrayStream.from_readable(
+ schema_serialized + out.getvalue()
+ ).read_all()
+ assert repr(array_roundtrip) == repr(array)
+
+
def test_timestamp_array():
d1 = int(round(datetime(1985, 12, 31, 0, 0,
tzinfo=timezone.utc).timestamp() * 1e3))
d2 = int(round(datetime(2005, 3, 4, 0, 0, tzinfo=timezone.utc).timestamp()
* 1e3))
diff --git a/python/tests/test_array_stream.py
b/python/tests/test_array_stream.py
index 57fc170e..1392248b 100644
--- a/python/tests/test_array_stream.py
+++ b/python/tests/test_array_stream.py
@@ -23,7 +23,7 @@ import tempfile
import pytest
import nanoarrow as na
-from nanoarrow.ipc import Stream
+from nanoarrow.ipc import InputStream
def test_array_stream_iter():
@@ -120,7 +120,7 @@ def test_array_stream_context_manager():
def test_array_stream_from_readable():
- stream = na.ArrayStream.from_readable(Stream.example_bytes())
+ stream = na.ArrayStream.from_readable(InputStream.example_bytes())
assert stream.schema.type == na.Type.STRUCT
assert list(stream.read_all().iter_tuples()) == [(1,), (2,), (3,)]
@@ -129,7 +129,7 @@ def test_array_stream_from_path():
with tempfile.TemporaryDirectory() as td:
path = os.path.join(td, "test.arrows")
with open(path, "wb") as f:
- f.write(Stream.example_bytes())
+ f.write(InputStream.example_bytes())
stream = na.ArrayStream.from_path(path)
assert stream.schema.type == na.Type.STRUCT
@@ -140,7 +140,7 @@ def test_array_stream_from_url():
with tempfile.TemporaryDirectory() as td:
path = os.path.join(td, "test.arrows")
with open(path, "wb") as f:
- f.write(Stream.example_bytes())
+ f.write(InputStream.example_bytes())
uri = pathlib.Path(path).as_uri()
with na.ArrayStream.from_url(uri) as stream:
diff --git a/python/tests/test_ipc.py b/python/tests/test_ipc.py
index 28298027..73ca88eb 100644
--- a/python/tests/test_ipc.py
+++ b/python/tests/test_ipc.py
@@ -24,19 +24,18 @@ import pytest
from nanoarrow._utils import NanoarrowException
import nanoarrow as na
-from nanoarrow.ipc import Stream
+from nanoarrow.ipc import InputStream, StreamWriter
def test_ipc_stream_example():
-
- with Stream.example() as input:
+ with InputStream.example() as input:
assert input._is_valid() is True
assert "BytesIO object" in repr(input)
stream = na.c_array_stream(input)
assert input._is_valid() is False
assert stream.is_valid() is True
- assert repr(input) == "<nanoarrow.ipc.Stream <invalid>>"
+ assert repr(input) == "<nanoarrow.ipc.InputStream <invalid>>"
with pytest.raises(RuntimeError, match="no longer valid"):
stream = na.c_array_stream(input)
@@ -54,8 +53,8 @@ def test_ipc_stream_example():
def test_ipc_stream_from_readable():
- with io.BytesIO(Stream.example_bytes()) as f:
- with Stream.from_readable(f) as input:
+ with io.BytesIO(InputStream.example_bytes()) as f:
+ with InputStream.from_readable(f) as input:
assert input._is_valid() is True
assert "BytesIO object" in repr(input)
@@ -69,9 +68,9 @@ def test_ipc_stream_from_path():
with tempfile.TemporaryDirectory() as td:
path = os.path.join(td, "test.arrows")
with open(path, "wb") as f:
- f.write(Stream.example_bytes())
+ f.write(InputStream.example_bytes())
- with Stream.from_path(path) as input:
+ with InputStream.from_path(path) as input:
assert repr(path) in repr(input)
with na.c_array_stream(input) as stream:
batches = list(stream)
@@ -83,10 +82,10 @@ def test_ipc_stream_from_url():
with tempfile.TemporaryDirectory() as td:
path = os.path.join(td, "test.arrows")
with open(path, "wb") as f:
- f.write(Stream.example_bytes())
+ f.write(InputStream.example_bytes())
uri = pathlib.Path(path).as_uri()
- with Stream.from_url(uri) as input:
+ with InputStream.from_url(uri) as input:
with na.c_array_stream(input) as stream:
batches = list(stream)
assert len(batches) == 1
@@ -95,10 +94,10 @@ def test_ipc_stream_from_url():
def test_ipc_stream_python_exception_on_read():
class ExtraordinarilyInconvenientFile:
- def readinto(self, obj):
+ def readinto(self, *args, **kwargs):
raise RuntimeError("I error for all read requests")
- input = Stream.from_readable(ExtraordinarilyInconvenientFile())
+ input = InputStream.from_readable(ExtraordinarilyInconvenientFile())
with pytest.raises(
NanoarrowException, match="RuntimeError: I error for all read requests"
):
@@ -106,11 +105,129 @@ def test_ipc_stream_python_exception_on_read():
def test_ipc_stream_error_on_read():
- with io.BytesIO(Stream.example_bytes()[:100]) as f:
- with Stream.from_readable(f) as input:
+ with io.BytesIO(InputStream.example_bytes()[:100]) as f:
+ with InputStream.from_readable(f) as input:
with pytest.raises(
NanoarrowException,
match="Expected >= 280 bytes of remaining data",
):
na.c_array_stream(input)
+
+
+def test_writer_from_writable():
+ array = na.c_array_from_buffers(
+ na.struct({"some_col": na.int32()}),
+ length=3,
+ buffers=[],
+ children=[na.c_array([1, 2, 3], na.int32())],
+ )
+
+ out = io.BytesIO()
+ with StreamWriter.from_writable(out) as writer:
+ writer.write_array(array)
+
+ with na.ArrayStream.from_readable(out.getvalue()) as stream:
+ assert stream.read_all().to_pylist() == na.Array(array).to_pylist()
+
+
+def test_writer_from_path():
+ array = na.c_array_from_buffers(
+ na.struct({"some_col": na.int32()}),
+ length=3,
+ buffers=[],
+ children=[na.c_array([1, 2, 3], na.int32())],
+ )
+
+ with tempfile.TemporaryDirectory() as td:
+ path = os.path.join(td, "test.arrows")
+
+ with StreamWriter.from_path(path) as writer:
+ writer.write_array(array)
+
+ with na.ArrayStream.from_path(path) as stream:
+ assert stream.read_all().to_pylist() == na.Array(array).to_pylist()
+
+
+def test_writer_write_stream_schema():
+ array = na.c_array_from_buffers(
+ na.struct({"some_col": na.int32()}),
+ length=3,
+ buffers=[],
+ children=[na.c_array([1, 2, 3], na.int32())],
+ )
+ zero_chunk_array = na.Array.from_chunks([], array.schema)
+
+ out = io.BytesIO()
+ with StreamWriter.from_writable(out) as writer:
+ writer.write_stream(zero_chunk_array)
+ schema_bytes = out.getvalue()
+
+ with StreamWriter.from_writable(out) as writer:
+ out.truncate(0)
+ out.seek(0)
+ writer.write_stream(zero_chunk_array)
+ writer.write_stream(zero_chunk_array, write_schema=True)
+ two_schema_bytes = out.getvalue()
+
+ assert (schema_bytes + schema_bytes) == two_schema_bytes
+
+ with StreamWriter.from_writable(out) as writer:
+ out.truncate(0)
+ out.seek(0)
+ writer.write_array(array, write_schema=False)
+ array_bytes = out.getvalue()
+
+ with StreamWriter.from_writable(out) as writer:
+ out.truncate(0)
+ out.seek(0)
+ writer.write_array(array, write_schema=True)
+ both_bytes = out.getvalue()
+
+ assert (schema_bytes + array_bytes) == both_bytes
+
+
+def test_writer_serialize_stream():
+ array = na.c_array_from_buffers(
+ na.struct({"some_col": na.int32()}),
+ length=3,
+ buffers=[],
+ children=[na.c_array([1, 2, 3], na.int32())],
+ )
+
+ out = io.BytesIO()
+ with StreamWriter.from_writable(out) as writer:
+ writer.write_array(array)
+
+ # Check that we can't serialize after we've already written to stream
+ with pytest.raises(ValueError, match="Can't serialize_stream"):
+ writer.serialize_stream(array)
+
+ schema_and_array_bytes = out.getvalue()
+
+ end_of_stream_bytes = b"\xff\xff\xff\xff\x00\x00\x00\x00"
+
+ writer = StreamWriter.from_writable(out)
+ out.truncate(0)
+ out.seek(0)
+ writer.serialize_stream(array)
+ assert writer._is_valid() is False
+
+ serialized_bytes = out.getvalue()
+ assert (schema_and_array_bytes + end_of_stream_bytes) == serialized_bytes
+
+
+def test_writer_python_exception_on_write():
+ class ExtraordinarilyInconvenientFile:
+ def write(self, *args, **kwargs):
+ raise RuntimeError("I error for all write requests")
+
+ with pytest.raises(NanoarrowException, match="I error for all write requests"):
+ with StreamWriter.from_writable(ExtraordinarilyInconvenientFile()) as writer:
+ writer.write(na.c_array([], na.struct([na.int32()])))
+
+
+def test_writer_error_on_write():
+ with pytest.raises(NanoarrowException):
+ with StreamWriter.from_writable(io.BytesIO()) as writer:
+ writer.write_stream(na.c_array([], na.int32()))
diff --git a/python/tests/test_schema.py b/python/tests/test_schema.py
index 86360369..abfe6518 100644
--- a/python/tests/test_schema.py
+++ b/python/tests/test_schema.py
@@ -244,3 +244,18 @@ def test_schema_extension():
def test_schema_alias_constructor():
schema = na.schema(na.Type.INT32)
assert isinstance(schema, na.Schema)
+
+
+def test_schema_serialize():
+ import io
+
+ schema = na.struct({"some_col": na.int32()}, nullable=False)
+
+ serialized = schema.serialize()
+ schema_roundtrip = na.ArrayStream.from_readable(serialized).schema
+ assert repr(schema_roundtrip) == repr(schema)
+
+ out = io.BytesIO()
+ schema.serialize(out)
+ schema_roundtrip = na.ArrayStream.from_readable(out.getvalue()).schema
+ assert repr(schema_roundtrip) == repr(schema)