This is an automated email from the ASF dual-hosted git repository.

paleolimbot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-nanoarrow.git


The following commit(s) were added to refs/heads/main by this push:
     new 3aa0ec12 feat(python): Implement bindings to IPC writer (#586)
3aa0ec12 is described below

commit 3aa0ec122ab59138984a74c42f7d17c70ea8ece1
Author: Dewey Dunnington <[email protected]>
AuthorDate: Fri Sep 13 23:12:48 2024 -0500

    feat(python): Implement bindings to IPC writer (#586)
    
    This PR implements bindings to the IPC writer in the nanoarrow C
    library. This adds:
    
    - An `ipc.StreamWriter()` class roughly mirroring pyarrow's
    `ipc.Stream()`
    - `Schema.serialize()` and `Array.serialize()` to match pyarrow's
    `serialize()` methods.
    
    ```python
    import io
    import nanoarrow as na
    from nanoarrow.ipc import StreamWriter, InputStream
    
    out = io.BytesIO()
    writer = StreamWriter.from_writable(out)
    writer.write_stream(InputStream.example())
    
    na.Array(InputStream.from_readable(out.getvalue()))
    #> nanoarrow.Array<non-nullable struct<some_col: int32>>[3]
    #> {'some_col': 1}
    #> {'some_col': 2}
    #> {'some_col': 3}
    ```
    
    ---------
    
    Co-authored-by: Joris Van den Bossche <[email protected]>
---
 dev/benchmarks/python/ipc.py         |   4 +-
 python/src/nanoarrow/_ipc_lib.pyx    | 195 ++++++++++++++++++++--
 python/src/nanoarrow/array.py        |  25 ++-
 python/src/nanoarrow/array_stream.py |  24 +--
 python/src/nanoarrow/ipc.py          | 305 +++++++++++++++++++++++++++++++----
 python/src/nanoarrow/schema.py       |  27 ++++
 python/tests/test_array.py           |  26 +++
 python/tests/test_array_stream.py    |   8 +-
 python/tests/test_ipc.py             | 145 +++++++++++++++--
 python/tests/test_schema.py          |  15 ++
 10 files changed, 699 insertions(+), 75 deletions(-)

diff --git a/dev/benchmarks/python/ipc.py b/dev/benchmarks/python/ipc.py
index b841a1d6..f717eb8f 100644
--- a/dev/benchmarks/python/ipc.py
+++ b/dev/benchmarks/python/ipc.py
@@ -43,12 +43,12 @@ class IpcReaderSuite:
         return os.path.join(self.fixtures_dir, name)
 
     def read_fixture_file(self, name):
-        with ipc.Stream.from_path(self.fixture_path(name)) as in_stream:
+        with ipc.InputStream.from_path(self.fixture_path(name)) as in_stream:
             list(na.c_array_stream(in_stream))
 
     def read_fixture_buffer(self, name):
         f = io.BytesIO(self.fixture_buffer[name])
-        with ipc.Stream.from_readable(f) as in_stream:
+        with ipc.InputStream.from_readable(f) as in_stream:
             list(na.c_array_stream(in_stream))
 
     def time_read_float64_basic_file(self):
diff --git a/python/src/nanoarrow/_ipc_lib.pyx 
b/python/src/nanoarrow/_ipc_lib.pyx
index 2db8e003..fcb6530a 100644
--- a/python/src/nanoarrow/_ipc_lib.pyx
+++ b/python/src/nanoarrow/_ipc_lib.pyx
@@ -19,18 +19,24 @@
 # cython: linetrace=True
 
 from libc.stdint cimport uint8_t, int64_t, uintptr_t
-from libc.errno cimport EIO
+from libc.errno cimport EIO, EAGAIN
 from libc.stdio cimport snprintf
 from cpython.ref cimport PyObject, Py_INCREF, Py_DECREF
 from cpython cimport Py_buffer, PyBuffer_FillInfo
 
 from nanoarrow_c cimport (
+    ArrowArrayStream,
+    ArrowArrayView,
+    ArrowSchema,
     ArrowErrorCode,
     ArrowError,
     NANOARROW_OK,
-    ArrowArrayStream,
 )
 
+from nanoarrow._schema cimport CSchema
+from nanoarrow._array cimport CArrayView
+from nanoarrow._utils cimport Error
+
 
 cdef extern from "nanoarrow_ipc.h" nogil:
     struct ArrowIpcInputStream:
@@ -48,18 +54,43 @@ cdef extern from "nanoarrow_ipc.h" nogil:
         ArrowArrayStream* out, ArrowIpcInputStream* input_stream,
         ArrowIpcArrayStreamReaderOptions* options)
 
+    struct ArrowIpcOutputStream:
+        ArrowErrorCode (*write)(ArrowIpcOutputStream* stream, const void* buf,
+                                int64_t buf_size_bytes, int64_t* 
size_written_out,
+                                ArrowError* error)
+        void (*release)(ArrowIpcOutputStream* stream)
+        void* private_data
 
-cdef class PyInputStreamPrivate:
+    struct ArrowIpcWriter:
+        void* private_data
+
+    ArrowErrorCode ArrowIpcWriterInit(ArrowIpcWriter* writer,
+                                      ArrowIpcOutputStream* output_stream)
+    void ArrowIpcWriterReset(ArrowIpcWriter* writer)
+    ArrowErrorCode ArrowIpcWriterWriteSchema(ArrowIpcWriter* writer,
+                                             const ArrowSchema* in_,
+                                             ArrowError* error)
+    ArrowErrorCode ArrowIpcWriterWriteArrayView(ArrowIpcWriter* writer,
+                                                const ArrowArrayView* in_,
+                                                ArrowError* error)
+
+    ArrowErrorCode ArrowIpcWriterWriteArrayStream(ArrowIpcWriter* writer,
+                                                  ArrowArrayStream* in_,
+                                                  ArrowError* error)
+
+cdef class PyStreamPrivate:
     cdef object _obj
     cdef bint _close_obj
     cdef void* _addr
     cdef Py_ssize_t _size_bytes
+    cdef bint _buffer_readonly
 
-    def __cinit__(self, obj, close_obj=False):
+    def __cinit__(self, obj, bint buffer_readonly, bint close_obj=False):
         self._obj = obj
         self._close_obj = close_obj
         self._addr = NULL
         self._size_bytes = 0
+        self._buffer_readonly = buffer_readonly
 
     @property
     def obj(self):
@@ -78,14 +109,16 @@ cdef class PyInputStreamPrivate:
         return self._size_bytes
 
     # Implement the buffer protocol so that this object can be used as
-    # the argument to xxx.readinto(). This ensures that no extra copies
-    # (beyond any buffering done by the upstream file-like object) are held
-    # since the upstream object has access to the preallocated output buffer.
-    # In this case, the preallocation is done by the ArrowArrayStream
+    # the argument to xxx.readinto() or xxx.write(). This ensures that
+    # no extra copies (beyond any buffering done by the upstream file-like 
object)
+    # are held since the upstream object has access to the preallocated output 
buffer.
+    # In the read case, the preallocation is done by the ArrowArrayStream
     # implementation before issuing each read call (two per message, with
     # an extra call for a RecordBatch message to get the actual buffer data).
