This is an automated email from the ASF dual-hosted git repository.

jorisvandenbossche pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/main by this push:
     new 3236c129d1 GH-36441: [Python] Make `CacheOptions` configurable from 
Python  (#36627)
3236c129d1 is described below

commit 3236c129d1cbe3f73359278d1459a3f20e5c4df0
Author: Thomas Newton <[email protected]>
AuthorDate: Thu Dec 14 14:12:17 2023 +0000

    GH-36441: [Python] Make `CacheOptions` configurable from Python  (#36627)
    
    
    
    ### Rationale for this change
    Resolves: https://github.com/apache/arrow/issues/36441
    
    ### What changes are included in this PR?
    - Add python bindings for `CacheOptions` from the C++ side.
    - Allow setting `cache_options` on `ParquetFragmentScanOptions` from the 
python side.
    - Adjust some of the comments on `CacheOptions`
    
    ### Are these changes tested?
    Yes. I added python side tests for these newly available configs similar to 
other configs. I have not added an integration test that ensures setting the 
configs on the python side leads to correctly using them on the C++ side.
    
    ### Are there any user-facing changes?
    Yes. The are new configs available on the python side but the defaults are 
unchanged. I've added/updated docstrings where relevant.
    
    * Closes: #36441
    
    Lead-authored-by: Thomas Newton <[email protected]>
    Co-authored-by: Joris Van den Bossche <[email protected]>
    Signed-off-by: Joris Van den Bossche <[email protected]>
---
 cpp/src/arrow/io/caching.h           |  10 ++-
 python/pyarrow/__init__.py           |   2 +-
 python/pyarrow/_dataset_parquet.pyx  |  21 +++++-
 python/pyarrow/_parquet.pxd          |   6 +-
 python/pyarrow/includes/libarrow.pxd |  16 +++++
 python/pyarrow/io.pxi                | 134 +++++++++++++++++++++++++++++++++++
 python/pyarrow/lib.pxd               |  12 ++++
 python/pyarrow/tests/test_dataset.py |  28 +++++---
 python/pyarrow/tests/test_io.py      |  59 +++++++++++++++
 9 files changed, 271 insertions(+), 17 deletions(-)

diff --git a/cpp/src/arrow/io/caching.h b/cpp/src/arrow/io/caching.h
index 9c1b8fe88b..e2b911fafd 100644
--- a/cpp/src/arrow/io/caching.h
+++ b/cpp/src/arrow/io/caching.h
@@ -42,6 +42,11 @@ struct ARROW_EXPORT CacheOptions {
   ///   size greater than this, they are not combined
   int64_t range_size_limit;
   /// \brief A lazy cache does not perform any I/O until requested.
+  ///   lazy = false: request all byte ranges when PreBuffer or WillNeed is 
called.
+  ///   lazy = True, prefetch_limit = 0: request merged byte ranges only after 
the reader
+  ///   needs them.
+  ///   lazy = True, prefetch_limit = k: prefetch up to k merged byte ranges 
ahead of the
+  ///   range that is currently being read.
   bool lazy;
   /// \brief The maximum number of ranges to be prefetched. This is only used
   ///   for lazy cache to asynchronously read some ranges after reading the 
target range.
@@ -56,9 +61,10 @@ struct ARROW_EXPORT CacheOptions {
   /// \brief Construct CacheOptions from network storage metrics (e.g. S3).
   ///
   /// \param[in] time_to_first_byte_millis Seek-time or Time-To-First-Byte 
(TTFB) in
-  ///   milliseconds, also called call setup latency of a new S3 request.
+  ///   milliseconds, also called call setup latency of a new read request.
   ///   The value is a positive integer.
-  /// \param[in] transfer_bandwidth_mib_per_sec Data transfer Bandwidth (BW) 
in MiB/sec.
+  /// \param[in] transfer_bandwidth_mib_per_sec Data transfer Bandwidth (BW) 
in MiB/sec
+  ///   (per connection).
   ///   The value is a positive integer.
   /// \param[in] ideal_bandwidth_utilization_frac Transfer bandwidth 
utilization fraction
   ///   (per connection) to maximize the net data load.
diff --git a/python/pyarrow/__init__.py b/python/pyarrow/__init__.py
index cd66abcb44..9da94885ec 100644
--- a/python/pyarrow/__init__.py
+++ b/python/pyarrow/__init__.py
@@ -243,7 +243,7 @@ from pyarrow.lib import (MemoryPool, LoggingMemoryPool, 
ProxyMemoryPool,
 
 # I/O
 from pyarrow.lib import (NativeFile, PythonFile,
-                         BufferedInputStream, BufferedOutputStream,
+                         BufferedInputStream, BufferedOutputStream, 
CacheOptions,
                          CompressedInputStream, CompressedOutputStream,
                          TransformInputStream, transcoding_input_stream,
                          FixedSizeBufferWriter,
diff --git a/python/pyarrow/_dataset_parquet.pyx 
b/python/pyarrow/_dataset_parquet.pyx
index d458ac4ee7..61e051f56c 100644
--- a/python/pyarrow/_dataset_parquet.pyx
+++ b/python/pyarrow/_dataset_parquet.pyx
@@ -42,6 +42,7 @@ from pyarrow._dataset cimport (
     FileWriteOptions,
     Fragment,
     FragmentScanOptions,
+    CacheOptions,
     Partitioning,
     PartitioningFactory,
     WrittenFile
@@ -693,6 +694,10 @@ cdef class ParquetFragmentScanOptions(FragmentScanOptions):
         parallel using a background I/O thread pool.
         Set to False if you want to prioritize minimal memory usage
         over maximum speed.
+    cache_options : pyarrow.CacheOptions, default None
+        Cache options used when pre_buffer is enabled. The default values 
should
+        be good for most use cases. You may want to adjust these for example if
+        you have exceptionally high latency to the file system. 
     thrift_string_size_limit : int, default None
         If not None, override the maximum total string size allocated
         when decoding Thrift structures. The default limit should be
@@ -714,6 +719,7 @@ cdef class ParquetFragmentScanOptions(FragmentScanOptions):
     def __init__(self, *, bint use_buffered_stream=False,
                  buffer_size=8192,
                  bint pre_buffer=True,
+                 cache_options=None,
                  thrift_string_size_limit=None,
                  thrift_container_size_limit=None,
                  decryption_config=None,
@@ -723,6 +729,8 @@ cdef class ParquetFragmentScanOptions(FragmentScanOptions):
         self.use_buffered_stream = use_buffered_stream
         self.buffer_size = buffer_size
         self.pre_buffer = pre_buffer
+        if cache_options is not None:
+            self.cache_options = cache_options
         if thrift_string_size_limit is not None:
             self.thrift_string_size_limit = thrift_string_size_limit
         if thrift_container_size_limit is not None:
@@ -770,6 +778,14 @@ cdef class ParquetFragmentScanOptions(FragmentScanOptions):
     def pre_buffer(self, bint pre_buffer):
         self.arrow_reader_properties().set_pre_buffer(pre_buffer)
 
+    @property
+    def cache_options(self):
+        return 
CacheOptions.wrap(self.arrow_reader_properties().cache_options())
+
+    @cache_options.setter
+    def cache_options(self, CacheOptions options):
+        self.arrow_reader_properties().set_cache_options(options.unwrap())
+
     @property
     def thrift_string_size_limit(self):
         return self.reader_properties().thrift_string_size_limit()
@@ -828,11 +844,11 @@ cdef class 
ParquetFragmentScanOptions(FragmentScanOptions):
         bool
         """
         attrs = (
-            self.use_buffered_stream, self.buffer_size, self.pre_buffer,
+            self.use_buffered_stream, self.buffer_size, self.pre_buffer, 
self.cache_options,
             self.thrift_string_size_limit, self.thrift_container_size_limit,
             self.page_checksum_verification)
         other_attrs = (
-            other.use_buffered_stream, other.buffer_size, other.pre_buffer,
+            other.use_buffered_stream, other.buffer_size, other.pre_buffer, 
other.cache_options,
             other.thrift_string_size_limit,
             other.thrift_container_size_limit, 
other.page_checksum_verification)
         return attrs == other_attrs
@@ -849,6 +865,7 @@ cdef class ParquetFragmentScanOptions(FragmentScanOptions):
             use_buffered_stream=self.use_buffered_stream,
             buffer_size=self.buffer_size,
             pre_buffer=self.pre_buffer,
+            cache_options=self.cache_options,
             thrift_string_size_limit=self.thrift_string_size_limit,
             thrift_container_size_limit=self.thrift_container_size_limit,
             page_checksum_verification=self.page_checksum_verification
diff --git a/python/pyarrow/_parquet.pxd b/python/pyarrow/_parquet.pxd
index 59b50ceda8..7ce747e0aa 100644
--- a/python/pyarrow/_parquet.pxd
+++ b/python/pyarrow/_parquet.pxd
@@ -21,8 +21,8 @@
 from pyarrow.includes.common cimport *
 from pyarrow.includes.libarrow cimport (CChunkedArray, CScalar, CSchema, 
CStatus,
                                         CTable, CMemoryPool, CBuffer,
-                                        CKeyValueMetadata,
-                                        CRandomAccessFile, COutputStream,
+                                        CKeyValueMetadata, CRandomAccessFile,
+                                        COutputStream, CCacheOptions,
                                         TimeUnit, CRecordBatchReader)
 from pyarrow.lib cimport _Weakrefable
 
@@ -393,6 +393,8 @@ cdef extern from "parquet/api/reader.h" namespace "parquet" 
nogil:
         int64_t batch_size()
         void set_pre_buffer(c_bool pre_buffer)
         c_bool pre_buffer() const
+        void set_cache_options(CCacheOptions options)
+        CCacheOptions cache_options() const
         void set_coerce_int96_timestamp_unit(TimeUnit unit)
         TimeUnit coerce_int96_timestamp_unit() const
 
diff --git a/python/pyarrow/includes/libarrow.pxd 
b/python/pyarrow/includes/libarrow.pxd
index b0b89f8614..403846a38f 100644
--- a/python/pyarrow/includes/libarrow.pxd
+++ b/python/pyarrow/includes/libarrow.pxd
@@ -1347,6 +1347,22 @@ cdef extern from "arrow/io/api.h" namespace "arrow::io" 
nogil:
         CStatus Write(const uint8_t* data, int64_t nbytes)
         CStatus Flush()
 
+    cdef cppclass CCacheOptions "arrow::io::CacheOptions":
+        int64_t hole_size_limit
+        int64_t range_size_limit
+        c_bool lazy
+        int64_t prefetch_limit
+        c_bool Equals "operator==" (CCacheOptions other)
+
+        @staticmethod
+        CCacheOptions MakeFromNetworkMetrics(int64_t time_to_first_byte_millis,
+                                             int64_t 
transfer_bandwidth_mib_per_sec,
+                                             double 
ideal_bandwidth_utilization_frac,
+                                             int64_t 
max_ideal_request_size_mib)
+
+        @staticmethod
+        CCacheOptions LazyDefaults()
+
     cdef cppclass COutputStream" arrow::io::OutputStream"(FileInterface,
                                                           Writable):
         pass
diff --git a/python/pyarrow/io.pxi b/python/pyarrow/io.pxi
index 6f39166401..1897e76efc 100644
--- a/python/pyarrow/io.pxi
+++ b/python/pyarrow/io.pxi
@@ -2122,6 +2122,140 @@ cdef CCompressionType _ensure_compression(str name) 
except *:
         raise ValueError('Invalid value for compression: {!r}'.format(name))
 
 
+cdef class CacheOptions(_Weakrefable):
+    """
+    Cache options for a pre-buffered fragment scan.
+
+    Parameters
+    ----------
+    hole_size_limit : int, default 8KiB
+        The maximum distance in bytes between two consecutive ranges; beyond 
+        this value, ranges are not combined.
+    range_size_limit : int, default 32MiB
+        The maximum size in bytes of a combined range; if combining two 
+        consecutive ranges would produce a range of a size greater than this, 
+        they are not combined
+    lazy : bool, default True
+        lazy = false: request all byte ranges when PreBuffer or WillNeed is 
called.
+        lazy = True, prefetch_limit = 0: request merged byte ranges only after 
the reader 
+        needs them. 
+        lazy = True, prefetch_limit = k: prefetch up to k merged byte ranges 
ahead of the 
+        range that is currently being read.
+    prefetch_limit : int, default 0
+        The maximum number of ranges to be prefetched. This is only used for 
+        lazy cache to asynchronously read some ranges after reading the target 
+        range.
+    """
+
+    def __init__(self, *, hole_size_limit=None, range_size_limit=None, 
lazy=None, prefetch_limit=None):
+        self.wrapped = CCacheOptions.LazyDefaults()
+        if hole_size_limit is not None:
+            self.hole_size_limit = hole_size_limit
+        if range_size_limit is not None:
+            self.range_size_limit = range_size_limit
+        if lazy is not None:
+            self.lazy = lazy
+        if prefetch_limit is not None:
+            self.prefetch_limit = prefetch_limit
+
+    cdef void init(self, CCacheOptions options):
+        self.wrapped = options
+
+    cdef inline CCacheOptions unwrap(self):
+        return self.wrapped
+
+    @staticmethod
+    cdef wrap(CCacheOptions options):
+        self = CacheOptions()
+        self.init(options)
+        return self
+
+    @property
+    def hole_size_limit(self):
+        return self.wrapped.hole_size_limit
+
+    @hole_size_limit.setter
+    def hole_size_limit(self, hole_size_limit):
+        self.wrapped.hole_size_limit = hole_size_limit
+
+    @property
+    def range_size_limit(self):
+        return self.wrapped.range_size_limit
+
+    @range_size_limit.setter
+    def range_size_limit(self, range_size_limit):
+        self.wrapped.range_size_limit = range_size_limit
+
+    @property
+    def lazy(self):
+        return self.wrapped.lazy
+
+    @lazy.setter
+    def lazy(self, lazy):
+        self.wrapped.lazy = lazy
+
+    @property
+    def prefetch_limit(self):
+        return self.wrapped.prefetch_limit
+
+    @prefetch_limit.setter
+    def prefetch_limit(self, prefetch_limit):
+        self.wrapped.prefetch_limit = prefetch_limit
+
+    def __eq__(self, CacheOptions other):
+        try:
+            return self.unwrap().Equals(other.unwrap())
+        except TypeError:
+            return False
+
+    @staticmethod
+    def from_network_metrics(time_to_first_byte_millis, 
transfer_bandwidth_mib_per_sec,
+                             ideal_bandwidth_utilization_frac=0.9, 
max_ideal_request_size_mib=64):
+        """
+        Create suiteable CacheOptions based on provided network metrics.
+
+        Typically this will be used with object storage solutions like Amazon 
S3, 
+        Google Cloud Storage and Azure Blob Storage.
+
+        Parameters
+        ----------
+        time_to_first_byte_millis : int
+            Seek-time or Time-To-First-Byte (TTFB) in milliseconds, also 
called call 
+            setup latency of a new read request. The value is a positive 
integer. 
+        transfer_bandwidth_mib_per_sec : int
+            Data transfer Bandwidth (BW) in MiB/sec (per connection). The 
value is a positive 
+            integer.
+        ideal_bandwidth_utilization_frac : int, default 0.9
+            Transfer bandwidth utilization fraction (per connection) to 
maximize the net 
+            data load. The value is a positive float less than 1.
+        max_ideal_request_size_mib : int, default 64
+            The maximum single data request size (in MiB) to maximize the net 
data load.
+
+        Returns
+        -------
+        CacheOptions
+        """
+        return CacheOptions.wrap(CCacheOptions.MakeFromNetworkMetrics(
+            time_to_first_byte_millis, transfer_bandwidth_mib_per_sec,
+            ideal_bandwidth_utilization_frac, max_ideal_request_size_mib))
+
+    @staticmethod
+    @binding(True)  # Required for Cython < 3
+    def _reconstruct(kwargs):
+        # __reduce__ doesn't allow passing named arguments directly to the
+        # reconstructor, hence this wrapper.
+        return CacheOptions(**kwargs)
+
+    def __reduce__(self):
+        kwargs = dict(
+            hole_size_limit=self.hole_size_limit,
+            range_size_limit=self.range_size_limit,
+            lazy=self.lazy,
+            prefetch_limit=self.prefetch_limit,
+        )
+        return CacheOptions._reconstruct, (kwargs,)
+
+
 cdef class Codec(_Weakrefable):
     """
     Compression codec.
diff --git a/python/pyarrow/lib.pxd b/python/pyarrow/lib.pxd
index 1440ba0750..58ec34addb 100644
--- a/python/pyarrow/lib.pxd
+++ b/python/pyarrow/lib.pxd
@@ -561,6 +561,18 @@ cdef class RecordBatchReader(_Weakrefable):
         SharedPtrNoGIL[CRecordBatchReader] reader
 
 
+cdef class CacheOptions(_Weakrefable):
+    cdef:
+        CCacheOptions wrapped
+
+    cdef void init(self, CCacheOptions options)
+
+    cdef inline CCacheOptions unwrap(self)
+
+    @staticmethod
+    cdef wrap(const CCacheOptions options)
+
+
 cdef class Codec(_Weakrefable):
     cdef:
         shared_ptr[CCodec] wrapped
diff --git a/python/pyarrow/tests/test_dataset.py 
b/python/pyarrow/tests/test_dataset.py
index f3c25ee8c5..a37eb1e426 100644
--- a/python/pyarrow/tests/test_dataset.py
+++ b/python/pyarrow/tests/test_dataset.py
@@ -16,17 +16,16 @@
 # under the License.
 
 import contextlib
-import os
-import posixpath
 import datetime
+import os
 import pathlib
+import posixpath
 import sys
-import textwrap
 import tempfile
+import textwrap
 import threading
 import time
 from shutil import copytree
-
 from urllib.parse import quote
 
 import numpy as np
@@ -35,12 +34,12 @@ import pytest
 import pyarrow as pa
 import pyarrow.compute as pc
 import pyarrow.csv
-import pyarrow.json
 import pyarrow.feather
 import pyarrow.fs as fs
-from pyarrow.tests.util import (change_cwd, _filesystem_uri,
-                                FSProtocolClass, ProxyHandler,
-                                _configure_s3_limited_user)
+import pyarrow.json
+from pyarrow.tests.util import (FSProtocolClass, ProxyHandler,
+                                _configure_s3_limited_user, _filesystem_uri,
+                                change_cwd)
 
 try:
     import pandas as pd
@@ -138,7 +137,8 @@ def mockfs():
 
 @pytest.fixture
 def open_logging_fs(monkeypatch):
-    from pyarrow.fs import PyFileSystem, LocalFileSystem
+    from pyarrow.fs import LocalFileSystem, PyFileSystem
+
     from .test_fs import ProxyHandler
 
     localfs = LocalFileSystem()
@@ -791,6 +791,9 @@ def test_parquet_scan_options():
         thrift_container_size_limit=987654,)
     opts6 = ds.ParquetFragmentScanOptions(
         page_checksum_verification=True)
+    cache_opts = pa.CacheOptions(
+        hole_size_limit=2**10, range_size_limit=8*2**10, lazy=True)
+    opts7 = ds.ParquetFragmentScanOptions(pre_buffer=True, 
cache_options=cache_opts)
 
     assert opts1.use_buffered_stream is False
     assert opts1.buffer_size == 2**13
@@ -816,12 +819,17 @@ def test_parquet_scan_options():
 
     assert opts6.page_checksum_verification is True
 
+    assert opts7.pre_buffer is True
+    assert opts7.cache_options == cache_opts
+    assert opts7.cache_options != opts1.cache_options
+
     assert opts1 == opts1
     assert opts1 != opts2
     assert opts2 != opts3
     assert opts3 != opts4
     assert opts5 != opts1
     assert opts6 != opts1
+    assert opts7 != opts1
 
 
 def test_file_format_pickling(pickle_module):
@@ -2711,7 +2719,7 @@ def 
test_open_dataset_from_uri_s3_fsspec(s3_example_simple):
     table, path, _, _, host, port, access_key, secret_key = s3_example_simple
     s3fs = pytest.importorskip("s3fs")
 
-    from pyarrow.fs import PyFileSystem, FSSpecHandler
+    from pyarrow.fs import FSSpecHandler, PyFileSystem
 
     fs = s3fs.S3FileSystem(
         key=access_key,
diff --git a/python/pyarrow/tests/test_io.py b/python/pyarrow/tests/test_io.py
index 071962af29..5a495aa80a 100644
--- a/python/pyarrow/tests/test_io.py
+++ b/python/pyarrow/tests/test_io.py
@@ -664,6 +664,65 @@ def test_allocate_buffer_resizable():
     assert buf.size == 200
 
 
+def test_cache_options():
+    opts1 = pa.CacheOptions()
+    opts2 = pa.CacheOptions(hole_size_limit=1024)
+    opts3 = pa.CacheOptions(hole_size_limit=4096, range_size_limit=8192)
+    opts4 = pa.CacheOptions(hole_size_limit=4096,
+                            range_size_limit=8192, prefetch_limit=5)
+    opts5 = pa.CacheOptions(hole_size_limit=4096,
+                            range_size_limit=8192, lazy=False)
+    opts6 = pa.CacheOptions.from_network_metrics(time_to_first_byte_millis=100,
+                                                 
transfer_bandwidth_mib_per_sec=200,
+                                                 
ideal_bandwidth_utilization_frac=0.9,
+                                                 max_ideal_request_size_mib=64)
+
+    assert opts1.hole_size_limit == 8192
+    assert opts1.range_size_limit == 32 * 1024 * 1024
+    assert opts1.lazy is True
+    assert opts1.prefetch_limit == 0
+
+    assert opts2.hole_size_limit == 1024
+    assert opts2.range_size_limit == 32 * 1024 * 1024
+    assert opts2.lazy is True
+    assert opts2.prefetch_limit == 0
+
+    assert opts3.hole_size_limit == 4096
+    assert opts3.range_size_limit == 8192
+    assert opts3.lazy is True
+    assert opts3.prefetch_limit == 0
+
+    assert opts4.hole_size_limit == 4096
+    assert opts4.range_size_limit == 8192
+    assert opts4.lazy is True
+    assert opts4.prefetch_limit == 5
+
+    assert opts5.hole_size_limit == 4096
+    assert opts5.range_size_limit == 8192
+    assert opts5.lazy is False
+    assert opts5.prefetch_limit == 0
+
+    assert opts6.lazy is False
+
+    assert opts1 == opts1
+    assert opts1 != opts2
+    assert opts2 != opts3
+    assert opts3 != opts4
+    assert opts4 != opts5
+    assert opts6 != opts1
+
+
+def test_cache_options_pickling(pickle_module):
+    options = [
+        pa.CacheOptions(),
+        pa.CacheOptions(hole_size_limit=4096, range_size_limit=8192,
+                        lazy=True, prefetch_limit=5),
+    ]
+
+    for option in options:
+        assert pickle_module.loads(pickle_module.dumps(option)) == option
+
+
 @pytest.mark.parametrize("compression", [
     pytest.param(
         "bz2", marks=pytest.mark.xfail(raises=pa.lib.ArrowNotImplementedError)

Reply via email to