This is an automated email from the ASF dual-hosted git repository.

wesm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 03186b0  ARROW-5910: [Python] Support non-seekable streams in ipc.read_tensor, ipc.read_message, add Message.serialize_to method
03186b0 is described below

commit 03186b06cea13101480599414cd725b20891c9dd
Author: Wes McKinney <[email protected]>
AuthorDate: Sat Aug 24 15:13:21 2019 -0500

    ARROW-5910: [Python] Support non-seekable streams in ipc.read_tensor, ipc.read_message, add Message.serialize_to method
    
    These functions didn't accept wrapped InputStream objects (such as a compressed input stream), so this change adds support for them.
    
    Closes #5182 from wesm/ARROW-5910 and squashes the following commits:
    
    c647890cf <Wes McKinney> Add JIRA ID comment
    1120a11b2 <Wes McKinney> Support non-seekable streams in ipc.read_tensor, ipc.read_message, add Message.serialize_to method
    
    Authored-by: Wes McKinney <[email protected]>
    Signed-off-by: Wes McKinney <[email protected]>
---
 python/pyarrow/ipc.pxi              | 76 +++++++++++++++++++++++--------------
 python/pyarrow/tests/test_ipc.py    | 16 ++++++++
 python/pyarrow/tests/test_tensor.py | 15 ++++++++
 3 files changed, 78 insertions(+), 29 deletions(-)

diff --git a/python/pyarrow/ipc.pxi b/python/pyarrow/ipc.pxi
index 834b6bd..98cf61e 100644
--- a/python/pyarrow/ipc.pxi
+++ b/python/pyarrow/ipc.pxi
@@ -60,31 +60,45 @@ cdef class Message:
             result = self.message.get().Equals(deref(other.message.get()))
         return result
 
-    def serialize(self, alignment=8, memory_pool=None):
+    def serialize_to(self, NativeFile sink, alignment=8, memory_pool=None):
         """
-        Write message as encapsulated IPC message
+        Write message to generic OutputStream
 
         Parameters
         ----------
+        sink : NativeFile
         alignment : int, default 8
             Byte alignment for metadata and body
         memory_pool : MemoryPool, default None
             Uses default memory pool if not specified
-
-        Returns
-        -------
-        serialized : Buffer
         """
         cdef:
-            BufferOutputStream stream = BufferOutputStream(memory_pool)
             int64_t output_length = 0
             int32_t c_alignment = alignment
+            OutputStream* out
 
-        handle = stream.get_output_stream()
+        out = sink.get_output_stream().get()
         with nogil:
             check_status(self.message.get()
-                         .SerializeTo(handle.get(), c_alignment,
-                                      &output_length))
+                         .SerializeTo(out, c_alignment, &output_length))
+
+    def serialize(self, alignment=8, memory_pool=None):
+        """
+        Write message as encapsulated IPC message
+
+        Parameters
+        ----------
+        alignment : int, default 8
+            Byte alignment for metadata and body
+        memory_pool : MemoryPool, default None
+            Uses default memory pool if not specified
+
+        Returns
+        -------
+        serialized : Buffer
+        """
+        stream = BufferOutputStream(memory_pool)
+        self.serialize_to(stream, alignment=alignment, memory_pool=memory_pool)
         return stream.getvalue()
 
     def __repr__(self):
@@ -448,7 +462,20 @@ def write_tensor(Tensor tensor, NativeFile dest):
     return metadata_length + body_length
 
 
-def read_tensor(NativeFile source):
+cdef NativeFile as_native_file(source):
+    if not isinstance(source, NativeFile):
+        if hasattr(source, 'read'):
+            source = PythonFile(source)
+        else:
+            source = BufferReader(source)
+
+    if not isinstance(source, NativeFile):
+        raise ValueError('Unable to read message from object with type: {0}'
+                         .format(type(source)))
+    return source
+
+
+def read_tensor(source):
     """Read pyarrow.Tensor from pyarrow.NativeFile object from current
     position. If the file source supports zero copy (e.g. a memory map), then
     this operation does not allocate any memory. This function not assume that
@@ -465,11 +492,13 @@ def read_tensor(NativeFile source):
     """
     cdef:
         shared_ptr[CTensor] sp_tensor