+    # In the write case, this will be a view of whatever information was 
provided to
+    # the write callback.
     def __getbuffer__(self, Py_buffer* buffer, int flags):
-        PyBuffer_FillInfo(buffer, self, self._addr, self._size_bytes, 0, flags)
+        PyBuffer_FillInfo(buffer, self, self._addr, self._size_bytes, 
self._buffer_readonly, flags)
 
     def __releasebuffer__(self, Py_buffer* buffer):
         pass
@@ -100,8 +133,16 @@ cdef ArrowErrorCode 
py_input_stream_read(ArrowIpcInputStream* stream, uint8_t* b
         stream_private.set_buffer(<uintptr_t>buf, buf_size_bytes)
 
         try:
-            size_read_out[0] = stream_private.obj.readinto(stream_private)
-            return NANOARROW_OK
+            # Non-blocking streams may return None here, or buffered
+            # wrappers of them may raise BlockingIOError
+            read_result = stream_private.obj.readinto(stream_private)
+
+            if read_result is None:
+                size_read_out[0] = 0
+                return EAGAIN
+            else:
+                size_read_out[0] = read_result
+                return NANOARROW_OK
         except Exception as e:
             cls = type(e).__name__.encode()
             msg = str(e).encode()
@@ -126,6 +167,51 @@ cdef void py_input_stream_release(ArrowIpcInputStream* 
stream) noexcept nogil:
     stream.release = NULL
 
 
+
+cdef ArrowErrorCode py_output_stream_write(ArrowIpcOutputStream* stream, const 
void* buf,
+                                           int64_t buf_size_bytes, int64_t* 
size_written_out,
+                                           ArrowError* error) noexcept nogil:
+
+    with gil:
+        stream_private = <object>stream.private_data
+        stream_private.set_buffer(<uintptr_t>buf, buf_size_bytes)
+
+        try:
+            # Non-blocking streams may return None here, or buffered
+            # wrappers of them may raise BlockingIOError
+            write_result = stream_private.obj.write(stream_private)
+
+            # Non-blocking streams may return None here
+            if write_result is None:
+                size_written_out[0] = 0
+                return EAGAIN
+            else:
+                size_written_out[0] = write_result
+                return NANOARROW_OK
+        except Exception as e:
+            cls = type(e).__name__.encode()
+            msg = str(e).encode()
+            snprintf(
+                error.message,
+                sizeof(error.message),
+                "%s: %s",
+                <const char*>cls,
+                <const char*>msg
+            )
+            return EIO
+
+cdef void py_output_stream_release(ArrowIpcOutputStream* stream) noexcept 
nogil:
+    with gil:
+        stream_private = <object>stream.private_data
+        if stream_private.close_obj:
+            stream_private.obj.close()
+
+        Py_DECREF(stream_private)
+
+    stream.private_data = NULL
+    stream.release = NULL
+
+
 cdef class CIpcInputStream:
     cdef ArrowIpcInputStream _stream
 
@@ -150,7 +236,11 @@ cdef class CIpcInputStream:
     @staticmethod
     def from_readable(obj, close_obj=False):
         cdef CIpcInputStream stream = CIpcInputStream()
-        cdef PyInputStreamPrivate private_data = PyInputStreamPrivate(obj, 
close_obj)
+        cdef PyStreamPrivate private_data = PyStreamPrivate(
+            obj,
+            buffer_readonly=False,
+            close_obj=close_obj
+        )
 
         stream._stream.private_data = <PyObject*>private_data
         Py_INCREF(private_data)
@@ -166,3 +256,84 @@ def init_array_stream(CIpcInputStream input_stream, 
uintptr_t out):
     cdef int code = ArrowIpcArrayStreamReaderInit(out_ptr, 
&input_stream._stream, NULL)
     if code != NANOARROW_OK:
         raise RuntimeError(f"ArrowIpcArrayStreamReaderInit() failed with code 
[{code}]")
+
+
+cdef class CIpcOutputStream:
+    cdef ArrowIpcOutputStream _stream
+
+    def __cinit__(self):
+        self._stream.release = NULL
+
+    def is_valid(self):
+        return self._stream.release != NULL
+
+    def __dealloc__(self):
+        # Duplicating release() to avoid Python API calls in the deallocator
+        if self._stream.release != NULL:
+            self._stream.release(&self._stream)
+
+    def release(self):
+        if self._stream.release != NULL:
+            self._stream.release(&self._stream)
+            return True
+        else:
+            return False
+
+    @staticmethod
+    def from_writable(obj, close_obj=False):
+        cdef CIpcOutputStream stream = CIpcOutputStream()
+        cdef PyStreamPrivate private_data = PyStreamPrivate(
+            obj,
+            buffer_readonly=True,
+            close_obj=close_obj
+        )
+
+        stream._stream.private_data = <PyObject*>private_data
+        Py_INCREF(private_data)
+        stream._stream.write = &py_output_stream_write
+        stream._stream.release = &py_output_stream_release
+        return stream
+
+
+cdef class CIpcWriter:
+    cdef ArrowIpcWriter _writer
+
+    def __cinit__(self, CIpcOutputStream stream):
+        self._writer.private_data = NULL
+        if not stream.is_valid():
+            raise ValueError("Can't create writer from released stream")
+
+        cdef int code = ArrowIpcWriterInit(&self._writer, &stream._stream)
+        Error.raise_error_not_ok("ArrowIpcWriterInit()", code)
+
+    def is_valid(self):
+        return self._writer.private_data != NULL
+
+    def __dealloc__(self):
+        if self._writer.private_data != NULL:
+            ArrowIpcWriterReset(&self._writer)
+
+    def release(self):
+        if self._writer.private_data != NULL:
+            ArrowIpcWriterReset(&self._writer)
+
+    def write_schema(self, CSchema schema):
+        cdef Error error = Error()
+        cdef int code = ArrowIpcWriterWriteSchema(&self._writer, schema._ptr, 
&error.c_error)
+        error.raise_message_not_ok("ArrowIpcWriterWriteSchema()", code)
+
+    def write_array_view(self, CArrayView array_view):
+        cdef Error error = Error()
+        cdef int code = ArrowIpcWriterWriteArrayView(&self._writer, 
array_view._ptr, &error.c_error)
+        error.raise_message_not_ok("ArrowIpcWriterWriteArrayView()", code)
+
+    def write_array_stream(self, uintptr_t stream_addr):
+        cdef ArrowArrayStream* array_stream = <ArrowArrayStream*>stream_addr
+        cdef Error error = Error()
+        cdef int code = ArrowIpcWriterWriteArrayStream(&self._writer, 
array_stream, &error.c_error)
+        error.raise_message_not_ok("ArrowIpcWriterWriteArrayStream()", code)
+
+    def write_end_of_stream(self):
+        cdef Error error = Error()
+        cdef int code = ArrowIpcWriterWriteArrayView(&self._writer, NULL, 
&error.c_error)
+        error.raise_message_not_ok("ArrowIpcWriterWriteArrayView()", code)
diff --git a/python/src/nanoarrow/array.py b/python/src/nanoarrow/array.py
index 4a7210fb..35ca7c68 100644
--- a/python/src/nanoarrow/array.py
+++ b/python/src/nanoarrow/array.py
@@ -17,7 +17,7 @@
 
 import itertools
 from functools import cached_property
-from typing import Iterable, Tuple
+from typing import Iterable, Tuple, Union
 
 from nanoarrow._array import CArray, CArrayView
 from nanoarrow._array_stream import CMaterializedArrayStream
@@ -542,6 +542,29 @@ class Array(ArrayViewVisitable):
             "to iterate over elements of this Array"
         )
 
+    def serialize(self, dst=None) -> Union[bytes, None]:
+        """Write this Array into dst as zero or more encapsulated IPC messages
+
+        Parameters
+        ----------
+        dst : file-like, optional
+            If present, a file-like object into which the chunks of this array
+            should be serialized. If omitted, this will create a 
``io.BytesIO()``
+            and return the serialized result.
+        """
+        from nanoarrow.ipc import StreamWriter
+
+        if dst is None:
+            import io
+
+            with io.BytesIO() as dst:
+                writer = StreamWriter.from_writable(dst)
+                writer.write_stream(self, write_schema=False)
+                return dst.getvalue()
+        else:
+            writer = StreamWriter.from_writable(dst)
+            writer.write_stream(self, write_schema=False)
+
     def to_string(self, width_hint=80, items_hint=10) -> str:
         cls_name = _repr_utils.make_class_label(self, module="nanoarrow")
         len_text = f"[{len(self)}]"
diff --git a/python/src/nanoarrow/array_stream.py 
b/python/src/nanoarrow/array_stream.py
index e7282721..d169b966 100644
--- a/python/src/nanoarrow/array_stream.py
+++ b/python/src/nanoarrow/array_stream.py
@@ -211,17 +211,17 @@ class ArrayStream(ArrayViewVisitable):
         Examples
         --------
         >>> import nanoarrow as na
-        >>> from nanoarrow.ipc import Stream
-        >>> with na.ArrayStream.from_readable(Stream.example_bytes()) as 
stream:
+        >>> from nanoarrow.ipc import InputStream
+        >>> with na.ArrayStream.from_readable(InputStream.example_bytes()) as 
stream:
         ...     stream.read_all()
         nanoarrow.Array<non-nullable struct<some_col: int32>>[3]
         {'some_col': 1}
         {'some_col': 2}
         {'some_col': 3}
         """
-        from nanoarrow.ipc import Stream
+        from nanoarrow.ipc import InputStream
 
-        with Stream.from_readable(obj) as ipc_stream:
+        with InputStream.from_readable(obj) as ipc_stream:
             return ArrayStream(ipc_stream)
 
     @staticmethod
@@ -233,11 +233,11 @@ class ArrayStream(ArrayViewVisitable):
         >>> import tempfile
         >>> import os
         >>> import nanoarrow as na
-        >>> from nanoarrow.ipc import Stream
+        >>> from nanoarrow.ipc import InputStream
         >>> with tempfile.TemporaryDirectory() as td:
         ...     path = os.path.join(td, "test.arrows")
         ...     with open(path, "wb") as f:
-        ...         nbytes = f.write(Stream.example_bytes())
+        ...         nbytes = f.write(InputStream.example_bytes())
         ...
         ...     with na.ArrayStream.from_path(path) as stream:
         ...         stream.read_all()
@@ -246,9 +246,9 @@ class ArrayStream(ArrayViewVisitable):
         {'some_col': 2}
         {'some_col': 3}
         """
-        from nanoarrow.ipc import Stream
+        from nanoarrow.ipc import InputStream
 
-        with Stream.from_path(obj, *args, **kwargs) as ipc_stream:
+        with InputStream.from_path(obj, *args, **kwargs) as ipc_stream:
             return ArrayStream(ipc_stream)
 
     @staticmethod
@@ -261,11 +261,11 @@ class ArrayStream(ArrayViewVisitable):
         >>> import tempfile
         >>> import os
         >>> import nanoarrow as na
-        >>> from nanoarrow.ipc import Stream
+        >>> from nanoarrow.ipc import InputStream
         >>> with tempfile.TemporaryDirectory() as td:
         ...     path = os.path.join(td, "test.arrows")
         ...     with open(path, "wb") as f:
-        ...         nbytes = f.write(Stream.example_bytes())
+        ...         nbytes = f.write(InputStream.example_bytes())
         ...
         ...     uri = pathlib.Path(path).as_uri()
         ...     with na.ArrayStream.from_url(uri) as stream:
@@ -275,7 +275,7 @@ class ArrayStream(ArrayViewVisitable):
         {'some_col': 2}
         {'some_col': 3}
         """
-        from nanoarrow.ipc import Stream
+        from nanoarrow.ipc import InputStream
 
-        with Stream.from_url(obj, *args, **kwargs) as ipc_stream:
+        with InputStream.from_url(obj, *args, **kwargs) as ipc_stream:
             return ArrayStream(ipc_stream)
diff --git a/python/src/nanoarrow/ipc.py b/python/src/nanoarrow/ipc.py
index 91916904..ef02a420 100644
--- a/python/src/nanoarrow/ipc.py
+++ b/python/src/nanoarrow/ipc.py
@@ -18,13 +18,21 @@
 import io
 
 from nanoarrow._array_stream import CArrayStream
-from nanoarrow._ipc_lib import CIpcInputStream, init_array_stream
+from nanoarrow._ipc_lib import (
+    CIpcInputStream,
+    CIpcOutputStream,
+    CIpcWriter,
+    init_array_stream,
+)
 from nanoarrow._utils import obj_is_buffer
+from nanoarrow.array import c_array
+from nanoarrow.array_stream import c_array_stream
+from nanoarrow.iterator import ArrayViewBaseIterator
 
 from nanoarrow import _repr_utils
 
 
-class Stream:
+class InputStream:
     """Stream of serialized Arrow data
 
     Reads file paths or otherwise readable file objects that contain
@@ -41,8 +49,8 @@ class Stream:
     --------
 
     >>> import nanoarrow as na
-    >>> from nanoarrow.ipc import Stream
-    >>> with Stream.example() as inp, na.c_array_stream(inp) as stream:
+    >>> from nanoarrow.ipc import InputStream
+    >>> with InputStream.example() as inp, na.c_array_stream(inp) as stream:
     ...     stream
     <nanoarrow.c_array_stream.CArrayStream>
     - get_schema(): struct<some_col: int32>
@@ -62,7 +70,7 @@ class Stream:
         input stream to an ArrowArrayStream wrapped by a PyCapsule.
         """
         if not self._is_valid():
-            raise RuntimeError("nanoarrow.ipc.Stream is no longer valid")
+            raise RuntimeError("nanoarrow.ipc.InputStream is no longer valid")
 
         with CArrayStream.allocate() as array_stream:
             init_array_stream(self._stream, array_stream._addr())
@@ -81,7 +89,7 @@ class Stream:
         """Wrap an open readable file or buffer as an Arrow IPC stream
 
         Wraps a readable object (specificially, an object that implements a
-        ``readinto()`` method) as a non-owning Stream. Closing ``obj`` remains
+        ``readinto()`` method) as a non-owning InputStream. Closing ``obj`` 
remains
         the caller's responsibility: neither this stream nor the resulting 
