This is an automated email from the ASF dual-hosted git repository.
jorisvandenbossche pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/main by this push:
new 3236c129d1 GH-36441: [Python] Make `CacheOptions` configurable from
Python (#36627)
3236c129d1 is described below
commit 3236c129d1cbe3f73359278d1459a3f20e5c4df0
Author: Thomas Newton <[email protected]>
AuthorDate: Thu Dec 14 14:12:17 2023 +0000
GH-36441: [Python] Make `CacheOptions` configurable from Python (#36627)
### Rationale for this change
Resolves: https://github.com/apache/arrow/issues/36441
### What changes are included in this PR?
- Add python bindings for `CacheOptions` from the C++ side.
- Allow setting `cache_options` on `ParquetFragmentScanOptions` from the
python side.
- Adjust some of the comments on `CacheOptions`
### Are these changes tested?
Yes. I added python side tests for these newly available configs similar to
other configs. I have not added an integration test that ensures setting the
configs on the python side leads to correctly using them on the C++ side.
### Are there any user-facing changes?
Yes. There are new configs available on the python side but the defaults are
unchanged. I've added/updated docstrings where relevant.
* Closes: #36441
Lead-authored-by: Thomas Newton <[email protected]>
Co-authored-by: Joris Van den Bossche <[email protected]>
Signed-off-by: Joris Van den Bossche <[email protected]>
---
cpp/src/arrow/io/caching.h | 10 ++-
python/pyarrow/__init__.py | 2 +-
python/pyarrow/_dataset_parquet.pyx | 21 +++++-
python/pyarrow/_parquet.pxd | 6 +-
python/pyarrow/includes/libarrow.pxd | 16 +++++
python/pyarrow/io.pxi | 134 +++++++++++++++++++++++++++++++++++
python/pyarrow/lib.pxd | 12 ++++
python/pyarrow/tests/test_dataset.py | 28 +++++---
python/pyarrow/tests/test_io.py | 59 +++++++++++++++
9 files changed, 271 insertions(+), 17 deletions(-)
diff --git a/cpp/src/arrow/io/caching.h b/cpp/src/arrow/io/caching.h
index 9c1b8fe88b..e2b911fafd 100644
--- a/cpp/src/arrow/io/caching.h
+++ b/cpp/src/arrow/io/caching.h
@@ -42,6 +42,11 @@ struct ARROW_EXPORT CacheOptions {
/// size greater than this, they are not combined
int64_t range_size_limit;
/// \brief A lazy cache does not perform any I/O until requested.
+ /// lazy = false: request all byte ranges when PreBuffer or WillNeed is
called.
+ /// lazy = true, prefetch_limit = 0: request merged byte ranges only after
the reader
+ /// needs them.
+ /// lazy = true, prefetch_limit = k: prefetch up to k merged byte ranges
ahead of the
+ /// range that is currently being read.
bool lazy;
/// \brief The maximum number of ranges to be prefetched. This is only used
/// for lazy cache to asynchronously read some ranges after reading the
target range.
@@ -56,9 +61,10 @@ struct ARROW_EXPORT CacheOptions {
/// \brief Construct CacheOptions from network storage metrics (e.g. S3).
///
/// \param[in] time_to_first_byte_millis Seek-time or Time-To-First-Byte
(TTFB) in
- /// milliseconds, also called call setup latency of a new S3 request.
+ /// milliseconds, also called call setup latency of a new read request.
/// The value is a positive integer.
- /// \param[in] transfer_bandwidth_mib_per_sec Data transfer Bandwidth (BW)
in MiB/sec.
+ /// \param[in] transfer_bandwidth_mib_per_sec Data transfer Bandwidth (BW)
in MiB/sec
+ /// (per connection).
/// The value is a positive integer.
/// \param[in] ideal_bandwidth_utilization_frac Transfer bandwidth
utilization fraction
/// (per connection) to maximize the net data load.
diff --git a/python/pyarrow/__init__.py b/python/pyarrow/__init__.py
index cd66abcb44..9da94885ec 100644
--- a/python/pyarrow/__init__.py
+++ b/python/pyarrow/__init__.py
@@ -243,7 +243,7 @@ from pyarrow.lib import (MemoryPool, LoggingMemoryPool,
ProxyMemoryPool,
# I/O
from pyarrow.lib import (NativeFile, PythonFile,
- BufferedInputStream, BufferedOutputStream,
+ BufferedInputStream, BufferedOutputStream,
CacheOptions,
CompressedInputStream, CompressedOutputStream,
TransformInputStream, transcoding_input_stream,
FixedSizeBufferWriter,
diff --git a/python/pyarrow/_dataset_parquet.pyx
b/python/pyarrow/_dataset_parquet.pyx
index d458ac4ee7..61e051f56c 100644
--- a/python/pyarrow/_dataset_parquet.pyx
+++ b/python/pyarrow/_dataset_parquet.pyx
@@ -42,6 +42,7 @@ from pyarrow._dataset cimport (
FileWriteOptions,
Fragment,
FragmentScanOptions,
+ CacheOptions,
Partitioning,
PartitioningFactory,
WrittenFile
@@ -693,6 +694,10 @@ cdef class ParquetFragmentScanOptions(FragmentScanOptions):
parallel using a background I/O thread pool.
Set to False if you want to prioritize minimal memory usage
over maximum speed.
+ cache_options : pyarrow.CacheOptions, default None
+ Cache options used when pre_buffer is enabled. The default values
should
+ be good for most use cases. You may want to adjust these for example if
+ you have exceptionally high latency to the file system.
thrift_string_size_limit : int, default None
If not None, override the maximum total string size allocated
when decoding Thrift structures. The default limit should be
@@ -714,6 +719,7 @@ cdef class ParquetFragmentScanOptions(FragmentScanOptions):
def __init__(self, *, bint use_buffered_stream=False,
buffer_size=8192,
bint pre_buffer=True,
+ cache_options=None,
thrift_string_size_limit=None,
thrift_container_size_limit=None,
decryption_config=None,
@@ -723,6 +729,8 @@ cdef class ParquetFragmentScanOptions(FragmentScanOptions):
self.use_buffered_stream = use_buffered_stream
self.buffer_size = buffer_size
self.pre_buffer = pre_buffer
+ if cache_options is not None:
+ self.cache_options = cache_options
if thrift_string_size_limit is not None:
self.thrift_string_size_limit = thrift_string_size_limit
if thrift_container_size_limit is not None:
@@ -770,6 +778,14 @@ cdef class ParquetFragmentScanOptions(FragmentScanOptions):
def pre_buffer(self, bint pre_buffer):
self.arrow_reader_properties().set_pre_buffer(pre_buffer)
+ @property
+ def cache_options(self):
+ return
CacheOptions.wrap(self.arrow_reader_properties().cache_options())
+
+ @cache_options.setter
+ def cache_options(self, CacheOptions options):
+ self.arrow_reader_properties().set_cache_options(options.unwrap())
+
@property
def thrift_string_size_limit(self):
return self.reader_properties().thrift_string_size_limit()
@@ -828,11 +844,11 @@ cdef class
ParquetFragmentScanOptions(FragmentScanOptions):
bool
"""
attrs = (
- self.use_buffered_stream, self.buffer_size, self.pre_buffer,
+ self.use_buffered_stream, self.buffer_size, self.pre_buffer,
self.cache_options,
self.thrift_string_size_limit, self.thrift_container_size_limit,
self.page_checksum_verification)
other_attrs = (
- other.use_buffered_stream, other.buffer_size, other.pre_buffer,
+ other.use_buffered_stream, other.buffer_size, other.pre_buffer,
other.cache_options,
other.thrift_string_size_limit,
other.thrift_container_size_limit,
other.page_checksum_verification)
return attrs == other_attrs
@@ -849,6 +865,7 @@ cdef class ParquetFragmentScanOptions(FragmentScanOptions):
use_buffered_stream=self.use_buffered_stream,
buffer_size=self.buffer_size,
pre_buffer=self.pre_buffer,
+ cache_options=self.cache_options,
thrift_string_size_limit=self.thrift_string_size_limit,
thrift_container_size_limit=self.thrift_container_size_limit,
page_checksum_verification=self.page_checksum_verification
diff --git a/python/pyarrow/_parquet.pxd b/python/pyarrow/_parquet.pxd
index 59b50ceda8..7ce747e0aa 100644
--- a/python/pyarrow/_parquet.pxd
+++ b/python/pyarrow/_parquet.pxd
@@ -21,8 +21,8 @@
from pyarrow.includes.common cimport *
from pyarrow.includes.libarrow cimport (CChunkedArray, CScalar, CSchema,
CStatus,
CTable, CMemoryPool, CBuffer,
- CKeyValueMetadata,
- CRandomAccessFile, COutputStream,
+ CKeyValueMetadata, CRandomAccessFile,
+ COutputStream, CCacheOptions,
TimeUnit, CRecordBatchReader)
from pyarrow.lib cimport _Weakrefable
@@ -393,6 +393,8 @@ cdef extern from "parquet/api/reader.h" namespace "parquet"
nogil:
int64_t batch_size()
void set_pre_buffer(c_bool pre_buffer)
c_bool pre_buffer() const
+ void set_cache_options(CCacheOptions options)
+ CCacheOptions cache_options() const
void set_coerce_int96_timestamp_unit(TimeUnit unit)
TimeUnit coerce_int96_timestamp_unit() const
diff --git a/python/pyarrow/includes/libarrow.pxd
b/python/pyarrow/includes/libarrow.pxd
index b0b89f8614..403846a38f 100644
--- a/python/pyarrow/includes/libarrow.pxd
+++ b/python/pyarrow/includes/libarrow.pxd
@@ -1347,6 +1347,22 @@ cdef extern from "arrow/io/api.h" namespace "arrow::io"
nogil:
CStatus Write(const uint8_t* data, int64_t nbytes)
CStatus Flush()
+ cdef cppclass CCacheOptions "arrow::io::CacheOptions":
+ int64_t hole_size_limit
+ int64_t range_size_limit
+ c_bool lazy
+ int64_t prefetch_limit
+ c_bool Equals "operator==" (CCacheOptions other)
+
+ @staticmethod
+ CCacheOptions MakeFromNetworkMetrics(int64_t time_to_first_byte_millis,
+ int64_t
transfer_bandwidth_mib_per_sec,
+ double
ideal_bandwidth_utilization_frac,
+ int64_t
max_ideal_request_size_mib)
+
+ @staticmethod
+ CCacheOptions LazyDefaults()
+
cdef cppclass COutputStream" arrow::io::OutputStream"(FileInterface,
Writable):
pass
diff --git a/python/pyarrow/io.pxi b/python/pyarrow/io.pxi
index 6f39166401..1897e76efc 100644
--- a/python/pyarrow/io.pxi
+++ b/python/pyarrow/io.pxi
@@ -2122,6 +2122,140 @@ cdef CCompressionType _ensure_compression(str name)
except *:
raise ValueError('Invalid value for compression: {!r}'.format(name))
+cdef class CacheOptions(_Weakrefable):
+ """
+ Cache options for a pre-buffered fragment scan.
+
+ Parameters
+ ----------
+ hole_size_limit : int, default 8KiB
+ The maximum distance in bytes between two consecutive ranges; beyond
+ this value, ranges are not combined.
+ range_size_limit : int, default 32MiB
+ The maximum size in bytes of a combined range; if combining two
+ consecutive ranges would produce a range of a size greater than this,
+ they are not combined
+ lazy : bool, default True
+ lazy = False: request all byte ranges when PreBuffer or WillNeed is
called.
+ lazy = True, prefetch_limit = 0: request merged byte ranges only after
the reader
+ needs them.
+ lazy = True, prefetch_limit = k: prefetch up to k merged byte ranges
ahead of the
+ range that is currently being read.
+ prefetch_limit : int, default 0
+ The maximum number of ranges to be prefetched. This is only used for
+ lazy cache to asynchronously read some ranges after reading the target
+ range.
+ """
+
+ def __init__(self, *, hole_size_limit=None, range_size_limit=None,
lazy=None, prefetch_limit=None):
+ self.wrapped = CCacheOptions.LazyDefaults()
+ if hole_size_limit is not None:
+ self.hole_size_limit = hole_size_limit
+ if range_size_limit is not None:
+ self.range_size_limit = range_size_limit
+ if lazy is not None:
+ self.lazy = lazy
+ if prefetch_limit is not None:
+ self.prefetch_limit = prefetch_limit
+
+ cdef void init(self, CCacheOptions options):
+ self.wrapped = options
+
+ cdef inline CCacheOptions unwrap(self):
+ return self.wrapped
+
+ @staticmethod
+ cdef wrap(CCacheOptions options):
+ self = CacheOptions()
+ self.init(options)
+ return self
+
+ @property
+ def hole_size_limit(self):
+ return self.wrapped.hole_size_limit
+
+ @hole_size_limit.setter
+ def hole_size_limit(self, hole_size_limit):
+ self.wrapped.hole_size_limit = hole_size_limit
+
+ @property
+ def range_size_limit(self):
+ return self.wrapped.range_size_limit
+
+ @range_size_limit.setter
+ def range_size_limit(self, range_size_limit):
+ self.wrapped.range_size_limit = range_size_limit
+
+ @property
+ def lazy(self):
+ return self.wrapped.lazy
+
+ @lazy.setter
+ def lazy(self, lazy):
+ self.wrapped.lazy = lazy
+
+ @property
+ def prefetch_limit(self):
+ return self.wrapped.prefetch_limit
+
+ @prefetch_limit.setter
+ def prefetch_limit(self, prefetch_limit):
+ self.wrapped.prefetch_limit = prefetch_limit
+
+ def __eq__(self, CacheOptions other):
+ try:
+ return self.unwrap().Equals(other.unwrap())
+ except TypeError:
+ return False
+
+ @staticmethod
+ def from_network_metrics(time_to_first_byte_millis,
transfer_bandwidth_mib_per_sec,
+ ideal_bandwidth_utilization_frac=0.9,
max_ideal_request_size_mib=64):
+ """
+ Create suitable CacheOptions based on provided network metrics.
+
+ Typically this will be used with object storage solutions like Amazon
S3,
+ Google Cloud Storage and Azure Blob Storage.
+
+ Parameters
+ ----------
+ time_to_first_byte_millis : int
+ Seek-time or Time-To-First-Byte (TTFB) in milliseconds, also
called call
+ setup latency of a new read request. The value is a positive
integer.
+ transfer_bandwidth_mib_per_sec : int
+ Data transfer Bandwidth (BW) in MiB/sec (per connection). The
value is a positive
+ integer.
+ ideal_bandwidth_utilization_frac : float, default 0.9
+ Transfer bandwidth utilization fraction (per connection) to
maximize the net
+ data load. The value is a positive float less than 1.
+ max_ideal_request_size_mib : int, default 64
+ The maximum single data request size (in MiB) to maximize the net
data load.
+
+ Returns
+ -------
+ CacheOptions
+ """
+ return CacheOptions.wrap(CCacheOptions.MakeFromNetworkMetrics(
+ time_to_first_byte_millis, transfer_bandwidth_mib_per_sec,
+ ideal_bandwidth_utilization_frac, max_ideal_request_size_mib))
+
+ @staticmethod
+ @binding(True) # Required for Cython < 3
+ def _reconstruct(kwargs):
+ # __reduce__ doesn't allow passing named arguments directly to the
+ # reconstructor, hence this wrapper.
+ return CacheOptions(**kwargs)
+
+ def __reduce__(self):
+ kwargs = dict(
+ hole_size_limit=self.hole_size_limit,
+ range_size_limit=self.range_size_limit,
+ lazy=self.lazy,
+ prefetch_limit=self.prefetch_limit,
+ )
+ return CacheOptions._reconstruct, (kwargs,)
+
+
cdef class Codec(_Weakrefable):
"""
Compression codec.
diff --git a/python/pyarrow/lib.pxd b/python/pyarrow/lib.pxd
index 1440ba0750..58ec34addb 100644
--- a/python/pyarrow/lib.pxd
+++ b/python/pyarrow/lib.pxd
@@ -561,6 +561,18 @@ cdef class RecordBatchReader(_Weakrefable):
SharedPtrNoGIL[CRecordBatchReader] reader
+cdef class CacheOptions(_Weakrefable):
+ cdef:
+ CCacheOptions wrapped
+
+ cdef void init(self, CCacheOptions options)
+
+ cdef inline CCacheOptions unwrap(self)
+
+ @staticmethod
+ cdef wrap(const CCacheOptions options)
+
+
cdef class Codec(_Weakrefable):
cdef:
shared_ptr[CCodec] wrapped
diff --git a/python/pyarrow/tests/test_dataset.py
b/python/pyarrow/tests/test_dataset.py
index f3c25ee8c5..a37eb1e426 100644
--- a/python/pyarrow/tests/test_dataset.py
+++ b/python/pyarrow/tests/test_dataset.py
@@ -16,17 +16,16 @@
# under the License.
import contextlib
-import os
-import posixpath
import datetime
+import os
import pathlib
+import posixpath
import sys
-import textwrap
import tempfile
+import textwrap
import threading
import time
from shutil import copytree
-
from urllib.parse import quote
import numpy as np
@@ -35,12 +34,12 @@ import pytest
import pyarrow as pa
import pyarrow.compute as pc
import pyarrow.csv
-import pyarrow.json
import pyarrow.feather
import pyarrow.fs as fs
-from pyarrow.tests.util import (change_cwd, _filesystem_uri,
- FSProtocolClass, ProxyHandler,
- _configure_s3_limited_user)
+import pyarrow.json
+from pyarrow.tests.util import (FSProtocolClass, ProxyHandler,
+ _configure_s3_limited_user, _filesystem_uri,
+ change_cwd)
try:
import pandas as pd
@@ -138,7 +137,8 @@ def mockfs():
@pytest.fixture
def open_logging_fs(monkeypatch):
- from pyarrow.fs import PyFileSystem, LocalFileSystem
+ from pyarrow.fs import LocalFileSystem, PyFileSystem
+
from .test_fs import ProxyHandler
localfs = LocalFileSystem()
@@ -791,6 +791,9 @@ def test_parquet_scan_options():
thrift_container_size_limit=987654,)
opts6 = ds.ParquetFragmentScanOptions(
page_checksum_verification=True)
+ cache_opts = pa.CacheOptions(
+ hole_size_limit=2**10, range_size_limit=8*2**10, lazy=True)
+ opts7 = ds.ParquetFragmentScanOptions(pre_buffer=True,
cache_options=cache_opts)
assert opts1.use_buffered_stream is False
assert opts1.buffer_size == 2**13
@@ -816,12 +819,17 @@ def test_parquet_scan_options():
assert opts6.page_checksum_verification is True
+ assert opts7.pre_buffer is True
+ assert opts7.cache_options == cache_opts
+ assert opts7.cache_options != opts1.cache_options
+
assert opts1 == opts1
assert opts1 != opts2
assert opts2 != opts3
assert opts3 != opts4
assert opts5 != opts1
assert opts6 != opts1
+ assert opts7 != opts1
def test_file_format_pickling(pickle_module):
@@ -2711,7 +2719,7 @@ def
test_open_dataset_from_uri_s3_fsspec(s3_example_simple):
table, path, _, _, host, port, access_key, secret_key = s3_example_simple
s3fs = pytest.importorskip("s3fs")
- from pyarrow.fs import PyFileSystem, FSSpecHandler
+ from pyarrow.fs import FSSpecHandler, PyFileSystem
fs = s3fs.S3FileSystem(
key=access_key,
diff --git a/python/pyarrow/tests/test_io.py b/python/pyarrow/tests/test_io.py
index 071962af29..5a495aa80a 100644
--- a/python/pyarrow/tests/test_io.py
+++ b/python/pyarrow/tests/test_io.py
@@ -664,6 +664,65 @@ def test_allocate_buffer_resizable():
assert buf.size == 200
+def test_cache_options():
+ opts1 = pa.CacheOptions()
+ opts2 = pa.CacheOptions(hole_size_limit=1024)
+ opts3 = pa.CacheOptions(hole_size_limit=4096, range_size_limit=8192)
+ opts4 = pa.CacheOptions(hole_size_limit=4096,
+ range_size_limit=8192, prefetch_limit=5)
+ opts5 = pa.CacheOptions(hole_size_limit=4096,
+ range_size_limit=8192, lazy=False)
+ opts6 = pa.CacheOptions.from_network_metrics(time_to_first_byte_millis=100,
+
transfer_bandwidth_mib_per_sec=200,
+
ideal_bandwidth_utilization_frac=0.9,
+ max_ideal_request_size_mib=64)
+
+ assert opts1.hole_size_limit == 8192
+ assert opts1.range_size_limit == 32 * 1024 * 1024
+ assert opts1.lazy is True
+ assert opts1.prefetch_limit == 0
+
+ assert opts2.hole_size_limit == 1024
+ assert opts2.range_size_limit == 32 * 1024 * 1024
+ assert opts2.lazy is True
+ assert opts2.prefetch_limit == 0
+
+ assert opts3.hole_size_limit == 4096
+ assert opts3.range_size_limit == 8192
+ assert opts3.lazy is True
+ assert opts3.prefetch_limit == 0
+
+ assert opts4.hole_size_limit == 4096
+ assert opts4.range_size_limit == 8192
+ assert opts4.lazy is True
+ assert opts4.prefetch_limit == 5
+
+ assert opts5.hole_size_limit == 4096
+ assert opts5.range_size_limit == 8192
+ assert opts5.lazy is False
+ assert opts5.prefetch_limit == 0
+
+ assert opts6.lazy is False
+
+ assert opts1 == opts1
+ assert opts1 != opts2
+ assert opts2 != opts3
+ assert opts3 != opts4
+ assert opts4 != opts5
+ assert opts6 != opts1
+
+
+def test_cache_options_pickling(pickle_module):
+ options = [
+ pa.CacheOptions(),
+ pa.CacheOptions(hole_size_limit=4096, range_size_limit=8192,
+ lazy=True, prefetch_limit=5),
+ ]
+
+ for option in options:
+ assert pickle_module.loads(pickle_module.dumps(option)) == option
+
+
@pytest.mark.parametrize("compression", [
pytest.param(
"bz2", marks=pytest.mark.xfail(raises=pa.lib.ArrowNotImplementedError)