jorisvandenbossche commented on code in PR #36627:
URL: https://github.com/apache/arrow/pull/36627#discussion_r1425290596
##########
python/pyarrow/_dataset_parquet.pyx:
##########
@@ -692,6 +693,10 @@ cdef class ParquetFragmentScanOptions(FragmentScanOptions):
parallel using a background I/O thread pool.
Set to False if you want to prioritize minimal memory usage
over maximum speed.
+ cache_options : pyarrow.dataset.CacheOptions, default None
Review Comment:
```suggestion
cache_options : pyarrow.CacheOptions, default None
```
##########
python/pyarrow/io.pxi:
##########
@@ -2120,6 +2120,140 @@ cdef CCompressionType _ensure_compression(str name)
except *:
raise ValueError('Invalid value for compression: {!r}'.format(name))
+cdef class CacheOptions(_Weakrefable):
+ """
+ Cache options for a pre-buffered fragment scan.
+
+ Parameters
+ ----------
+ hole_size_limit : int, default 8Ki
Review Comment:
```suggestion
hole_size_limit : int, default 8KiB
```
Small nitpick, but I _think_ generally the B is still included for those
numbers that are a power of 2?
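
For reference, those shorthands stand for plain byte counts. A minimal sketch of passing the equivalent integers, assuming `CacheOptions` ends up exposed as `pyarrow.CacheOptions` per the suggestion above:
```python
import pyarrow as pa

# 8 KiB and 32 MiB written out as the integer byte counts the
# constructor actually takes:
opts = pa.CacheOptions(hole_size_limit=8 * 1024,
                       range_size_limit=32 * 1024 * 1024)
assert opts.hole_size_limit == 8192
assert opts.range_size_limit == 33554432
```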
##########
python/pyarrow/io.pxi:
##########
@@ -2120,6 +2120,140 @@ cdef CCompressionType _ensure_compression(str name)
except *:
raise ValueError('Invalid value for compression: {!r}'.format(name))
+cdef class CacheOptions(_Weakrefable):
+ """
+ Cache options for a pre-buffered fragment scan.
+
+ Parameters
+ ----------
+ hole_size_limit : int, default 8Ki
+ The maximum distance in bytes between two consecutive ranges; beyond
+ this value, ranges are not combined.
+ range_size_limit : int, default 32Mi
+ The maximum size in bytes of a combined range; if combining two
+ consecutive ranges would produce a range of a size greater than this,
+ they are not combined
+ lazy : bool, default False
+ lazy = false: request all byte ranges when PreBuffer or WillNeed is called.
+ lazy = True, prefetch_limit = 0: request merged byte ranges only after the reader
+ needs them.
+ lazy = True, prefetch_limit = k: prefetch up to k merged byte ranges ahead of the
+ range that is currently being read.
+ prefetch_limit : int, default 0
+ The maximum number of ranges to be prefetched. This is only used for
+ lazy cache to asynchronously read some ranges after reading the target
+ range.
+ """
+
+ def __init__(self, *, hole_size_limit=None, range_size_limit=None, lazy=None, prefetch_limit=None):
+ self.wrapped = CCacheOptions.Defaults()
+ if hole_size_limit is not None:
+ self.hole_size_limit = hole_size_limit
+ if range_size_limit is not None:
+ self.range_size_limit = range_size_limit
+ if lazy is not None:
+ self.lazy = lazy
+ if prefetch_limit is not None:
+ self.prefetch_limit = prefetch_limit
+
+ cdef void init(self, CCacheOptions options):
+ self.wrapped = options
+
+ cdef inline CCacheOptions unwrap(self):
+ return self.wrapped
+
+ @staticmethod
+ cdef wrap(CCacheOptions options):
+ self = CacheOptions()
+ self.init(options)
+ return self
+
+ @property
+ def hole_size_limit(self):
+ return self.wrapped.hole_size_limit
+
+ @hole_size_limit.setter
+ def hole_size_limit(self, hole_size_limit):
+ self.wrapped.hole_size_limit = hole_size_limit
+
+ @property
+ def range_size_limit(self):
+ return self.wrapped.range_size_limit
+
+ @range_size_limit.setter
+ def range_size_limit(self, range_size_limit):
+ self.wrapped.range_size_limit = range_size_limit
+
+ @property
+ def lazy(self):
+ return self.wrapped.lazy
+
+ @lazy.setter
+ def lazy(self, lazy):
+ self.wrapped.lazy = lazy
+
+ @property
+ def prefetch_limit(self):
+ return self.wrapped.prefetch_limit
+
+ @prefetch_limit.setter
+ def prefetch_limit(self, prefetch_limit):
+ self.wrapped.prefetch_limit = prefetch_limit
+
+ def __eq__(self, CacheOptions other):
+ try:
+ return self.unwrap().Equals(other.unwrap())
+ except TypeError:
+ return False
+
+ @staticmethod
+ def from_network_metrics(time_to_first_byte_millis, transfer_bandwidth_mib_per_sec,
+ ideal_bandwidth_utilization_frac, max_ideal_request_size_mib):
+ """
+ Create suiteable CacheOptions based on provided network metrics.
+
+ Typically this will be used with object storage solutions like Amazon S3,
+ Google Cloud Storage and Azure Blob Storage.
+
+ Parameters
+ ----------
+ time_to_first_byte_millis : int
+ Seek-time or Time-To-First-Byte (TTFB) in milliseconds, also called call
+ setup latency of a new read request. The value is a positive integer.
+ transfer_bandwidth_mib_per_sec : int
+ Data transfer Bandwidth (BW) in MiB/sec (per connection). The value is a positive
+ integer.
+ ideal_bandwidth_utilization_frac : int
+ Transfer bandwidth utilization fraction (per connection) to maximize the net
+ data load. The value is a positive float less than 1.
+ max_ideal_request_size_mib : int
Review Comment:
```suggestion
max_ideal_request_size_mib : int, default 64
```
##########
python/pyarrow/_dataset_parquet.pyx:
##########
@@ -692,6 +693,10 @@ cdef class ParquetFragmentScanOptions(FragmentScanOptions):
parallel using a background I/O thread pool.
Set to False if you want to prioritize minimal memory usage
over maximum speed.
+ cache_options : pyarrow.dataset.CacheOptions, default None
+ Cache options used when pre_buffer is enabled. The default values should
Review Comment:
One potentially confusing aspect is that the ArrowReaderProperties now
actually use `LazyDefaults()` instead of `Defaults()`. So the phrase "the
default values" here might not be exactly clear to the user reading this,
because the default for the Python bindings `pa.CacheOptions()` is a
different default.
So maybe we should clarify here that by default lazy is turned on?
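
A minimal sketch of the discrepancy, assuming `CacheOptions` is exposed at the top level as suggested above:
```python
import pyarrow as pa

# The plain Python constructor wraps CCacheOptions.Defaults(),
# so lazy is False:
assert not pa.CacheOptions().lazy

# The Parquet reader's built-in default corresponds to LazyDefaults()
# on the C++ side, i.e. lazy turned on; spelled out explicitly:
lazy_defaults = pa.CacheOptions(lazy=True)
assert lazy_defaults.lazy
```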
##########
python/pyarrow/io.pxi:
##########
@@ -2120,6 +2120,140 @@ cdef CCompressionType _ensure_compression(str name)
except *:
raise ValueError('Invalid value for compression: {!r}'.format(name))
+cdef class CacheOptions(_Weakrefable):
+ """
+ Cache options for a pre-buffered fragment scan.
+
+ Parameters
+ ----------
+ hole_size_limit : int, default 8Ki
+ The maximum distance in bytes between two consecutive ranges; beyond
+ this value, ranges are not combined.
+ range_size_limit : int, default 32Mi
Review Comment:
```suggestion
range_size_limit : int, default 32MiB
```
##########
python/pyarrow/tests/test_dataset.py:
##########
@@ -16,31 +16,29 @@
# under the License.
import contextlib
-import os
-import posixpath
import datetime
+import os
import pathlib
import pickle
+import posixpath
import sys
-import textwrap
import tempfile
+import textwrap
import threading
import time
-
from urllib.parse import quote
import numpy as np
-import pytest
-
import pyarrow as pa
import pyarrow.compute as pc
import pyarrow.csv
-import pyarrow.json
import pyarrow.feather
import pyarrow.fs as fs
-from pyarrow.tests.util import (change_cwd, _filesystem_uri,
- FSProtocolClass, ProxyHandler,
- _configure_s3_limited_user)
+import pyarrow.json
+import pytest
+from pyarrow.tests.util import (FSProtocolClass, ProxyHandler,
+ _configure_s3_limited_user, _filesystem_uri,
+ change_cwd)
Review Comment:
Generally that looks OK, but the moving of `pyarrow.json` and `pytest` seems
incorrect to me (no idea why archery did that).
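
Hypothetically, the grouping this comment seems to expect (third-party imports like `numpy` and `pytest` kept together, all `pyarrow` submodules adjacent) would look something like:
```python
# Third-party imports grouped together:
import numpy as np
import pytest

# All pyarrow imports kept adjacent:
import pyarrow as pa
import pyarrow.compute as pc
import pyarrow.csv
import pyarrow.feather
import pyarrow.fs as fs
import pyarrow.json
```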
##########
python/pyarrow/io.pxi:
##########
@@ -2120,6 +2120,140 @@ cdef CCompressionType _ensure_compression(str name)
except *:
raise ValueError('Invalid value for compression: {!r}'.format(name))
+cdef class CacheOptions(_Weakrefable):
+ """
+ Cache options for a pre-buffered fragment scan.
+
+ Parameters
+ ----------
+ hole_size_limit : int, default 8Ki
+ The maximum distance in bytes between two consecutive ranges; beyond
+ this value, ranges are not combined.
+ range_size_limit : int, default 32Mi
+ The maximum size in bytes of a combined range; if combining two
+ consecutive ranges would produce a range of a size greater than this,
+ they are not combined
+ lazy : bool, default False
+ lazy = false: request all byte ranges when PreBuffer or WillNeed is called.
+ lazy = True, prefetch_limit = 0: request merged byte ranges only after the reader
+ needs them.
+ lazy = True, prefetch_limit = k: prefetch up to k merged byte ranges ahead of the
+ range that is currently being read.
+ prefetch_limit : int, default 0
+ The maximum number of ranges to be prefetched. This is only used for
+ lazy cache to asynchronously read some ranges after reading the target
+ range.
+ """
+
+ def __init__(self, *, hole_size_limit=None, range_size_limit=None, lazy=None, prefetch_limit=None):
+ self.wrapped = CCacheOptions.Defaults()
+ if hole_size_limit is not None:
+ self.hole_size_limit = hole_size_limit
+ if range_size_limit is not None:
+ self.range_size_limit = range_size_limit
+ if lazy is not None:
+ self.lazy = lazy
+ if prefetch_limit is not None:
+ self.prefetch_limit = prefetch_limit
+
+ cdef void init(self, CCacheOptions options):
+ self.wrapped = options
+
+ cdef inline CCacheOptions unwrap(self):
+ return self.wrapped
+
+ @staticmethod
+ cdef wrap(CCacheOptions options):
+ self = CacheOptions()
+ self.init(options)
+ return self
+
+ @property
+ def hole_size_limit(self):
+ return self.wrapped.hole_size_limit
+
+ @hole_size_limit.setter
+ def hole_size_limit(self, hole_size_limit):
+ self.wrapped.hole_size_limit = hole_size_limit
+
+ @property
+ def range_size_limit(self):
+ return self.wrapped.range_size_limit
+
+ @range_size_limit.setter
+ def range_size_limit(self, range_size_limit):
+ self.wrapped.range_size_limit = range_size_limit
+
+ @property
+ def lazy(self):
+ return self.wrapped.lazy
+
+ @lazy.setter
+ def lazy(self, lazy):
+ self.wrapped.lazy = lazy
+
+ @property
+ def prefetch_limit(self):
+ return self.wrapped.prefetch_limit
+
+ @prefetch_limit.setter
+ def prefetch_limit(self, prefetch_limit):
+ self.wrapped.prefetch_limit = prefetch_limit
+
+ def __eq__(self, CacheOptions other):
+ try:
+ return self.unwrap().Equals(other.unwrap())
+ except TypeError:
+ return False
+
+ @staticmethod
+ def from_network_metrics(time_to_first_byte_millis, transfer_bandwidth_mib_per_sec,
+ ideal_bandwidth_utilization_frac, max_ideal_request_size_mib):
Review Comment:
```suggestion
ideal_bandwidth_utilization_frac=0.9,
max_ideal_request_size_mib=64):
```
On the C++ side those two params have defaults, so use here as well?
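
For illustration, with those defaults mirrored here the last two arguments could simply be omitted; the metric values below are hypothetical:
```python
import pyarrow as pa

# Hypothetical network profile: ~100 ms time-to-first-byte and
# ~50 MiB/s of per-connection bandwidth; 0.9 and 64 would then be
# picked up as the defaults suggested above.
opts = pa.CacheOptions.from_network_metrics(
    time_to_first_byte_millis=100,
    transfer_bandwidth_mib_per_sec=50,
)
```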
##########
python/pyarrow/_dataset_parquet.pyx:
##########
@@ -692,6 +693,10 @@ cdef class ParquetFragmentScanOptions(FragmentScanOptions):
parallel using a background I/O thread pool.
Set to False if you want to prioritize minimal memory usage
over maximum speed.
+ cache_options : pyarrow.dataset.CacheOptions, default None
+ Cache options used when pre_buffer is enabled. The default values should
+ be good for most usecases. You may want to adject these for example if
Review Comment:
```suggestion
be good for most use cases. You may want to change these for example if
```
(I had to look up "adject" ;))
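
For example (a sketch, assuming the new `cache_options` keyword from this diff is accepted by the `ParquetFragmentScanOptions` constructor):
```python
import pyarrow as pa
import pyarrow.dataset as ds

# Override the pre-buffer caching behaviour, e.g. for a
# high-latency object store:
scan_opts = ds.ParquetFragmentScanOptions(
    pre_buffer=True,
    cache_options=pa.CacheOptions(lazy=True, prefetch_limit=4),
)
```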
##########
python/pyarrow/io.pxi:
##########
@@ -2120,6 +2120,140 @@ cdef CCompressionType _ensure_compression(str name)
except *:
raise ValueError('Invalid value for compression: {!r}'.format(name))
+cdef class CacheOptions(_Weakrefable):
+ """
+ Cache options for a pre-buffered fragment scan.
+
+ Parameters
+ ----------
+ hole_size_limit : int, default 8Ki
+ The maximum distance in bytes between two consecutive ranges; beyond
+ this value, ranges are not combined.
+ range_size_limit : int, default 32Mi
+ The maximum size in bytes of a combined range; if combining two
+ consecutive ranges would produce a range of a size greater than this,
+ they are not combined
+ lazy : bool, default False
+ lazy = false: request all byte ranges when PreBuffer or WillNeed is called.
+ lazy = True, prefetch_limit = 0: request merged byte ranges only after the reader
+ needs them.
+ lazy = True, prefetch_limit = k: prefetch up to k merged byte ranges ahead of the
+ range that is currently being read.
+ prefetch_limit : int, default 0
+ The maximum number of ranges to be prefetched. This is only used for
+ lazy cache to asynchronously read some ranges after reading the target
+ range.
+ """
+
+ def __init__(self, *, hole_size_limit=None, range_size_limit=None, lazy=None, prefetch_limit=None):
+ self.wrapped = CCacheOptions.Defaults()
+ if hole_size_limit is not None:
+ self.hole_size_limit = hole_size_limit
+ if range_size_limit is not None:
+ self.range_size_limit = range_size_limit
+ if lazy is not None:
+ self.lazy = lazy
+ if prefetch_limit is not None:
+ self.prefetch_limit = prefetch_limit
+
+ cdef void init(self, CCacheOptions options):
+ self.wrapped = options
+
+ cdef inline CCacheOptions unwrap(self):
+ return self.wrapped
+
+ @staticmethod
+ cdef wrap(CCacheOptions options):
+ self = CacheOptions()
+ self.init(options)
+ return self
+
+ @property
+ def hole_size_limit(self):
+ return self.wrapped.hole_size_limit
+
+ @hole_size_limit.setter
+ def hole_size_limit(self, hole_size_limit):
+ self.wrapped.hole_size_limit = hole_size_limit
+
+ @property
+ def range_size_limit(self):
+ return self.wrapped.range_size_limit
+
+ @range_size_limit.setter
+ def range_size_limit(self, range_size_limit):
+ self.wrapped.range_size_limit = range_size_limit
+
+ @property
+ def lazy(self):
+ return self.wrapped.lazy
+
+ @lazy.setter
+ def lazy(self, lazy):
+ self.wrapped.lazy = lazy
+
+ @property
+ def prefetch_limit(self):
+ return self.wrapped.prefetch_limit
+
+ @prefetch_limit.setter
+ def prefetch_limit(self, prefetch_limit):
+ self.wrapped.prefetch_limit = prefetch_limit
+
+ def __eq__(self, CacheOptions other):
+ try:
+ return self.unwrap().Equals(other.unwrap())
+ except TypeError:
+ return False
+
+ @staticmethod
+ def from_network_metrics(time_to_first_byte_millis, transfer_bandwidth_mib_per_sec,
+ ideal_bandwidth_utilization_frac, max_ideal_request_size_mib):
+ """
+ Create suiteable CacheOptions based on provided network metrics.
+
+ Typically this will be used with object storage solutions like Amazon S3,
+ Google Cloud Storage and Azure Blob Storage.
+
+ Parameters
+ ----------
+ time_to_first_byte_millis : int
+ Seek-time or Time-To-First-Byte (TTFB) in milliseconds, also called call
+ setup latency of a new read request. The value is a positive integer.
+ transfer_bandwidth_mib_per_sec : int
+ Data transfer Bandwidth (BW) in MiB/sec (per connection). The value is a positive
+ integer.
+ ideal_bandwidth_utilization_frac : int
Review Comment:
```suggestion
ideal_bandwidth_utilization_frac : int, default 0.9
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]