array
         stream will call ``obj.close()``.
 
@@ -94,8 +102,8 @@ class Stream:
         --------
 
         >>> import nanoarrow as na
-        >>> from nanoarrow.ipc import Stream
-        >>> ipc_stream = Stream.from_readable(Stream.example_bytes())
+        >>> from nanoarrow.ipc import InputStream
+        >>> ipc_stream = InputStream.from_readable(InputStream.example_bytes())
         >>> na.c_array_stream(ipc_stream)
         <nanoarrow.c_array_stream.CArrayStream>
         - get_schema(): struct<some_col: int32>
@@ -106,7 +114,7 @@ class Stream:
         else:
             close_obj = False
 
-        out = Stream()
+        out = InputStream()
         out._stream = CIpcInputStream.from_readable(obj, close_obj=close_obj)
         out._desc = repr(obj)
         return out
@@ -116,8 +124,8 @@ class Stream:
         """Wrap a local file as an IPC stream
 
         Wraps a pathlike object (specificially, one that can be passed to 
``open()``)
-        as an owning Stream. The file will be opened in binary mode and will 
be closed
-        when this stream or the resulting array stream is released.
+        as an owning InputStream. The file will be opened in binary mode and 
will be
+        closed when this stream or the resulting array stream is released.
 
         Parameters
         ----------
@@ -130,18 +138,18 @@ class Stream:
         >>> import tempfile
         >>> import os
         >>> import nanoarrow as na
-        >>> from nanoarrow.ipc import Stream
+        >>> from nanoarrow.ipc import InputStream
         >>> with tempfile.TemporaryDirectory() as td:
         ...     path = os.path.join(td, "test.arrows")
         ...     with open(path, "wb") as f:
-        ...         nbytes = f.write(Stream.example_bytes())
+        ...         nbytes = f.write(InputStream.example_bytes())
         ...
-        ...     with Stream.from_path(path) as inp, na.c_array_stream(inp) as 
stream:
-        ...         stream
+        ...     with InputStream.from_path(path) as inp, 
na.c_array_stream(inp) as s:
+        ...         s
         <nanoarrow.c_array_stream.CArrayStream>
         - get_schema(): struct<some_col: int32>
         """
