This is an automated email from the ASF dual-hosted git repository.

lidavidm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new ae071bbce5 ARROW-17057: [Python] S3FileSystem has no parameter for retry strategy (#13633)
ae071bbce5 is described below

commit ae071bbce5cb61c7553912f051d55ff5c7b45d7a
Author: Duncan MacQuarrie <[email protected]>
AuthorDate: Wed Aug 10 14:32:00 2022 +0100

    ARROW-17057: [Python] S3FileSystem has no parameter for retry strategy (#13633)
    
    https://issues.apache.org/jira/browse/ARROW-17057
    
    Authored-by: 3dbrows <[email protected]>
    Signed-off-by: David Li <[email protected]>
---
 cpp/src/arrow/filesystem/s3fs.cc        | 48 +++++++++++++++++++++++++++++
 cpp/src/arrow/filesystem/s3fs.h         |  8 ++++-
 python/pyarrow/_s3fs.pyx                | 53 ++++++++++++++++++++++++++++++++-
 python/pyarrow/fs.py                    |  5 ++--
 python/pyarrow/includes/libarrow_fs.pxd |  8 +++++
 python/pyarrow/tests/test_fs.py         | 15 +++++++++-
 6 files changed, 132 insertions(+), 5 deletions(-)

diff --git a/cpp/src/arrow/filesystem/s3fs.cc b/cpp/src/arrow/filesystem/s3fs.cc
index 5f601db5e9..fb933e4d4d 100644
--- a/cpp/src/arrow/filesystem/s3fs.cc
+++ b/cpp/src/arrow/filesystem/s3fs.cc
@@ -209,6 +209,54 @@ bool S3ProxyOptions::Equals(const S3ProxyOptions& other) const {
           username == other.username && password == other.password);
 }
 
+// -----------------------------------------------------------------------
+// AwsRetryStrategy implementation
+
+class AwsRetryStrategy : public S3RetryStrategy {
+ public:
+  explicit AwsRetryStrategy(std::shared_ptr<Aws::Client::RetryStrategy> retry_strategy)
+      : retry_strategy_(std::move(retry_strategy)) {}
+
+  bool ShouldRetry(const AWSErrorDetail& detail, int64_t attempted_retries) override {
+    Aws::Client::AWSError<Aws::Client::CoreErrors> error = DetailToError(detail);
+    return retry_strategy_->ShouldRetry(
+        error, static_cast<long>(attempted_retries));  // NOLINT: runtime/int
+  }
+
+  int64_t CalculateDelayBeforeNextRetry(const AWSErrorDetail& detail,
+                                        int64_t attempted_retries) override {
+    Aws::Client::AWSError<Aws::Client::CoreErrors> error = DetailToError(detail);
+    return retry_strategy_->CalculateDelayBeforeNextRetry(
+        error, static_cast<long>(attempted_retries));  // NOLINT: runtime/int
+  }
+
+ private:
+  std::shared_ptr<Aws::Client::RetryStrategy> retry_strategy_;
+  static Aws::Client::AWSError<Aws::Client::CoreErrors> DetailToError(
+      const S3RetryStrategy::AWSErrorDetail& detail) {
+    auto exception_name = ToAwsString(detail.exception_name);
+    auto message = ToAwsString(detail.message);
+    auto errors = Aws::Client::AWSError<Aws::Client::CoreErrors>(
+        static_cast<Aws::Client::CoreErrors>(detail.error_type), exception_name, message,
+        detail.should_retry);
+    return errors;
+  }
+};
+
+std::shared_ptr<S3RetryStrategy> S3RetryStrategy::GetAwsDefaultRetryStrategy(
+    int64_t max_attempts) {
+  return std::make_shared<AwsRetryStrategy>(
+      std::make_shared<Aws::Client::DefaultRetryStrategy>(
+          static_cast<long>(max_attempts)));  // NOLINT: runtime/int
+}
+
+std::shared_ptr<S3RetryStrategy> S3RetryStrategy::GetAwsStandardRetryStrategy(
+    int64_t max_attempts) {
+  return std::make_shared<AwsRetryStrategy>(
+      std::make_shared<Aws::Client::StandardRetryStrategy>(
+          static_cast<long>(max_attempts)));  // NOLINT: runtime/int
+}
+
 // -----------------------------------------------------------------------
 // S3Options implementation
 
diff --git a/cpp/src/arrow/filesystem/s3fs.h b/cpp/src/arrow/filesystem/s3fs.h
index 3f578aedb2..3b4731883b 100644
--- a/cpp/src/arrow/filesystem/s3fs.h
+++ b/cpp/src/arrow/filesystem/s3fs.h
@@ -70,7 +70,7 @@ enum class S3CredentialsKind : int8_t {
 };
 
 /// Pure virtual class for describing custom S3 retry strategies
-class S3RetryStrategy {
+class ARROW_EXPORT S3RetryStrategy {
  public:
   virtual ~S3RetryStrategy() = default;
 
@@ -90,6 +90,12 @@ class S3RetryStrategy {
   /// Returns the time in milliseconds the S3 client should sleep for until retrying.
   virtual int64_t CalculateDelayBeforeNextRetry(const AWSErrorDetail& error,
                                                 int64_t attempted_retries) = 0;
+  /// Returns a stock AWS Default retry strategy.
+  static std::shared_ptr<S3RetryStrategy> GetAwsDefaultRetryStrategy(
+      int64_t max_attempts);
+  /// Returns a stock AWS Standard retry strategy.
+  static std::shared_ptr<S3RetryStrategy> GetAwsStandardRetryStrategy(
+      int64_t max_attempts);
 };
 
 /// Options for the S3FileSystem implementation.
diff --git a/python/pyarrow/_s3fs.pyx b/python/pyarrow/_s3fs.pyx
index 47cb87c23d..955a7a5514 100644
--- a/python/pyarrow/_s3fs.pyx
+++ b/python/pyarrow/_s3fs.pyx
@@ -88,6 +88,44 @@ def resolve_s3_region(bucket):
     return frombytes(c_region)
 
 
+class S3RetryStrategy:
+    """
+    Base class for AWS retry strategies for use with S3.
+
+    Parameters
+    ----------
+    max_attempts : int, default 3
+        The maximum number of retry attempts to attempt before failing.
+    """
+
+    def __init__(self, max_attempts=3):
+        self.max_attempts = max_attempts
+
+
+class AwsStandardS3RetryStrategy(S3RetryStrategy):
+    """
+    Represents an AWS Standard retry strategy for use with S3.
+
+    Parameters
+    ----------
+    max_attempts : int, default 3
+        The maximum number of retry attempts to attempt before failing.
+    """
+    pass
+
+
+class AwsDefaultS3RetryStrategy(S3RetryStrategy):
+    """
+    Represents an AWS Default retry strategy for use with S3.
+
+    Parameters
+    ----------
+    max_attempts : int, default 3
+        The maximum number of retry attempts to attempt before failing.
+    """
+    pass
+
+
 cdef class S3FileSystem(FileSystem):
     """
     S3-backed FileSystem implementation
@@ -173,6 +211,9 @@ cdef class S3FileSystem(FileSystem):
     allow_bucket_deletion : bool, default False
         Whether to allow DeleteDir at the bucket-level. This option may also be
         passed in a URI query parameter.
+    retry_strategy : S3RetryStrategy, default AwsStandardS3RetryStrategy(max_attempts=3)
+        The retry strategy to use with S3; fail after max_attempts. Available
+        strategies are AwsStandardS3RetryStrategy, AwsDefaultS3RetryStrategy.
 
     Examples
     --------
@@ -195,7 +236,8 @@ cdef class S3FileSystem(FileSystem):
                  bint background_writes=True, default_metadata=None,
                  role_arn=None, session_name=None, external_id=None,
                  load_frequency=900, proxy_options=None,
-                 allow_bucket_creation=False, allow_bucket_deletion=False):
+                 allow_bucket_creation=False, allow_bucket_deletion=False,
+                 retry_strategy: S3RetryStrategy = AwsStandardS3RetryStrategy(max_attempts=3)):
         cdef:
             CS3Options options
             shared_ptr[CS3FileSystem] wrapped
@@ -300,6 +342,15 @@ cdef class S3FileSystem(FileSystem):
         options.allow_bucket_creation = allow_bucket_creation
         options.allow_bucket_deletion = allow_bucket_deletion
 
+        if isinstance(retry_strategy, AwsStandardS3RetryStrategy):
+            options.retry_strategy = CS3RetryStrategy.GetAwsStandardRetryStrategy(
+                retry_strategy.max_attempts)
+        elif isinstance(retry_strategy, AwsDefaultS3RetryStrategy):
+            options.retry_strategy = CS3RetryStrategy.GetAwsDefaultRetryStrategy(
+                retry_strategy.max_attempts)
+        else:
+            raise ValueError(f'Invalid retry_strategy {retry_strategy!r}')
+
         with nogil:
             wrapped = GetResultValue(CS3FileSystem.Make(options))
 
diff --git a/python/pyarrow/fs.py b/python/pyarrow/fs.py
index b2db818a9a..c6f44ccbb5 100644
--- a/python/pyarrow/fs.py
+++ b/python/pyarrow/fs.py
@@ -52,8 +52,9 @@ except ImportError:
 
 try:
     from pyarrow._s3fs import (  # noqa
-        S3FileSystem, S3LogLevel, initialize_s3, finalize_s3,
-        resolve_s3_region)
+        AwsDefaultS3RetryStrategy, AwsStandardS3RetryStrategy,
+        S3FileSystem, S3LogLevel, S3RetryStrategy, finalize_s3,
+        initialize_s3, resolve_s3_region)
 except ImportError:
     _not_imported.append("S3FileSystem")
 else:
diff --git a/python/pyarrow/includes/libarrow_fs.pxd b/python/pyarrow/includes/libarrow_fs.pxd
index 69d5dc0ebe..7984b54f58 100644
--- a/python/pyarrow/includes/libarrow_fs.pxd
+++ b/python/pyarrow/includes/libarrow_fs.pxd
@@ -150,6 +150,13 @@ cdef extern from "arrow/filesystem/api.h" namespace "arrow::fs" nogil:
         CS3CredentialsKind_WebIdentity \
             "arrow::fs::S3CredentialsKind::WebIdentity"
 
+    cdef cppclass CS3RetryStrategy "arrow::fs::S3RetryStrategy":
+        @staticmethod
+        shared_ptr[CS3RetryStrategy] GetAwsDefaultRetryStrategy(int64_t max_attempts)
+
+        @staticmethod
+        shared_ptr[CS3RetryStrategy] GetAwsStandardRetryStrategy(int64_t max_attempts)
+
     cdef cppclass CS3Options "arrow::fs::S3Options":
         c_string region
         double connect_timeout
@@ -166,6 +173,7 @@ cdef extern from "arrow/filesystem/api.h" namespace "arrow::fs" nogil:
         int load_frequency
         CS3ProxyOptions proxy_options
         CS3CredentialsKind credentials_kind
+        shared_ptr[CS3RetryStrategy] retry_strategy
         void ConfigureDefaultCredentials()
         void ConfigureAccessKey(const c_string& access_key,
                                 const c_string& secret_key,
diff --git a/python/pyarrow/tests/test_fs.py b/python/pyarrow/tests/test_fs.py
index 238bcb73b6..9451144541 100644
--- a/python/pyarrow/tests/test_fs.py
+++ b/python/pyarrow/tests/test_fs.py
@@ -1093,7 +1093,9 @@ def test_gcs_options():
 
 @pytest.mark.s3
 def test_s3_options():
-    from pyarrow.fs import S3FileSystem
+    from pyarrow.fs import (AwsDefaultS3RetryStrategy,
+                            AwsStandardS3RetryStrategy, S3FileSystem,
+                            S3RetryStrategy)
 
     fs = S3FileSystem(access_key='access', secret_key='secret',
                       session_token='token', region='us-east-2',
@@ -1107,6 +1109,15 @@ def test_s3_options():
     assert isinstance(fs, S3FileSystem)
     assert pickle.loads(pickle.dumps(fs)) == fs
 
+    # Note that the retry strategy won't survive pickling for now
+    fs = S3FileSystem(
+        retry_strategy=AwsStandardS3RetryStrategy(max_attempts=5))
+    assert isinstance(fs, S3FileSystem)
+
+    fs = S3FileSystem(
+        retry_strategy=AwsDefaultS3RetryStrategy(max_attempts=5))
+    assert isinstance(fs, S3FileSystem)
+
     fs2 = S3FileSystem(role_arn='role')
     assert isinstance(fs2, S3FileSystem)
     assert pickle.loads(pickle.dumps(fs2)) == fs2
@@ -1160,6 +1171,8 @@ def test_s3_options():
         S3FileSystem(role_arn="arn", anonymous=True)
     with pytest.raises(ValueError):
         S3FileSystem(default_metadata=["foo", "bar"])
+    with pytest.raises(ValueError):
+        S3FileSystem(retry_strategy=S3RetryStrategy())
 
 
 @pytest.mark.s3

Reply via email to