-        RandomAccessFile* rd_file
+        InputStream* c_stream
 
-    rd_file = source.get_random_access_file().get()
+    cdef NativeFile nf = as_native_file(source)
+
+    c_stream = nf.get_input_stream().get()
     with nogil:
-        check_status(ReadTensor(rd_file, &sp_tensor))
+        check_status(ReadTensor(c_stream, &sp_tensor))
     return pyarrow_wrap_tensor(sp_tensor)
 
 
@@ -487,24 +516,13 @@ def read_message(source):
     """
     cdef:
         Message result = Message.__new__(Message)
-        NativeFile cpp_file
-        RandomAccessFile* rd_file
-
-    if not isinstance(source, NativeFile):
-        if hasattr(source, 'read'):
-            source = PythonFile(source)
-        else:
-            source = BufferReader(source)
-
-    if not isinstance(source, NativeFile):
-        raise ValueError('Unable to read message from object with type: {0}'
-                         .format(type(source)))
+        InputStream* c_stream
 
-    cpp_file = source
-    rd_file = cpp_file.get_random_access_file().get()
+    cdef NativeFile nf = as_native_file(source)
+    c_stream = nf.get_input_stream().get()
 
     with nogil:
-        check_status(ReadMessage(rd_file, &result.message))
+        check_status(ReadMessage(c_stream, &result.message))
 
     return result
 
diff --git a/python/pyarrow/tests/test_ipc.py b/python/pyarrow/tests/test_ipc.py
index 92b1613..fb62e7d 100644
--- a/python/pyarrow/tests/test_ipc.py
+++ b/python/pyarrow/tests/test_ipc.py
@@ -373,6 +373,22 @@ def test_message_serialize_read_message(example_messages):
     assert msg.equals(restored3)
 
 
+def test_message_read_from_compressed(example_messages):
+    # Part of ARROW-5910
+    _, messages = example_messages
+    for message in messages:
+        raw_out = pa.BufferOutputStream()
+        compressed_out = pa.output_stream(raw_out, compression='gzip')
+        message.serialize_to(compressed_out)
+        compressed_out.close()
+
+        compressed_buf = raw_out.getvalue()
+
+        result = pa.read_message(pa.input_stream(compressed_buf,
+                                                 compression='gzip'))
+        assert result.equals(message)
+
+
 def test_message_read_record_batch(example_messages):
     batches, messages = example_messages
 
diff --git a/python/pyarrow/tests/test_tensor.py b/python/pyarrow/tests/test_tensor.py
index 13f05d2..8862ed8 100644
--- a/python/pyarrow/tests/test_tensor.py
+++ b/python/pyarrow/tests/test_tensor.py
@@ -102,6 +102,21 @@ def test_tensor_ipc_roundtrip(tmpdir):
     assert result.equals(tensor)
 
 
+def test_tensor_ipc_read_from_compressed(tempdir):
+    # ARROW-5910
+    data = np.random.randn(10, 4)
+    tensor = pa.Tensor.from_numpy(data)
+
+    path = tempdir / 'tensor-compressed-file'
+
+    out_stream = pa.output_stream(path, compression='gzip')
+    pa.write_tensor(tensor, out_stream)
+    out_stream.close()
+
+    result = pa.read_tensor(pa.input_stream(path, compression='gzip'))
+    assert result.equals(tensor)
+
+
 def test_tensor_ipc_strided(tmpdir):
     data1 = np.random.randn(10, 4)
     tensor1 = pa.Tensor.from_numpy(data1[::2])

Reply via email to