-        out = Stream()
+        out = InputStream()
         out._stream = CIpcInputStream.from_readable(
             open(obj, "rb", *args, **kwargs), close_obj=True
         )
@@ -153,7 +161,7 @@ class Stream:
         """Wrap a URL as an IPC stream
 
         Wraps a URL (specificially, one that can be passed to
-        ``urllib.request.urlopen()``) as an owning Stream. The URL will be
+        ``urllib.request.urlopen()``) as an owning InputStream. The URL will be
         closed when this stream or the resulting array stream is released.
 
         Parameters
@@ -168,21 +176,21 @@ class Stream:
         >>> import tempfile
         >>> import os
         >>> import nanoarrow as na
-        >>> from nanoarrow.ipc import Stream
+        >>> from nanoarrow.ipc import InputStream
         >>> with tempfile.TemporaryDirectory() as td:
         ...     path = os.path.join(td, "test.arrows")
         ...     with open(path, "wb") as f:
-        ...         nbytes = f.write(Stream.example_bytes())
+        ...         nbytes = f.write(InputStream.example_bytes())
         ...
         ...     uri = pathlib.Path(path).as_uri()
-        ...     with Stream.from_url(uri) as inp, na.c_array_stream(inp) as 
stream:
+        ...     with InputStream.from_url(uri) as inp, na.c_array_stream(inp) 
as stream:
         ...         stream
         <nanoarrow.c_array_stream.CArrayStream>
         - get_schema(): struct<some_col: int32>
         """
         import urllib.request
 
