This is an automated email from the ASF dual-hosted git repository.
lidavidm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new ae071bbce5 ARROW-17057: [Python] S3FileSystem has no parameter for retry strategy (#13633)
ae071bbce5 is described below
commit ae071bbce5cb61c7553912f051d55ff5c7b45d7a
Author: Duncan MacQuarrie <[email protected]>
AuthorDate: Wed Aug 10 14:32:00 2022 +0100
ARROW-17057: [Python] S3FileSystem has no parameter for retry strategy (#13633)
https://issues.apache.org/jira/browse/ARROW-17057
Authored-by: 3dbrows <[email protected]>
Signed-off-by: David Li <[email protected]>
---
cpp/src/arrow/filesystem/s3fs.cc | 48 +++++++++++++++++++++++++++++
cpp/src/arrow/filesystem/s3fs.h | 8 ++++-
python/pyarrow/_s3fs.pyx | 53 ++++++++++++++++++++++++++++++++-
python/pyarrow/fs.py | 5 ++--
python/pyarrow/includes/libarrow_fs.pxd | 8 +++++
python/pyarrow/tests/test_fs.py | 15 +++++++++-
6 files changed, 132 insertions(+), 5 deletions(-)
diff --git a/cpp/src/arrow/filesystem/s3fs.cc b/cpp/src/arrow/filesystem/s3fs.cc
index 5f601db5e9..fb933e4d4d 100644
--- a/cpp/src/arrow/filesystem/s3fs.cc
+++ b/cpp/src/arrow/filesystem/s3fs.cc
@@ -209,6 +209,54 @@ bool S3ProxyOptions::Equals(const S3ProxyOptions& other) const {
username == other.username && password == other.password);
}
+// -----------------------------------------------------------------------
+// AwsRetryStrategy implementation
+
+class AwsRetryStrategy : public S3RetryStrategy {
+ public:
+ explicit AwsRetryStrategy(std::shared_ptr<Aws::Client::RetryStrategy> retry_strategy)
+ : retry_strategy_(std::move(retry_strategy)) {}
+
+ bool ShouldRetry(const AWSErrorDetail& detail, int64_t attempted_retries) override {
+ Aws::Client::AWSError<Aws::Client::CoreErrors> error = DetailToError(detail);
+ return retry_strategy_->ShouldRetry(
+ error, static_cast<long>(attempted_retries)); // NOLINT: runtime/int
+ }
+
+ int64_t CalculateDelayBeforeNextRetry(const AWSErrorDetail& detail,
+ int64_t attempted_retries) override {
+ Aws::Client::AWSError<Aws::Client::CoreErrors> error = DetailToError(detail);
+ return retry_strategy_->CalculateDelayBeforeNextRetry(
+ error, static_cast<long>(attempted_retries)); // NOLINT: runtime/int
+ }
+
+ private:
+ std::shared_ptr<Aws::Client::RetryStrategy> retry_strategy_;
+ static Aws::Client::AWSError<Aws::Client::CoreErrors> DetailToError(
+ const S3RetryStrategy::AWSErrorDetail& detail) {
+ auto exception_name = ToAwsString(detail.exception_name);
+ auto message = ToAwsString(detail.message);
+ auto errors = Aws::Client::AWSError<Aws::Client::CoreErrors>(
+ static_cast<Aws::Client::CoreErrors>(detail.error_type), exception_name, message,
+ detail.should_retry);
+ return errors;
+ }
+};
+
+std::shared_ptr<S3RetryStrategy> S3RetryStrategy::GetAwsDefaultRetryStrategy(
+ int64_t max_attempts) {
+ return std::make_shared<AwsRetryStrategy>(
+ std::make_shared<Aws::Client::DefaultRetryStrategy>(
+ static_cast<long>(max_attempts))); // NOLINT: runtime/int
+}
+
+std::shared_ptr<S3RetryStrategy> S3RetryStrategy::GetAwsStandardRetryStrategy(
+ int64_t max_attempts) {
+ return std::make_shared<AwsRetryStrategy>(
+ std::make_shared<Aws::Client::StandardRetryStrategy>(
+ static_cast<long>(max_attempts))); // NOLINT: runtime/int
+}
+
// -----------------------------------------------------------------------
// S3Options implementation
diff --git a/cpp/src/arrow/filesystem/s3fs.h b/cpp/src/arrow/filesystem/s3fs.h
index 3f578aedb2..3b4731883b 100644
--- a/cpp/src/arrow/filesystem/s3fs.h
+++ b/cpp/src/arrow/filesystem/s3fs.h
@@ -70,7 +70,7 @@ enum class S3CredentialsKind : int8_t {
};
/// Pure virtual class for describing custom S3 retry strategies
-class S3RetryStrategy {
+class ARROW_EXPORT S3RetryStrategy {
public:
virtual ~S3RetryStrategy() = default;
@@ -90,6 +90,12 @@ class S3RetryStrategy {
/// Returns the time in milliseconds the S3 client should sleep for until retrying.
virtual int64_t CalculateDelayBeforeNextRetry(const AWSErrorDetail& error,
int64_t attempted_retries) = 0;
+ /// Returns a stock AWS Default retry strategy.
+ static std::shared_ptr<S3RetryStrategy> GetAwsDefaultRetryStrategy(
+ int64_t max_attempts);
+ /// Returns a stock AWS Standard retry strategy.
+ static std::shared_ptr<S3RetryStrategy> GetAwsStandardRetryStrategy(
+ int64_t max_attempts);
};
/// Options for the S3FileSystem implementation.
diff --git a/python/pyarrow/_s3fs.pyx b/python/pyarrow/_s3fs.pyx
index 47cb87c23d..955a7a5514 100644
--- a/python/pyarrow/_s3fs.pyx
+++ b/python/pyarrow/_s3fs.pyx
@@ -88,6 +88,44 @@ def resolve_s3_region(bucket):
return frombytes(c_region)
+class S3RetryStrategy:
+ """
+ Base class for AWS retry strategies for use with S3.
+
+ Parameters
+ ----------
+ max_attempts : int, default 3
+ The maximum number of retry attempts to attempt before failing.
+ """
+
+ def __init__(self, max_attempts=3):
+ self.max_attempts = max_attempts
+
+
+class AwsStandardS3RetryStrategy(S3RetryStrategy):
+ """
+ Represents an AWS Standard retry strategy for use with S3.
+
+ Parameters
+ ----------
+ max_attempts : int, default 3
+ The maximum number of retry attempts to attempt before failing.
+ """
+ pass
+
+
+class AwsDefaultS3RetryStrategy(S3RetryStrategy):
+ """
+ Represents an AWS Default retry strategy for use with S3.
+
+ Parameters
+ ----------
+ max_attempts : int, default 3
+ The maximum number of retry attempts to attempt before failing.
+ """
+ pass
+
+
cdef class S3FileSystem(FileSystem):
"""
S3-backed FileSystem implementation
@@ -173,6 +211,9 @@ cdef class S3FileSystem(FileSystem):
allow_bucket_deletion : bool, default False
Whether to allow DeleteDir at the bucket-level. This option may also be
passed in a URI query parameter.
+ retry_strategy : S3RetryStrategy, default AwsStandardS3RetryStrategy(max_attempts=3)
+ The retry strategy to use with S3; fail after max_attempts. Available
+ strategies are AwsStandardS3RetryStrategy, AwsDefaultS3RetryStrategy.
Examples
--------
@@ -195,7 +236,8 @@ cdef class S3FileSystem(FileSystem):
bint background_writes=True, default_metadata=None,
role_arn=None, session_name=None, external_id=None,
load_frequency=900, proxy_options=None,
- allow_bucket_creation=False, allow_bucket_deletion=False):
+ allow_bucket_creation=False, allow_bucket_deletion=False,
+ retry_strategy: S3RetryStrategy = AwsStandardS3RetryStrategy(max_attempts=3)):
cdef:
CS3Options options
shared_ptr[CS3FileSystem] wrapped
@@ -300,6 +342,15 @@ cdef class S3FileSystem(FileSystem):
options.allow_bucket_creation = allow_bucket_creation
options.allow_bucket_deletion = allow_bucket_deletion
+ if isinstance(retry_strategy, AwsStandardS3RetryStrategy):
+ options.retry_strategy = CS3RetryStrategy.GetAwsStandardRetryStrategy(
+ retry_strategy.max_attempts)
+ elif isinstance(retry_strategy, AwsDefaultS3RetryStrategy):
+ options.retry_strategy = CS3RetryStrategy.GetAwsDefaultRetryStrategy(
+ retry_strategy.max_attempts)
+ else:
+ raise ValueError(f'Invalid retry_strategy {retry_strategy!r}')
+
with nogil:
wrapped = GetResultValue(CS3FileSystem.Make(options))
diff --git a/python/pyarrow/fs.py b/python/pyarrow/fs.py
index b2db818a9a..c6f44ccbb5 100644
--- a/python/pyarrow/fs.py
+++ b/python/pyarrow/fs.py
@@ -52,8 +52,9 @@ except ImportError:
try:
from pyarrow._s3fs import ( # noqa
- S3FileSystem, S3LogLevel, initialize_s3, finalize_s3,
- resolve_s3_region)
+ AwsDefaultS3RetryStrategy, AwsStandardS3RetryStrategy,
+ S3FileSystem, S3LogLevel, S3RetryStrategy, finalize_s3,
+ initialize_s3, resolve_s3_region)
except ImportError:
_not_imported.append("S3FileSystem")
else:
diff --git a/python/pyarrow/includes/libarrow_fs.pxd b/python/pyarrow/includes/libarrow_fs.pxd
index 69d5dc0ebe..7984b54f58 100644
--- a/python/pyarrow/includes/libarrow_fs.pxd
+++ b/python/pyarrow/includes/libarrow_fs.pxd
@@ -150,6 +150,13 @@ cdef extern from "arrow/filesystem/api.h" namespace "arrow::fs" nogil:
CS3CredentialsKind_WebIdentity \
"arrow::fs::S3CredentialsKind::WebIdentity"
+ cdef cppclass CS3RetryStrategy "arrow::fs::S3RetryStrategy":
+ @staticmethod
shared_ptr[CS3RetryStrategy] GetAwsDefaultRetryStrategy(int64_t max_attempts)
+
+ @staticmethod
shared_ptr[CS3RetryStrategy] GetAwsStandardRetryStrategy(int64_t max_attempts)
+
cdef cppclass CS3Options "arrow::fs::S3Options":
c_string region
double connect_timeout
@@ -166,6 +173,7 @@ cdef extern from "arrow/filesystem/api.h" namespace "arrow::fs" nogil:
int load_frequency
CS3ProxyOptions proxy_options
CS3CredentialsKind credentials_kind
+ shared_ptr[CS3RetryStrategy] retry_strategy
void ConfigureDefaultCredentials()
void ConfigureAccessKey(const c_string& access_key,
const c_string& secret_key,
diff --git a/python/pyarrow/tests/test_fs.py b/python/pyarrow/tests/test_fs.py
index 238bcb73b6..9451144541 100644
--- a/python/pyarrow/tests/test_fs.py
+++ b/python/pyarrow/tests/test_fs.py
@@ -1093,7 +1093,9 @@ def test_gcs_options():
@pytest.mark.s3
def test_s3_options():
- from pyarrow.fs import S3FileSystem
+ from pyarrow.fs import (AwsDefaultS3RetryStrategy,
+ AwsStandardS3RetryStrategy, S3FileSystem,
+ S3RetryStrategy)
fs = S3FileSystem(access_key='access', secret_key='secret',
session_token='token', region='us-east-2',
@@ -1107,6 +1109,15 @@ def test_s3_options():
assert isinstance(fs, S3FileSystem)
assert pickle.loads(pickle.dumps(fs)) == fs
+ # Note that the retry strategy won't survive pickling for now
+ fs = S3FileSystem(
+ retry_strategy=AwsStandardS3RetryStrategy(max_attempts=5))
+ assert isinstance(fs, S3FileSystem)
+
+ fs = S3FileSystem(
+ retry_strategy=AwsDefaultS3RetryStrategy(max_attempts=5))
+ assert isinstance(fs, S3FileSystem)
+
fs2 = S3FileSystem(role_arn='role')
assert isinstance(fs2, S3FileSystem)
assert pickle.loads(pickle.dumps(fs2)) == fs2
@@ -1160,6 +1171,8 @@ def test_s3_options():
S3FileSystem(role_arn="arn", anonymous=True)
with pytest.raises(ValueError):
S3FileSystem(default_metadata=["foo", "bar"])
+ with pytest.raises(ValueError):
+ S3FileSystem(retry_strategy=S3RetryStrategy())
@pytest.mark.s3