This is an automated email from the ASF dual-hosted git repository.
wesm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 03186b0 ARROW-5910: [Python] Support non-seekable streams in
ipc.read_tensor, ipc.read_message, add Message.serialize_to method
03186b0 is described below
commit 03186b06cea13101480599414cd725b20891c9dd
Author: Wes McKinney <[email protected]>
AuthorDate: Sat Aug 24 15:13:21 2019 -0500
ARROW-5910: [Python] Support non-seekable streams in ipc.read_tensor,
ipc.read_message, add Message.serialize_to method
These functions didn't accept wrapped InputStream objects (such as
compressed input stream), so this addresses that.
Closes #5182 from wesm/ARROW-5910 and squashes the following commits:
c647890cf <Wes McKinney> Add JIRA ID comment
1120a11b2 <Wes McKinney> Support non-seekable streams in ipc.read_tensor,
ipc.read_message, add Message.serialize_to method
Authored-by: Wes McKinney <[email protected]>
Signed-off-by: Wes McKinney <[email protected]>
---
python/pyarrow/ipc.pxi | 76 +++++++++++++++++++++++--------------
python/pyarrow/tests/test_ipc.py | 16 ++++++++
python/pyarrow/tests/test_tensor.py | 15 ++++++++
3 files changed, 78 insertions(+), 29 deletions(-)
diff --git a/python/pyarrow/ipc.pxi b/python/pyarrow/ipc.pxi
index 834b6bd..98cf61e 100644
--- a/python/pyarrow/ipc.pxi
+++ b/python/pyarrow/ipc.pxi
@@ -60,31 +60,45 @@ cdef class Message:
result = self.message.get().Equals(deref(other.message.get()))
return result
- def serialize(self, alignment=8, memory_pool=None):
+ def serialize_to(self, NativeFile sink, alignment=8, memory_pool=None):
"""
- Write message as encapsulated IPC message
+ Write message to generic OutputStream
Parameters
----------
+ sink : NativeFile
alignment : int, default 8
Byte alignment for metadata and body
memory_pool : MemoryPool, default None
Uses default memory pool if not specified
-
- Returns
- -------
- serialized : Buffer
"""
cdef:
- BufferOutputStream stream = BufferOutputStream(memory_pool)
int64_t output_length = 0
int32_t c_alignment = alignment
+ OutputStream* out
- handle = stream.get_output_stream()
+ out = sink.get_output_stream().get()
with nogil:
check_status(self.message.get()
- .SerializeTo(handle.get(), c_alignment,
- &output_length))
+ .SerializeTo(out, c_alignment, &output_length))
+
+ def serialize(self, alignment=8, memory_pool=None):
+ """
+ Write message as encapsulated IPC message
+
+ Parameters
+ ----------
+ alignment : int, default 8
+ Byte alignment for metadata and body
+ memory_pool : MemoryPool, default None
+ Uses default memory pool if not specified
+
+ Returns
+ -------
+ serialized : Buffer
+ """
+ stream = BufferOutputStream(memory_pool)
+ self.serialize_to(stream, alignment=alignment, memory_pool=memory_pool)
return stream.getvalue()
def __repr__(self):
@@ -448,7 +462,20 @@ def write_tensor(Tensor tensor, NativeFile dest):
return metadata_length + body_length
-def read_tensor(NativeFile source):
+cdef NativeFile as_native_file(source):
+ if not isinstance(source, NativeFile):
+ if hasattr(source, 'read'):
+ source = PythonFile(source)
+ else:
+ source = BufferReader(source)
+
+ if not isinstance(source, NativeFile):
+ raise ValueError('Unable to read message from object with type: {0}'
+ .format(type(source)))
+ return source
+
+
+def read_tensor(source):
"""Read pyarrow.Tensor from pyarrow.NativeFile object from current
position. If the file source supports zero copy (e.g. a memory map), then
this operation does not allocate any memory. This function does not assume that
@@ -465,11 +492,13 @@ def read_tensor(NativeFile source):
"""
cdef:
shared_ptr[CTensor] sp_tensor
- RandomAccessFile* rd_file
+ InputStream* c_stream
- rd_file = source.get_random_access_file().get()
+ cdef NativeFile nf = as_native_file(source)
+
+ c_stream = nf.get_input_stream().get()
with nogil:
- check_status(ReadTensor(rd_file, &sp_tensor))
+ check_status(ReadTensor(c_stream, &sp_tensor))
return pyarrow_wrap_tensor(sp_tensor)
@@ -487,24 +516,13 @@ def read_message(source):
"""
cdef:
Message result = Message.__new__(Message)
- NativeFile cpp_file
- RandomAccessFile* rd_file
-
- if not isinstance(source, NativeFile):
- if hasattr(source, 'read'):
- source = PythonFile(source)
- else:
- source = BufferReader(source)
-
- if not isinstance(source, NativeFile):
- raise ValueError('Unable to read message from object with type: {0}'
- .format(type(source)))
+ InputStream* c_stream
- cpp_file = source
- rd_file = cpp_file.get_random_access_file().get()
+ cdef NativeFile nf = as_native_file(source)
+ c_stream = nf.get_input_stream().get()
with nogil:
- check_status(ReadMessage(rd_file, &result.message))
+ check_status(ReadMessage(c_stream, &result.message))
return result
diff --git a/python/pyarrow/tests/test_ipc.py b/python/pyarrow/tests/test_ipc.py
index 92b1613..fb62e7d 100644
--- a/python/pyarrow/tests/test_ipc.py
+++ b/python/pyarrow/tests/test_ipc.py
@@ -373,6 +373,22 @@ def test_message_serialize_read_message(example_messages):
assert msg.equals(restored3)
+def test_message_read_from_compressed(example_messages):
+ # Part of ARROW-5910
+ _, messages = example_messages
+ for message in messages:
+ raw_out = pa.BufferOutputStream()
+ compressed_out = pa.output_stream(raw_out, compression='gzip')
+ message.serialize_to(compressed_out)
+ compressed_out.close()
+
+ compressed_buf = raw_out.getvalue()
+
+ result = pa.read_message(pa.input_stream(compressed_buf,
+ compression='gzip'))
+ assert result.equals(message)
+
+
def test_message_read_record_batch(example_messages):
batches, messages = example_messages
diff --git a/python/pyarrow/tests/test_tensor.py b/python/pyarrow/tests/test_tensor.py
index 13f05d2..8862ed8 100644
--- a/python/pyarrow/tests/test_tensor.py
+++ b/python/pyarrow/tests/test_tensor.py
@@ -102,6 +102,21 @@ def test_tensor_ipc_roundtrip(tmpdir):
assert result.equals(tensor)
+def test_tensor_ipc_read_from_compressed(tempdir):
+ # ARROW-5910
+ data = np.random.randn(10, 4)
+ tensor = pa.Tensor.from_numpy(data)
+
+ path = tempdir / 'tensor-compressed-file'
+
+ out_stream = pa.output_stream(path, compression='gzip')
+ pa.write_tensor(tensor, out_stream)
+ out_stream.close()
+
+ result = pa.read_tensor(pa.input_stream(path, compression='gzip'))
+ assert result.equals(tensor)
+
+
def test_tensor_ipc_strided(tmpdir):
data1 = np.random.randn(10, 4)
tensor1 = pa.Tensor.from_numpy(data1[::2])