-        out = Stream()
+        out = InputStream()
         out._stream = CIpcInputStream.from_readable(
             urllib.request.urlopen(obj, *args, **kwargs), close_obj=True
         )
@@ -191,7 +199,7 @@ class Stream:
 
     @staticmethod
     def example():
-        """Example IPC Stream
+        """Example IPC InputStream
 
         A self-contained example whose value is the serialized version of
         ``DataFrame({"some_col": [1, 2, 3]})``. This may be used for testing
@@ -201,17 +209,17 @@ class Stream:
         Examples
         --------
 
-        >>> from nanoarrow.ipc import Stream
-        >>> Stream.example()
-        <nanoarrow.ipc.Stream <_io.BytesIO object at ...>>
+        >>> from nanoarrow.ipc import InputStream
+        >>> InputStream.example()
+        <nanoarrow.ipc.InputStream <_io.BytesIO object at ...>>
         """
-        return Stream.from_readable(Stream.example_bytes())
+        return InputStream.from_readable(InputStream.example_bytes())
 
     @staticmethod
     def example_bytes():
         """Example stream bytes
 
-        The underlying bytes of the :staticmethod:`example` Stream. This is 
useful
+        The underlying bytes of the :staticmethod:`example` InputStream. This 
is useful
         for writing files or creating other types of test input.
 
         Examples
@@ -219,11 +227,11 @@ class Stream:
 
         >>> import os
         >>> import tempfile
-        >>> from nanoarrow.ipc import Stream
+        >>> from nanoarrow.ipc import InputStream
         >>> with tempfile.TemporaryDirectory() as td:
         ...     path = os.path.join(td, "test.arrows")
         ...     with open(path, "wb") as f:
