This is an automated email from the ASF dual-hosted git repository.
apitrou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 5719576c61 ARROW-16356: [Python] Expose RandomAccessFile::GetStream
(#13793)
5719576c61 is described below
commit 5719576c611929dd790f7f8a1ae3169a8f96f7f1
Author: Miles Granger <[email protected]>
AuthorDate: Mon Aug 8 12:43:39 2022 +0200
ARROW-16356: [Python] Expose RandomAccessFile::GetStream (#13793)
This fixes
[ARROW-16356](https://issues.apache.org/jira/browse/ARROW-16356). It takes over
from and closes #13144.
Adds `pyarrow.NativeFile.get_stream`, for example:
```python
with pa.PythonFile(io.BytesIO(b'data'), mode='r') as f:
    stream = f.get_stream(file_offset=1, nbytes=3)
    assert stream.read() == b'ata'
```
Lead-authored-by: Miles Granger <[email protected]>
Co-authored-by: alexdesiqueira <[email protected]>
Co-authored-by: alexdesiqueira <[email protected]>
Co-authored-by: Alexandre de Siqueira <[email protected]>
Signed-off-by: Antoine Pitrou <[email protected]>
---
cpp/src/arrow/io/interfaces.cc | 8 ++++++-
cpp/src/arrow/io/interfaces.h | 4 ++--
cpp/src/arrow/io/memory_test.cc | 4 ++--
cpp/src/parquet/properties.cc | 5 ++--
python/pyarrow/includes/libarrow.pxd | 6 +++++
python/pyarrow/io.pxi | 36 +++++++++++++++++++++++++++++
python/pyarrow/tests/test_io.py | 44 ++++++++++++++++++++++++++++++++++++
7 files changed, 100 insertions(+), 7 deletions(-)
diff --git a/cpp/src/arrow/io/interfaces.cc b/cpp/src/arrow/io/interfaces.cc
index 8dbc9bd5ed..1dfb0bdf8a 100644
--- a/cpp/src/arrow/io/interfaces.cc
+++ b/cpp/src/arrow/io/interfaces.cc
@@ -247,8 +247,14 @@ class FileSegmentReader
int64_t nbytes_;
};
-std::shared_ptr<InputStream> RandomAccessFile::GetStream(
+Result<std::shared_ptr<InputStream>> RandomAccessFile::GetStream(
std::shared_ptr<RandomAccessFile> file, int64_t file_offset, int64_t
nbytes) {
+ if (file_offset < 0) {
+ return Status::Invalid("file_offset should be a positive value, got: ",
file_offset);
+ }
+ if (nbytes < 0) {
+ return Status::Invalid("nbytes should be a positive value, got: ", nbytes);
+ }
return std::make_shared<FileSegmentReader>(std::move(file), file_offset,
nbytes);
}
diff --git a/cpp/src/arrow/io/interfaces.h b/cpp/src/arrow/io/interfaces.h
index 0baffc3c5d..70c0dd8520 100644
--- a/cpp/src/arrow/io/interfaces.h
+++ b/cpp/src/arrow/io/interfaces.h
@@ -262,8 +262,8 @@ class ARROW_EXPORT RandomAccessFile : public InputStream,
public Seekable {
/// \param[in] file_offset the starting position in the file
/// \param[in] nbytes the extent of bytes to read. The file should have
/// sufficient bytes available
- static std::shared_ptr<InputStream>
GetStream(std::shared_ptr<RandomAccessFile> file,
- int64_t file_offset, int64_t
nbytes);
+ static Result<std::shared_ptr<InputStream>> GetStream(
+ std::shared_ptr<RandomAccessFile> file, int64_t file_offset, int64_t
nbytes);
/// \brief Return the total file size in bytes.
///
diff --git a/cpp/src/arrow/io/memory_test.cc b/cpp/src/arrow/io/memory_test.cc
index 50335cb6ac..d361243ad6 100644
--- a/cpp/src/arrow/io/memory_test.cc
+++ b/cpp/src/arrow/io/memory_test.cc
@@ -298,8 +298,8 @@ TEST(TestRandomAccessFile, GetStream) {
std::shared_ptr<InputStream> stream1, stream2;
- stream1 = RandomAccessFile::GetStream(file, 0, 10);
- stream2 = RandomAccessFile::GetStream(file, 9, 16);
+ ASSERT_OK_AND_ASSIGN(stream1, RandomAccessFile::GetStream(file, 0, 10));
+ ASSERT_OK_AND_ASSIGN(stream2, RandomAccessFile::GetStream(file, 9, 16));
ASSERT_OK_AND_EQ(0, stream1->Tell());
diff --git a/cpp/src/parquet/properties.cc b/cpp/src/parquet/properties.cc
index 93638dbe28..b8e529896b 100644
--- a/cpp/src/parquet/properties.cc
+++ b/cpp/src/parquet/properties.cc
@@ -31,8 +31,9 @@ std::shared_ptr<ArrowInputStream> ReaderProperties::GetStream(
if (buffered_stream_enabled_) {
// ARROW-6180 / PARQUET-1636 Create isolated reader that references segment
// of source
- std::shared_ptr<::arrow::io::InputStream> safe_stream =
- ::arrow::io::RandomAccessFile::GetStream(source, start, num_bytes);
+ PARQUET_ASSIGN_OR_THROW(
+ std::shared_ptr<::arrow::io::InputStream> safe_stream,
+ ::arrow::io::RandomAccessFile::GetStream(source, start, num_bytes));
PARQUET_ASSIGN_OR_THROW(
auto stream, ::arrow::io::BufferedInputStream::Create(buffer_size_,
pool_,
safe_stream,
num_bytes));
diff --git a/python/pyarrow/includes/libarrow.pxd
b/python/pyarrow/includes/libarrow.pxd
index 4cbcef84e8..a9b0a4bc71 100644
--- a/python/pyarrow/includes/libarrow.pxd
+++ b/python/pyarrow/includes/libarrow.pxd
@@ -1280,6 +1280,12 @@ cdef extern from "arrow/io/api.h" namespace "arrow::io"
nogil:
Seekable):
CResult[int64_t] GetSize()
+ @staticmethod
+ CResult[shared_ptr[CInputStream]] GetStream(
+ shared_ptr[CRandomAccessFile] file,
+ int64_t file_offset,
+ int64_t nbytes)
+
CResult[int64_t] ReadAt(int64_t position, int64_t nbytes,
uint8_t* buffer)
CResult[shared_ptr[CBuffer]] ReadAt(int64_t position, int64_t nbytes)
diff --git a/python/pyarrow/io.pxi b/python/pyarrow/io.pxi
index d2e4f7062e..f126633696 100644
--- a/python/pyarrow/io.pxi
+++ b/python/pyarrow/io.pxi
@@ -399,6 +399,42 @@ cdef class NativeFile(_Weakrefable):
return PyObject_to_object(obj)
+ def get_stream(self, file_offset, nbytes):
+ """
+ Return an input stream that reads a file segment independent of the
+ state of the file.
+
+ Allows reading portions of a random access file as an input stream
+ without interfering with each other.
+
+ Parameters
+ ----------
+ file_offset : int
+ nbytes : int
+
+ Returns
+ -------
+ stream : NativeFile
+ """
+ cdef:
+ shared_ptr[CInputStream] data
+ int64_t c_file_offset
+ int64_t c_nbytes
+
+ c_file_offset = file_offset
+ c_nbytes = nbytes
+
+ handle = self.get_random_access_file()
+
+ data = GetResultValue(
+ CRandomAccessFile.GetStream(handle, c_file_offset, c_nbytes))
+
+ stream = NativeFile()
+ stream.set_input_stream(data)
+ stream.is_readable = True
+
+ return stream
+
def read_at(self, nbytes, offset):
"""
Read indicated number of bytes at offset from the file
diff --git a/python/pyarrow/tests/test_io.py b/python/pyarrow/tests/test_io.py
index f04ae23ce8..ca49c5218e 100644
--- a/python/pyarrow/tests/test_io.py
+++ b/python/pyarrow/tests/test_io.py
@@ -126,6 +126,50 @@ def test_python_file_read():
pa.PythonFile(StringIO(), mode='r')
[email protected]("nbytes", (-1, 0, 1, 5, 100))
[email protected]("file_offset", (-1, 0, 5, 100))
+def test_python_file_get_stream(nbytes, file_offset):
+
+ data = b'data1data2data3data4data5'
+
+ f = pa.PythonFile(BytesIO(data), mode='r')
+
+ # negative nbytes or offsets don't make sense here, raise ValueError
+ if nbytes < 0 or file_offset < 0:
+ with pytest.raises(pa.ArrowInvalid,
+ match="should be a positive value"):
+ f.get_stream(file_offset=file_offset, nbytes=nbytes)
+ f.close()
+ return
+ else:
+ stream = f.get_stream(file_offset=file_offset, nbytes=nbytes)
+
+ # Subsequent calls to 'read' should match behavior if same
+ # data passed to BytesIO where get_stream should handle if
+ # nbytes/file_offset results in no bytes b/c out of bounds.
+ start = min(file_offset, len(data))
+ end = min(file_offset + nbytes, len(data))
+ buf = BytesIO(data[start:end])
+
+ # read some chunks
+ assert stream.read(nbytes=4) == buf.read(4)
+ assert stream.read(nbytes=6) == buf.read(6)
+
+ # Read to end of each stream
+ assert stream.read() == buf.read()
+
+ # Try reading past the stream
+ n = len(data) * 2
+ assert stream.read(n) == buf.read(n)
+
+ # NativeFile[CInputStream] is not seekable
+ with pytest.raises(OSError, match="seekable"):
+ stream.seek(0)
+
+ stream.close()
+ assert stream.closed
+
+
def test_python_file_read_at():
data = b'some sample data'