This is an automated email from the ASF dual-hosted git repository.

apitrou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 5719576c61 ARROW-16356: [Python] Expose RandomAccessFile::GetStream 
(#13793)
5719576c61 is described below

commit 5719576c611929dd790f7f8a1ae3169a8f96f7f1
Author: Miles Granger <[email protected]>
AuthorDate: Mon Aug 8 12:43:39 2022 +0200

    ARROW-16356: [Python] Expose RandomAccessFile::GetStream (#13793)
    
    This fixes [ARROW-16356](https://issues.apache.org/jira/browse/ARROW-16356).
    It takes over the work from #13144 and will close that pull request.
    
    Adds `pyarrow.NativeFile.get_stream`, for example:
    ```python
    with pa.PythonFile(io.BytesIO(b'data'), mode='r') as f:
        stream = f.get_stream(file_offset=1, nbytes=3)
        assert stream.read() == b'ata'
    ```
    
    Lead-authored-by: Miles Granger <[email protected]>
    Co-authored-by: alexdesiqueira <[email protected]>
    Co-authored-by: alexdesiqueira <[email protected]>
    Co-authored-by: Alexandre de Siqueira <[email protected]>
    Signed-off-by: Antoine Pitrou <[email protected]>
---
 cpp/src/arrow/io/interfaces.cc       |  8 ++++++-
 cpp/src/arrow/io/interfaces.h        |  4 ++--
 cpp/src/arrow/io/memory_test.cc      |  4 ++--
 cpp/src/parquet/properties.cc        |  5 ++--
 python/pyarrow/includes/libarrow.pxd |  6 +++++
 python/pyarrow/io.pxi                | 36 +++++++++++++++++++++++++++++
 python/pyarrow/tests/test_io.py      | 44 ++++++++++++++++++++++++++++++++++++
 7 files changed, 100 insertions(+), 7 deletions(-)

diff --git a/cpp/src/arrow/io/interfaces.cc b/cpp/src/arrow/io/interfaces.cc
index 8dbc9bd5ed..1dfb0bdf8a 100644
--- a/cpp/src/arrow/io/interfaces.cc
+++ b/cpp/src/arrow/io/interfaces.cc
@@ -247,8 +247,14 @@ class FileSegmentReader
   int64_t nbytes_;
 };
 
-std::shared_ptr<InputStream> RandomAccessFile::GetStream(
+Result<std::shared_ptr<InputStream>> RandomAccessFile::GetStream(
     std::shared_ptr<RandomAccessFile> file, int64_t file_offset, int64_t 
nbytes) {
+  if (file_offset < 0) {
+    return Status::Invalid("file_offset should be a positive value, got: ", 
file_offset);
+  }
+  if (nbytes < 0) {
+    return Status::Invalid("nbytes should be a positive value, got: ", nbytes);
+  }
   return std::make_shared<FileSegmentReader>(std::move(file), file_offset, 
nbytes);
 }
 
diff --git a/cpp/src/arrow/io/interfaces.h b/cpp/src/arrow/io/interfaces.h
index 0baffc3c5d..70c0dd8520 100644
--- a/cpp/src/arrow/io/interfaces.h
+++ b/cpp/src/arrow/io/interfaces.h
@@ -262,8 +262,8 @@ class ARROW_EXPORT RandomAccessFile : public InputStream, 
public Seekable {
   /// \param[in] file_offset the starting position in the file
   /// \param[in] nbytes the extent of bytes to read. The file should have
   /// sufficient bytes available
-  static std::shared_ptr<InputStream> 
GetStream(std::shared_ptr<RandomAccessFile> file,
-                                                int64_t file_offset, int64_t 
nbytes);
+  static Result<std::shared_ptr<InputStream>> GetStream(
+      std::shared_ptr<RandomAccessFile> file, int64_t file_offset, int64_t 
nbytes);
 
   /// \brief Return the total file size in bytes.
   ///
diff --git a/cpp/src/arrow/io/memory_test.cc b/cpp/src/arrow/io/memory_test.cc
index 50335cb6ac..d361243ad6 100644
--- a/cpp/src/arrow/io/memory_test.cc
+++ b/cpp/src/arrow/io/memory_test.cc
@@ -298,8 +298,8 @@ TEST(TestRandomAccessFile, GetStream) {
 
   std::shared_ptr<InputStream> stream1, stream2;
 
-  stream1 = RandomAccessFile::GetStream(file, 0, 10);
-  stream2 = RandomAccessFile::GetStream(file, 9, 16);
+  ASSERT_OK_AND_ASSIGN(stream1, RandomAccessFile::GetStream(file, 0, 10));
+  ASSERT_OK_AND_ASSIGN(stream2, RandomAccessFile::GetStream(file, 9, 16));
 
   ASSERT_OK_AND_EQ(0, stream1->Tell());
 
diff --git a/cpp/src/parquet/properties.cc b/cpp/src/parquet/properties.cc
index 93638dbe28..b8e529896b 100644
--- a/cpp/src/parquet/properties.cc
+++ b/cpp/src/parquet/properties.cc
@@ -31,8 +31,9 @@ std::shared_ptr<ArrowInputStream> ReaderProperties::GetStream(
   if (buffered_stream_enabled_) {
     // ARROW-6180 / PARQUET-1636 Create isolated reader that references segment
     // of source
-    std::shared_ptr<::arrow::io::InputStream> safe_stream =
-        ::arrow::io::RandomAccessFile::GetStream(source, start, num_bytes);
+    PARQUET_ASSIGN_OR_THROW(
+        std::shared_ptr<::arrow::io::InputStream> safe_stream,
+        ::arrow::io::RandomAccessFile::GetStream(source, start, num_bytes));
     PARQUET_ASSIGN_OR_THROW(
         auto stream, ::arrow::io::BufferedInputStream::Create(buffer_size_, 
pool_,
                                                               safe_stream, 
num_bytes));
diff --git a/python/pyarrow/includes/libarrow.pxd 
b/python/pyarrow/includes/libarrow.pxd
index 4cbcef84e8..a9b0a4bc71 100644
--- a/python/pyarrow/includes/libarrow.pxd
+++ b/python/pyarrow/includes/libarrow.pxd
@@ -1280,6 +1280,12 @@ cdef extern from "arrow/io/api.h" namespace "arrow::io" 
nogil:
                                                                   Seekable):
         CResult[int64_t] GetSize()
 
+        @staticmethod
+        CResult[shared_ptr[CInputStream]] GetStream(
+            shared_ptr[CRandomAccessFile] file,
+            int64_t file_offset,
+            int64_t nbytes)
+
         CResult[int64_t] ReadAt(int64_t position, int64_t nbytes,
                                 uint8_t* buffer)
         CResult[shared_ptr[CBuffer]] ReadAt(int64_t position, int64_t nbytes)
diff --git a/python/pyarrow/io.pxi b/python/pyarrow/io.pxi
index d2e4f7062e..f126633696 100644
--- a/python/pyarrow/io.pxi
+++ b/python/pyarrow/io.pxi
@@ -399,6 +399,42 @@ cdef class NativeFile(_Weakrefable):
 
         return PyObject_to_object(obj)
 
+    def get_stream(self, file_offset, nbytes):
+        """
+        Return an input stream that reads a file segment independent of the
+        state of the file.
+
+        Allows reading portions of a random access file as an input stream
+        without interfering with each other.
+
+        Parameters
+        ----------
+        file_offset : int
+        nbytes : int
+
+        Returns
+        -------
+        stream : NativeFile
+        """
+        cdef:
+            shared_ptr[CInputStream] data
+            int64_t c_file_offset
+            int64_t c_nbytes
+
+        c_file_offset = file_offset
+        c_nbytes = nbytes
+
+        handle = self.get_random_access_file()
+
+        data = GetResultValue(
+            CRandomAccessFile.GetStream(handle, c_file_offset, c_nbytes))
+
+        stream = NativeFile()
+        stream.set_input_stream(data)
+        stream.is_readable = True
+
+        return stream
+
     def read_at(self, nbytes, offset):
         """
         Read indicated number of bytes at offset from the file
diff --git a/python/pyarrow/tests/test_io.py b/python/pyarrow/tests/test_io.py
index f04ae23ce8..ca49c5218e 100644
--- a/python/pyarrow/tests/test_io.py
+++ b/python/pyarrow/tests/test_io.py
@@ -126,6 +126,50 @@ def test_python_file_read():
         pa.PythonFile(StringIO(), mode='r')
 
 
[email protected]("nbytes", (-1, 0, 1, 5, 100))
[email protected]("file_offset", (-1, 0, 5, 100))
+def test_python_file_get_stream(nbytes, file_offset):
+
+    data = b'data1data2data3data4data5'
+
+    f = pa.PythonFile(BytesIO(data), mode='r')
+
+    # negative nbytes or offsets don't make sense here, raise ValueError
+    if nbytes < 0 or file_offset < 0:
+        with pytest.raises(pa.ArrowInvalid,
+                           match="should be a positive value"):
+            f.get_stream(file_offset=file_offset, nbytes=nbytes)
+        f.close()
+        return
+    else:
+        stream = f.get_stream(file_offset=file_offset, nbytes=nbytes)
+
+    # Subsequent calls to 'read' should match behavior if same
+    # data passed to BytesIO where get_stream should handle if
+    # nbytes/file_offset results in no bytes b/c out of bounds.
+    start = min(file_offset, len(data))
+    end = min(file_offset + nbytes, len(data))
+    buf = BytesIO(data[start:end])
+
+    # read some chunks
+    assert stream.read(nbytes=4) == buf.read(4)
+    assert stream.read(nbytes=6) == buf.read(6)
+
+    # Read to end of each stream
+    assert stream.read() == buf.read()
+
+    # Try reading past the stream
+    n = len(data) * 2
+    assert stream.read(n) == buf.read(n)
+
+    # NativeFile[CInputStream] is not seekable
+    with pytest.raises(OSError, match="seekable"):
+        stream.seek(0)
+
+    stream.close()
+    assert stream.closed
+
+
 def test_python_file_read_at():
     data = b'some sample data'
 

Reply via email to