-        ...         f.write(Stream.example_bytes())
+        ...         f.write(InputStream.example_bytes())
         440
         """
         return _EXAMPLE_IPC_SCHEMA + _EXAMPLE_IPC_BATCH
@@ -236,9 +244,246 @@ class Stream:
             return f"<{class_label} <invalid>>"
 
 
+class StreamWriter:
+    """Write streams of serialized Arrow data
+
+    Provides various ways of writing Arrow schemas and record batches as
+    binary data serialized using the Arrow IPC streaming format.
+
+    Use :staticmethod:`from_writable` or :staticmethod:`from_path`
+    to construct a writer.
+
+    Examples
+    --------
+
+    >>> import io
+    >>> import nanoarrow as na
+    >>> from nanoarrow.ipc import StreamWriter
+    >>>
+    >>> out = io.BytesIO()
+    >>> array = na.c_array_from_buffers(
+    ...     na.struct({"some_col": na.int32()}),
+    ...     length=3,
+    ...     buffers=[],
+    ...     children=[na.c_array([1, 2, 3], na.int32())]
+    ... )
+    >>>
+    >>> with StreamWriter.from_writable(out) as writer:
+    ...     writer.write_stream(array)
+    >>>
+    >>> na.ArrayStream.from_readable(out.getvalue()).read_all()
+    nanoarrow.Array<non-nullable struct<some_col: int32>>[3]
+    {'some_col': 1}
+    {'some_col': 2}
+    {'some_col': 3}
+    """
+
+    def __init__(self):
+        self._writer = None
+        self._desc = None
+        self._iterator = None
+
+    def _is_valid(self) -> bool:
+        return self._writer is not None and self._writer.is_valid()
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, *args, **kwargs):
+        self.close()
+
+    def release(self):
+        """Close stream without writing the end-of-stream marker"""
+        if not self._is_valid():
+            return
+
+        self._writer.release()
+        self._writer = None
+
+    def close(self):
+        """Close stream and write end-of-stream marker"""
+        if not self._is_valid():
+            return
+
+        self._writer.write_end_of_stream()
+        self.release()
+
+    def write_array(self, obj, schema=None, *, write_schema=None):
+        """Interpret obj as an array and write to stream
+
+        Parameters
+        ----------
+        obj : array-like
+            An array-like object as sanitized by :func:`c_array`.
+        schema : schema-like, optional
+            An optional schema, passed to :func:`c_array`.
+        write_schema : bool, optional
+            See :meth:`write_stream`.
+        """
+        obj = c_array(obj)
+        return self.write_stream(obj, schema, write_schema=write_schema)
+
+    def write_stream(self, obj, schema=None, *, write_schema=None):
+        """Interpret obj as a stream of arrays and write to stream
+
+        Writes all arrays from obj to the output stream.
+
+        Parameters
+        ----------
+        obj : array stream-like
+            An array-like or array stream-like object as sanitized by
+            :func:`c_array_stream`.
+        schema : schema-like, optional
+            An optional schema, passed to :func:`c_array_stream`.
+        write_schema : bool, optional
+            If True, the schema will always be written to the output stream; 
if False,
+            the schema will never be written to the output stream. If omitted, 
the
+            schema will be written if nothing has yet been written to the 
output.
+        """
+        if not self._is_valid():
+            raise ValueError("Can't write to released StreamWriter")
+
+        with c_array_stream(obj, schema=schema) as stream:
+            if self._iterator is None:
+                self._iterator = 
ArrayViewBaseIterator(stream._get_cached_schema())
+                if write_schema is None:
+                    write_schema = True
+
+            if write_schema:
+                self._writer.write_schema(self._iterator._schema)
+
+            for array in stream:
+                self._iterator._set_array(array)
+                self._writer.write_array_view(self._iterator._array_view)
+
+    def serialize_stream(self, obj, schema=None):
+        """Interpret obj as a stream of arrays, write to stream, and close
+
+        Like :meth:`write_stream` except always writes a schema message and
+        always appends the end-of-stream marker to the output. This method
+        also takes a potentially more efficient path that uses fewer Python
+        calls at the expense of less flexibility. After calling this method,
+    the writer is released and subsequent calls to methods will error.
+
+        Parameters
+        ----------
+        obj : array stream-like
+            An array-like or array stream-like object as sanitized by
+            :func:`c_array_stream`.
+        schema : schema-like, optional
+            An optional schema, passed to :func:`c_array_stream`.
+        """
+        if not self._is_valid():
+            raise ValueError("Can't write to released StreamWriter")
+
+        # If we've already written a schema or we've explicitly been asked
+        # to write one, we can't write using the stream writer because it
+        # automatically appends a schema. We can, however, write the rest
+        # of the stream and EOS using write().
+        if self._iterator is not None:
+            raise ValueError("Can't serialize_stream() into a non-empty 
writer")
+
+        # Write the entire stream and release the writer. We can't
+        # use close() because that would write the EOS and the stream
+        # writer has already appended this.
+        with self, c_array_stream(obj, schema=schema) as stream:
+            self._writer.write_array_stream(stream._addr())
+            self.release()
+
+    @staticmethod
+    def from_writable(obj):
+        """Write an Arrow IPC stream to a writable file
+
+        Wraps a writable object (specifically, an object that implements a
+        ``write()`` method) as a non-owning StreamWriter. Closing ``obj`` remains
+        the caller's responsibility (i.e., closing this object will not call
+        ``obj.close()``).
+
+        Parameters
+        ----------
+        obj : A writable file-like object supporting ``write()``.
+
+        Examples
+        --------
+
+        >>> import io
+        >>> import nanoarrow as na
+        >>> from nanoarrow.ipc import StreamWriter
+        >>>
+        >>> out = io.BytesIO()
+        >>> array = na.c_array_from_buffers(
+        ...     na.struct({"some_col": na.int32()}),
+        ...     length=3,
+        ...     buffers=[],
+        ...     children=[na.c_array([1, 2, 3], na.int32())]
+        ... )
+        >>>
+        >>> with StreamWriter.from_writable(out) as writer:
+        ...     writer.write_stream(array)
+        >>>
+        >>> na.ArrayStream.from_readable(out.getvalue()).read_all()
+        nanoarrow.Array<non-nullable struct<some_col: int32>>[3]
+        {'some_col': 1}
+        {'some_col': 2}
+        {'some_col': 3}
+        """
+        out = StreamWriter()
+        stream = CIpcOutputStream.from_writable(obj, close_obj=False)
+        out._desc = repr(obj)
+
+        out._writer = CIpcWriter(stream)
+        return out
+
+    @staticmethod
+    def from_path(obj, *args, **kwargs):
+        """Wrap a local file as an IPC stream
+
+        Wraps a pathlike object (specifically, one that can be passed to ``open()``)
+        as an owning StreamWriter. The file will be opened in (writable) 
binary mode and
+        will be closed when the returned writer is closed.
+
+        Parameters
+        ----------
+        obj : path-like
+            A string or path-like object that can be passed to ``open()``
+
+        Examples
+        --------
+        >>> import os
+        >>> import tempfile
+        >>> import nanoarrow as na
+        >>> from nanoarrow.ipc import StreamWriter
+        >>>
+        >>> array = na.c_array_from_buffers(
+        ...     na.struct({"some_col": na.int32()}),
+        ...     length=3,
+        ...     buffers=[],
+        ...     children=[na.c_array([1, 2, 3], na.int32())]
+        ... )
+        >>>
+        >>> with tempfile.TemporaryDirectory() as td:
+        ...     path = os.path.join(td, "test.arrows")
+        ...     with StreamWriter.from_path(path) as writer:
+        ...         writer.write_stream(array)
+        ...
+        ...     with na.ArrayStream.from_path(path) as stream:
+        ...         stream.read_all()
+        nanoarrow.Array<non-nullable struct<some_col: int32>>[3]
+        {'some_col': 1}
+        {'some_col': 2}
+        {'some_col': 3}
+        """
+        out = StreamWriter()
+        stream = CIpcOutputStream.from_writable(
+            open(obj, "wb", *args, **kwargs), close_obj=True
+        )
+        out._writer = CIpcWriter(stream)
+        return out
+
+
# A self-contained example whose value is the serialized version of
 # DataFrame({"some_col": [1, 2, 3]}). Used to make the tests and documentation
-# self-contained since we don't have an IPC writer.
+# self-contained.
 _EXAMPLE_IPC_SCHEMA = (
     
b"\xff\xff\xff\xff\x10\x01\x00\x00\x10\x00\x00\x00\x00\x00\x0a\x00\x0e\x00\x06"
     
b"\x00\x05\x00\x08\x00\x0a\x00\x00\x00\x00\x01\x04\x00\x10\x00\x00\x00\x00\x00"
diff --git a/python/src/nanoarrow/schema.py b/python/src/nanoarrow/schema.py
index 7d886067..8adae418 100644
--- a/python/src/nanoarrow/schema.py
+++ b/python/src/nanoarrow/schema.py
@@ -470,6 +470,33 @@ class Schema:
         """
         return [self.field(i) for i in range(self.n_fields)]
 
+    def serialize(self, dst=None) -> Union[bytes, None]:
+        """Write this Schema into dst as an encapsulated IPC message
+
+        Parameters
+        ----------
+        dst : file-like, optional
+            If present, a file-like object into which the schema should be
+            serialized. If omitted, this will create a ``io.BytesIO()`` and
+            return the serialized result.
+        """
+        from nanoarrow.c_array_stream import CArrayStream
+
+        from nanoarrow.ipc import StreamWriter
+
+        empty = CArrayStream.from_c_arrays([], self._c_schema, validate=False)
+
+        if dst is None:
+            import io
+
+            with io.BytesIO() as dst:
+                writer = StreamWriter.from_writable(dst)
+                writer.write_stream(empty)
+                return dst.getvalue()
+        else:
+            writer = StreamWriter.from_writable(dst)
+            writer.write_stream(empty)
+
     def __repr__(self) -> str:
         return _schema_repr(self)
 
diff --git a/python/tests/test_array.py b/python/tests/test_array.py
index 2dfd83c8..4baae94e 100644
--- a/python/tests/test_array.py
+++ b/python/tests/test_array.py
@@ -357,6 +357,32 @@ def test_array_inspect(capsys):
     assert captured.out.startswith("<ArrowArray struct<col0: int32")
 
 
