jorisvandenbossche commented on code in PR #36627:
URL: https://github.com/apache/arrow/pull/36627#discussion_r1425290596


##########
python/pyarrow/_dataset_parquet.pyx:
##########
@@ -692,6 +693,10 @@ cdef class ParquetFragmentScanOptions(FragmentScanOptions):
         parallel using a background I/O thread pool.
         Set to False if you want to prioritize minimal memory usage
         over maximum speed.
+    cache_options : pyarrow.dataset.CacheOptions, default None

Review Comment:
   ```suggestion
       cache_options : pyarrow.CacheOptions, default None
   ```
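   
   A minimal usage sketch of how this keyword would be passed (my own illustration based on the names in this diff, not code from the PR; the dataset path is hypothetical):
   
   ```python
   import pyarrow as pa
   import pyarrow.dataset as ds
   
   # Pre-buffering with explicit cache options on the Parquet scan options.
   scan_opts = ds.ParquetFragmentScanOptions(
       pre_buffer=True,
       cache_options=pa.CacheOptions(hole_size_limit=8 * 1024,
                                     range_size_limit=32 * 1024 * 1024),
   )
   fmt = ds.ParquetFileFormat(default_fragment_scan_options=scan_opts)
   dataset = ds.dataset("data/", format=fmt)  # hypothetical path
   ```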



##########
python/pyarrow/io.pxi:
##########
@@ -2120,6 +2120,140 @@ cdef CCompressionType _ensure_compression(str name) except *:
         raise ValueError('Invalid value for compression: {!r}'.format(name))
 
 
+cdef class CacheOptions(_Weakrefable):
+    """
+    Cache options for a pre-buffered fragment scan.
+
+    Parameters
+    ----------
+    hole_size_limit : int, default 8Ki

Review Comment:
   ```suggestion
       hole_size_limit : int, default 8KiB
   ```
   
   Small nitpick, but I _think_ generally the B is still included for those 
numbers that are a power of 2? 
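   
   For reference, a quick sketch of what those binary-suffixed defaults work out to in bytes (my reading of the values named in this docstring, not verified against the C++ constants):
   
   ```python
   # IEC binary units: "KiB"/"MiB" are powers of 2, unlike "KB"/"MB".
   hole_size_limit = 8 * 1024            # 8 KiB  = 8192 bytes
   range_size_limit = 32 * 1024 * 1024   # 32 MiB = 33554432 bytes
   ```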



##########
python/pyarrow/io.pxi:
##########
@@ -2120,6 +2120,140 @@ cdef CCompressionType _ensure_compression(str name) except *:
         raise ValueError('Invalid value for compression: {!r}'.format(name))
 
 
+cdef class CacheOptions(_Weakrefable):
+    """
+    Cache options for a pre-buffered fragment scan.
+
+    Parameters
+    ----------
+    hole_size_limit : int, default 8Ki
+        The maximum distance in bytes between two consecutive ranges; beyond 
+        this value, ranges are not combined.
+    range_size_limit : int, default 32Mi
+        The maximum size in bytes of a combined range; if combining two 
+        consecutive ranges would produce a range of a size greater than this, 
+        they are not combined
+    lazy : bool, default False
+        lazy = false: request all byte ranges when PreBuffer or WillNeed is called.
+        lazy = True, prefetch_limit = 0: request merged byte ranges only after the reader
+        needs them.
+        lazy = True, prefetch_limit = k: prefetch up to k merged byte ranges ahead of the
+        range that is currently being read.
+    prefetch_limit : int, default 0
+        The maximum number of ranges to be prefetched. This is only used for 
+        lazy cache to asynchronously read some ranges after reading the target 
+        range.
+    """
+
+    def __init__(self, *, hole_size_limit=None, range_size_limit=None, lazy=None, prefetch_limit=None):
+        self.wrapped = CCacheOptions.Defaults()
+        if hole_size_limit is not None:
+            self.hole_size_limit = hole_size_limit
+        if range_size_limit is not None:
+            self.range_size_limit = range_size_limit
+        if lazy is not None:
+            self.lazy = lazy
+        if prefetch_limit is not None:
+            self.prefetch_limit = prefetch_limit
+
+    cdef void init(self, CCacheOptions options):
+        self.wrapped = options
+
+    cdef inline CCacheOptions unwrap(self):
+        return self.wrapped
+
+    @staticmethod
+    cdef wrap(CCacheOptions options):
+        self = CacheOptions()
+        self.init(options)
+        return self
+
+    @property
+    def hole_size_limit(self):
+        return self.wrapped.hole_size_limit
+
+    @hole_size_limit.setter
+    def hole_size_limit(self, hole_size_limit):
+        self.wrapped.hole_size_limit = hole_size_limit
+
+    @property
+    def range_size_limit(self):
+        return self.wrapped.range_size_limit
+
+    @range_size_limit.setter
+    def range_size_limit(self, range_size_limit):
+        self.wrapped.range_size_limit = range_size_limit
+
+    @property
+    def lazy(self):
+        return self.wrapped.lazy
+
+    @lazy.setter
+    def lazy(self, lazy):
+        self.wrapped.lazy = lazy
+
+    @property
+    def prefetch_limit(self):
+        return self.wrapped.prefetch_limit
+
+    @prefetch_limit.setter
+    def prefetch_limit(self, prefetch_limit):
+        self.wrapped.prefetch_limit = prefetch_limit
+
+    def __eq__(self, CacheOptions other):
+        try:
+            return self.unwrap().Equals(other.unwrap())
+        except TypeError:
+            return False
+
+    @staticmethod
+    def from_network_metrics(time_to_first_byte_millis, transfer_bandwidth_mib_per_sec,
+                             ideal_bandwidth_utilization_frac, max_ideal_request_size_mib):
+        """
+        Create suiteable CacheOptions based on provided network metrics.
+
+        Typically this will be used with object storage solutions like Amazon S3,
+        Google Cloud Storage and Azure Blob Storage.
+
+        Parameters
+        ----------
+        time_to_first_byte_millis : int
+            Seek-time or Time-To-First-Byte (TTFB) in milliseconds, also called call
+            setup latency of a new read request. The value is a positive integer.
+        transfer_bandwidth_mib_per_sec : int
+            Data transfer Bandwidth (BW) in MiB/sec (per connection). The value is a positive
+            integer.
+        ideal_bandwidth_utilization_frac : int
+            Transfer bandwidth utilization fraction (per connection) to maximize the net
+            data load. The value is a positive float less than 1.
+        max_ideal_request_size_mib : int

Review Comment:
   ```suggestion
           max_ideal_request_size_mib : int, default 64
   ```



##########
python/pyarrow/_dataset_parquet.pyx:
##########
@@ -692,6 +693,10 @@ cdef class ParquetFragmentScanOptions(FragmentScanOptions):
         parallel using a background I/O thread pool.
         Set to False if you want to prioritize minimal memory usage
         over maximum speed.
+    cache_options : pyarrow.dataset.CacheOptions, default None
+        Cache options used when pre_buffer is enabled. The default values should

Review Comment:
   One potentially confusing aspect is that the ArrowReaderProperties now actually use `LazyDefaults()` instead of `Defaults()`. So saying "the default values" here might not be entirely clear to the user reading this, because the default for the Python bindings, `pa.CacheOptions()`, is a different default.
   
   So maybe we should clarify here that by default lazy is turned on?
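   
   A small sketch of that distinction (my own illustration of the point above, assuming the constructor from this diff, where `pa.CacheOptions()` wraps `CCacheOptions.Defaults()`):
   
   ```python
   import pyarrow as pa
   
   # The Python-level default is the eager Defaults() (lazy=False) ...
   eager = pa.CacheOptions()
   assert not eager.lazy
   
   # ... while the Parquet reader's built-in behaviour corresponds to
   # LazyDefaults(), i.e. what a user would otherwise opt into explicitly.
   reader_like = pa.CacheOptions(lazy=True)
   assert reader_like.lazy
   ```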



##########
python/pyarrow/io.pxi:
##########
@@ -2120,6 +2120,140 @@ cdef CCompressionType _ensure_compression(str name) except *:
         raise ValueError('Invalid value for compression: {!r}'.format(name))
 
 
+cdef class CacheOptions(_Weakrefable):
+    """
+    Cache options for a pre-buffered fragment scan.
+
+    Parameters
+    ----------
+    hole_size_limit : int, default 8Ki
+        The maximum distance in bytes between two consecutive ranges; beyond 
+        this value, ranges are not combined.
+    range_size_limit : int, default 32Mi

Review Comment:
   ```suggestion
       range_size_limit : int, default 32MiB
   ```



##########
python/pyarrow/tests/test_dataset.py:
##########
@@ -16,31 +16,29 @@
 # under the License.
 
 import contextlib
-import os
-import posixpath
 import datetime
+import os
 import pathlib
 import pickle
+import posixpath
 import sys
-import textwrap
 import tempfile
+import textwrap
 import threading
 import time
-
 from urllib.parse import quote
 
 import numpy as np
-import pytest
-
 import pyarrow as pa
 import pyarrow.compute as pc
 import pyarrow.csv
-import pyarrow.json
 import pyarrow.feather
 import pyarrow.fs as fs
-from pyarrow.tests.util import (change_cwd, _filesystem_uri,
-                                FSProtocolClass, ProxyHandler,
-                                _configure_s3_limited_user)
+import pyarrow.json
+import pytest
+from pyarrow.tests.util import (FSProtocolClass, ProxyHandler,
+                                _configure_s3_limited_user, _filesystem_uri,
+                                change_cwd)

Review Comment:
   Generally that looks ok, but the moving of `pyarrow.json` and `pytest` seems incorrect to me (no idea why archery did that)



##########
python/pyarrow/io.pxi:
##########
@@ -2120,6 +2120,140 @@ cdef CCompressionType _ensure_compression(str name) except *:
         raise ValueError('Invalid value for compression: {!r}'.format(name))
 
 
+cdef class CacheOptions(_Weakrefable):
+    """
+    Cache options for a pre-buffered fragment scan.
+
+    Parameters
+    ----------
+    hole_size_limit : int, default 8Ki
+        The maximum distance in bytes between two consecutive ranges; beyond 
+        this value, ranges are not combined.
+    range_size_limit : int, default 32Mi
+        The maximum size in bytes of a combined range; if combining two 
+        consecutive ranges would produce a range of a size greater than this, 
+        they are not combined
+    lazy : bool, default False
+        lazy = false: request all byte ranges when PreBuffer or WillNeed is called.
+        lazy = True, prefetch_limit = 0: request merged byte ranges only after the reader
+        needs them.
+        lazy = True, prefetch_limit = k: prefetch up to k merged byte ranges ahead of the
+        range that is currently being read.
+    prefetch_limit : int, default 0
+        The maximum number of ranges to be prefetched. This is only used for 
+        lazy cache to asynchronously read some ranges after reading the target 
+        range.
+    """
+
+    def __init__(self, *, hole_size_limit=None, range_size_limit=None, lazy=None, prefetch_limit=None):
+        self.wrapped = CCacheOptions.Defaults()
+        if hole_size_limit is not None:
+            self.hole_size_limit = hole_size_limit
+        if range_size_limit is not None:
+            self.range_size_limit = range_size_limit
+        if lazy is not None:
+            self.lazy = lazy
+        if prefetch_limit is not None:
+            self.prefetch_limit = prefetch_limit
+
+    cdef void init(self, CCacheOptions options):
+        self.wrapped = options
+
+    cdef inline CCacheOptions unwrap(self):
+        return self.wrapped
+
+    @staticmethod
+    cdef wrap(CCacheOptions options):
+        self = CacheOptions()
+        self.init(options)
+        return self
+
+    @property
+    def hole_size_limit(self):
+        return self.wrapped.hole_size_limit
+
+    @hole_size_limit.setter
+    def hole_size_limit(self, hole_size_limit):
+        self.wrapped.hole_size_limit = hole_size_limit
+
+    @property
+    def range_size_limit(self):
+        return self.wrapped.range_size_limit
+
+    @range_size_limit.setter
+    def range_size_limit(self, range_size_limit):
+        self.wrapped.range_size_limit = range_size_limit
+
+    @property
+    def lazy(self):
+        return self.wrapped.lazy
+
+    @lazy.setter
+    def lazy(self, lazy):
+        self.wrapped.lazy = lazy
+
+    @property
+    def prefetch_limit(self):
+        return self.wrapped.prefetch_limit
+
+    @prefetch_limit.setter
+    def prefetch_limit(self, prefetch_limit):
+        self.wrapped.prefetch_limit = prefetch_limit
+
+    def __eq__(self, CacheOptions other):
+        try:
+            return self.unwrap().Equals(other.unwrap())
+        except TypeError:
+            return False
+
+    @staticmethod
+    def from_network_metrics(time_to_first_byte_millis, transfer_bandwidth_mib_per_sec,
+                             ideal_bandwidth_utilization_frac, max_ideal_request_size_mib):

Review Comment:
   ```suggestion
                                 ideal_bandwidth_utilization_frac=0.9, max_ideal_request_size_mib=64):
   ```
   
   On the C++ side those two params have defaults, so use them here as well?
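   
   If those defaults are adopted, a caller would only need the measured latency and bandwidth; a usage sketch (my own illustration, with made-up example numbers):
   
   ```python
   import pyarrow as pa
   
   # Derive cache options from rough network characteristics of an object store.
   opts = pa.CacheOptions.from_network_metrics(
       time_to_first_byte_millis=100,       # example TTFB
       transfer_bandwidth_mib_per_sec=50,   # example per-connection bandwidth
   )
   print(opts.hole_size_limit, opts.range_size_limit, opts.lazy)
   ```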



##########
python/pyarrow/_dataset_parquet.pyx:
##########
@@ -692,6 +693,10 @@ cdef class ParquetFragmentScanOptions(FragmentScanOptions):
         parallel using a background I/O thread pool.
         Set to False if you want to prioritize minimal memory usage
         over maximum speed.
+    cache_options : pyarrow.dataset.CacheOptions, default None
+        Cache options used when pre_buffer is enabled. The default values should
+        be good for most usecases. You may want to adject these for example if

Review Comment:
   ```suggestion
            be good for most use cases. You may want to change these for example if
   ```
   
   (I had to look up "adject" ;))



##########
python/pyarrow/io.pxi:
##########
@@ -2120,6 +2120,140 @@ cdef CCompressionType _ensure_compression(str name) except *:
         raise ValueError('Invalid value for compression: {!r}'.format(name))
 
 
+cdef class CacheOptions(_Weakrefable):
+    """
+    Cache options for a pre-buffered fragment scan.
+
+    Parameters
+    ----------
+    hole_size_limit : int, default 8Ki
+        The maximum distance in bytes between two consecutive ranges; beyond 
+        this value, ranges are not combined.
+    range_size_limit : int, default 32Mi
+        The maximum size in bytes of a combined range; if combining two 
+        consecutive ranges would produce a range of a size greater than this, 
+        they are not combined
+    lazy : bool, default False
+        lazy = false: request all byte ranges when PreBuffer or WillNeed is called.
+        lazy = True, prefetch_limit = 0: request merged byte ranges only after the reader
+        needs them.
+        lazy = True, prefetch_limit = k: prefetch up to k merged byte ranges ahead of the
+        range that is currently being read.
+    prefetch_limit : int, default 0
+        The maximum number of ranges to be prefetched. This is only used for 
+        lazy cache to asynchronously read some ranges after reading the target 
+        range.
+    """
+
+    def __init__(self, *, hole_size_limit=None, range_size_limit=None, lazy=None, prefetch_limit=None):
+        self.wrapped = CCacheOptions.Defaults()
+        if hole_size_limit is not None:
+            self.hole_size_limit = hole_size_limit
+        if range_size_limit is not None:
+            self.range_size_limit = range_size_limit
+        if lazy is not None:
+            self.lazy = lazy
+        if prefetch_limit is not None:
+            self.prefetch_limit = prefetch_limit
+
+    cdef void init(self, CCacheOptions options):
+        self.wrapped = options
+
+    cdef inline CCacheOptions unwrap(self):
+        return self.wrapped
+
+    @staticmethod
+    cdef wrap(CCacheOptions options):
+        self = CacheOptions()
+        self.init(options)
+        return self
+
+    @property
+    def hole_size_limit(self):
+        return self.wrapped.hole_size_limit
+
+    @hole_size_limit.setter
+    def hole_size_limit(self, hole_size_limit):
+        self.wrapped.hole_size_limit = hole_size_limit
+
+    @property
+    def range_size_limit(self):
+        return self.wrapped.range_size_limit
+
+    @range_size_limit.setter
+    def range_size_limit(self, range_size_limit):
+        self.wrapped.range_size_limit = range_size_limit
+
+    @property
+    def lazy(self):
+        return self.wrapped.lazy
+
+    @lazy.setter
+    def lazy(self, lazy):
+        self.wrapped.lazy = lazy
+
+    @property
+    def prefetch_limit(self):
+        return self.wrapped.prefetch_limit
+
+    @prefetch_limit.setter
+    def prefetch_limit(self, prefetch_limit):
+        self.wrapped.prefetch_limit = prefetch_limit
+
+    def __eq__(self, CacheOptions other):
+        try:
+            return self.unwrap().Equals(other.unwrap())
+        except TypeError:
+            return False
+
+    @staticmethod
+    def from_network_metrics(time_to_first_byte_millis, transfer_bandwidth_mib_per_sec,
+                             ideal_bandwidth_utilization_frac, max_ideal_request_size_mib):
+        """
+        Create suiteable CacheOptions based on provided network metrics.
+
+        Typically this will be used with object storage solutions like Amazon S3,
+        Google Cloud Storage and Azure Blob Storage.
+
+        Parameters
+        ----------
+        time_to_first_byte_millis : int
+            Seek-time or Time-To-First-Byte (TTFB) in milliseconds, also called call
+            setup latency of a new read request. The value is a positive integer.
+        transfer_bandwidth_mib_per_sec : int
+            Data transfer Bandwidth (BW) in MiB/sec (per connection). The value is a positive
+            integer.
+        ideal_bandwidth_utilization_frac : int

Review Comment:
   ```suggestion
           ideal_bandwidth_utilization_frac : int, default 0.9
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