def test_array_serialize():
    import io

    # One struct<some_col: int32> batch with values [1, 2, 3]
    batch = na.c_array_from_buffers(
        na.struct({"some_col": na.int32()}, nullable=False),
        length=3,
        buffers=[],
        children=[na.c_array([1, 2, 3], na.int32())],
    )
    array = na.Array(batch)
    schema_bytes = array.schema.serialize()

    def roundtrip(body_bytes):
        # Prepend the schema message so the bytes form a complete IPC stream
        return na.ArrayStream.from_readable(schema_bytes + body_bytes).read_all()

    # serialize() with no arguments returns the encoded batches as bytes
    assert repr(roundtrip(array.serialize())) == repr(array)

    # serialize(dst) writes the encoded batches into a writable file object
    sink = io.BytesIO()
    array.serialize(sink)
    assert repr(roundtrip(sink.getvalue())) == repr(array)
+
+
 def test_timestamp_array():
     d1 = int(round(datetime(1985, 12, 31, 0, 0, 
tzinfo=timezone.utc).timestamp() * 1e3))
     d2 = int(round(datetime(2005, 3, 4, 0, 0, tzinfo=timezone.utc).timestamp() 
* 1e3))
diff --git a/python/tests/test_array_stream.py 
b/python/tests/test_array_stream.py
index 57fc170e..1392248b 100644
--- a/python/tests/test_array_stream.py
+++ b/python/tests/test_array_stream.py
@@ -23,7 +23,7 @@ import tempfile
 import pytest
 
 import nanoarrow as na
-from nanoarrow.ipc import Stream
+from nanoarrow.ipc import InputStream
 
 
 def test_array_stream_iter():
@@ -120,7 +120,7 @@ def test_array_stream_context_manager():
 
 
 def test_array_stream_from_readable():
-    stream = na.ArrayStream.from_readable(Stream.example_bytes())
+    stream = na.ArrayStream.from_readable(InputStream.example_bytes())
     assert stream.schema.type == na.Type.STRUCT
     assert list(stream.read_all().iter_tuples()) == [(1,), (2,), (3,)]
 
@@ -129,7 +129,7 @@ def test_array_stream_from_path():
     with tempfile.TemporaryDirectory() as td:
         path = os.path.join(td, "test.arrows")
         with open(path, "wb") as f:
-            f.write(Stream.example_bytes())
+            f.write(InputStream.example_bytes())
 
         stream = na.ArrayStream.from_path(path)
         assert stream.schema.type == na.Type.STRUCT
@@ -140,7 +140,7 @@ def test_array_stream_from_url():
     with tempfile.TemporaryDirectory() as td:
         path = os.path.join(td, "test.arrows")
         with open(path, "wb") as f:
-            f.write(Stream.example_bytes())
+            f.write(InputStream.example_bytes())
 
         uri = pathlib.Path(path).as_uri()
         with na.ArrayStream.from_url(uri) as stream:
diff --git a/python/tests/test_ipc.py b/python/tests/test_ipc.py
index 28298027..73ca88eb 100644
--- a/python/tests/test_ipc.py
+++ b/python/tests/test_ipc.py
@@ -24,19 +24,18 @@ import pytest
 from nanoarrow._utils import NanoarrowException
 
 import nanoarrow as na
-from nanoarrow.ipc import Stream
+from nanoarrow.ipc import InputStream, StreamWriter
 
 
 def test_ipc_stream_example():
-
-    with Stream.example() as input:
+    with InputStream.example() as input:
         assert input._is_valid() is True
         assert "BytesIO object" in repr(input)
 
         stream = na.c_array_stream(input)
         assert input._is_valid() is False
         assert stream.is_valid() is True
-        assert repr(input) == "<nanoarrow.ipc.Stream <invalid>>"
+        assert repr(input) == "<nanoarrow.ipc.InputStream <invalid>>"
         with pytest.raises(RuntimeError, match="no longer valid"):
             stream = na.c_array_stream(input)
 
@@ -54,8 +53,8 @@ def test_ipc_stream_example():
 
 
 def test_ipc_stream_from_readable():
-    with io.BytesIO(Stream.example_bytes()) as f:
-        with Stream.from_readable(f) as input:
+    with io.BytesIO(InputStream.example_bytes()) as f:
+        with InputStream.from_readable(f) as input:
             assert input._is_valid() is True
             assert "BytesIO object" in repr(input)
 
@@ -69,9 +68,9 @@ def test_ipc_stream_from_path():
     with tempfile.TemporaryDirectory() as td:
         path = os.path.join(td, "test.arrows")
         with open(path, "wb") as f:
-            f.write(Stream.example_bytes())
+            f.write(InputStream.example_bytes())
 
-        with Stream.from_path(path) as input:
+        with InputStream.from_path(path) as input:
             assert repr(path) in repr(input)
             with na.c_array_stream(input) as stream:
                 batches = list(stream)
@@ -83,10 +82,10 @@ def test_ipc_stream_from_url():
     with tempfile.TemporaryDirectory() as td:
         path = os.path.join(td, "test.arrows")
         with open(path, "wb") as f:
-            f.write(Stream.example_bytes())
+            f.write(InputStream.example_bytes())
 
         uri = pathlib.Path(path).as_uri()
-        with Stream.from_url(uri) as input:
+        with InputStream.from_url(uri) as input:
             with na.c_array_stream(input) as stream:
                 batches = list(stream)
                 assert len(batches) == 1
@@ -95,10 +94,10 @@ def test_ipc_stream_from_url():
 
 def test_ipc_stream_python_exception_on_read():
     class ExtraordinarilyInconvenientFile:
-        def readinto(self, obj):
+        def readinto(self, *args, **kwargs):
             raise RuntimeError("I error for all read requests")
 
-    input = Stream.from_readable(ExtraordinarilyInconvenientFile())
+    input = InputStream.from_readable(ExtraordinarilyInconvenientFile())
     with pytest.raises(
         NanoarrowException, match="RuntimeError: I error for all read requests"
     ):
@@ -106,11 +105,129 @@ def test_ipc_stream_python_exception_on_read():
 
 
 def test_ipc_stream_error_on_read():
-    with io.BytesIO(Stream.example_bytes()[:100]) as f:
-        with Stream.from_readable(f) as input:
+    with io.BytesIO(InputStream.example_bytes()[:100]) as f:
+        with InputStream.from_readable(f) as input:
 
             with pytest.raises(
                 NanoarrowException,
                 match="Expected >= 280 bytes of remaining data",
             ):
                 na.c_array_stream(input)
+
+
def test_writer_from_writable():
    # Single struct batch with one int32 child column
    batch = na.c_array_from_buffers(
        na.struct({"some_col": na.int32()}),
        length=3,
        buffers=[],
        children=[na.c_array([1, 2, 3], na.int32())],
    )

    sink = io.BytesIO()
    with StreamWriter.from_writable(sink) as writer:
        writer.write_array(batch)

    # Reading the serialized bytes back should reproduce the original data
    with na.ArrayStream.from_readable(sink.getvalue()) as stream:
        assert stream.read_all().to_pylist() == na.Array(batch).to_pylist()
+
+
def test_writer_from_path():
    # Single struct batch with one int32 child column
    batch = na.c_array_from_buffers(
        na.struct({"some_col": na.int32()}),
        length=3,
        buffers=[],
        children=[na.c_array([1, 2, 3], na.int32())],
    )

    with tempfile.TemporaryDirectory() as td:
        path = os.path.join(td, "test.arrows")

        # Write the batch to a file path, then read it back via ArrayStream
        with StreamWriter.from_path(path) as writer:
            writer.write_array(batch)

        with na.ArrayStream.from_path(path) as stream:
            assert stream.read_all().to_pylist() == na.Array(batch).to_pylist()
+
+
def test_writer_write_stream_schema():
    # One struct<some_col: int32> batch with values [1, 2, 3]
    array = na.c_array_from_buffers(
        na.struct({"some_col": na.int32()}),
        length=3,
        buffers=[],
        children=[na.c_array([1, 2, 3], na.int32())],
    )
    # Zero-chunk array: writing it should emit only the schema message
    zero_chunk_array = na.Array.from_chunks([], array.schema)

    out = io.BytesIO()
    with StreamWriter.from_writable(out) as writer:
        writer.write_stream(zero_chunk_array)
        # Capture the serialized schema before the writer is closed
        schema_bytes = out.getvalue()

    with StreamWriter.from_writable(out) as writer:
        # Reset the buffer after creating the writer but before writing
        out.truncate(0)
        out.seek(0)
        writer.write_stream(zero_chunk_array)
        # write_schema=True presumably forces a second schema message even
        # though one was already written — confirmed by the assertion below
        writer.write_stream(zero_chunk_array, write_schema=True)
        two_schema_bytes = out.getvalue()

    assert (schema_bytes + schema_bytes) == two_schema_bytes

    with StreamWriter.from_writable(out) as writer:
        out.truncate(0)
        out.seek(0)
        # write_schema=False suppresses the schema, leaving just batch bytes
        writer.write_array(array, write_schema=False)
        array_bytes = out.getvalue()

    with StreamWriter.from_writable(out) as writer:
        out.truncate(0)
        out.seek(0)
        writer.write_array(array, write_schema=True)
        both_bytes = out.getvalue()

    # Writing schema + batch separately must equal writing both at once
    assert (schema_bytes + array_bytes) == both_bytes
+
+
def test_writer_serialize_stream():
    # One struct<some_col: int32> batch with values [1, 2, 3]
    array = na.c_array_from_buffers(
        na.struct({"some_col": na.int32()}),
        length=3,
        buffers=[],
        children=[na.c_array([1, 2, 3], na.int32())],
    )

    out = io.BytesIO()
    with StreamWriter.from_writable(out) as writer:
        writer.write_array(array)

        # Check that we can't serialize after we've already written to stream
        with pytest.raises(ValueError, match="Can't serialize_stream"):
            writer.serialize_stream(array)

        # Captured before close: schema message + one batch message
        schema_and_array_bytes = out.getvalue()

    # Arrow IPC end-of-stream marker (continuation bytes + zero length)
    end_of_stream_bytes = b"\xff\xff\xff\xff\x00\x00\x00\x00"

    writer = StreamWriter.from_writable(out)
    out.truncate(0)
    out.seek(0)
    writer.serialize_stream(array)
    # serialize_stream invalidates the writer when it completes
    assert writer._is_valid() is False

    serialized_bytes = out.getvalue()
    # serialize_stream output == schema + batch + end-of-stream marker
    assert (schema_and_array_bytes + end_of_stream_bytes) == serialized_bytes
+
+
def test_writer_python_exception_on_write():
    # A file-like object whose write() always fails, used to exercise
    # error propagation from Python back through the C-level IPC writer.
    class ExtraordinarilyInconvenientFile:
        def write(self, *args, **kwargs):
            raise RuntimeError("I error for all write requests")

    # The Python RuntimeError should surface as a NanoarrowException
    with pytest.raises(NanoarrowException, match="I error for all write requests"):
        with StreamWriter.from_writable(ExtraordinarilyInconvenientFile()) as writer:
            # Use write_stream(), the API exercised by the other writer
            # tests. The original called writer.write(), which does not
            # appear anywhere else on the StreamWriter surface; if that
            # method is absent the test would only pass incidentally via
            # the exception raised when the writer is closed on exit.
            writer.write_stream(na.c_array([], na.struct([na.int32()])))
+
+
def test_writer_error_on_write():
    # An empty int32 (non-struct) array — presumably not a valid IPC
    # stream payload, so the C writer should raise (confirm against the
    # writer's requirements)
    empty_int32 = na.c_array([], na.int32())
    with pytest.raises(NanoarrowException):
        with StreamWriter.from_writable(io.BytesIO()) as writer:
            writer.write_stream(empty_int32)
diff --git a/python/tests/test_schema.py b/python/tests/test_schema.py
index 86360369..abfe6518 100644
--- a/python/tests/test_schema.py
+++ b/python/tests/test_schema.py
@@ -244,3 +244,18 @@ def test_schema_extension():
 def test_schema_alias_constructor():
     schema = na.schema(na.Type.INT32)
     assert isinstance(schema, na.Schema)
+
+
def test_schema_serialize():
    import io

    schema = na.struct({"some_col": na.int32()}, nullable=False)

    # No destination: serialize() returns the IPC-encoded schema as bytes
    roundtrip = na.ArrayStream.from_readable(schema.serialize()).schema
    assert repr(roundtrip) == repr(schema)

    # With a destination: serialize(dst) writes into the file-like object
    sink = io.BytesIO()
    schema.serialize(sink)
    roundtrip = na.ArrayStream.from_readable(sink.getvalue()).schema
    assert repr(roundtrip) == repr(schema)


Reply via email to