This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new f259ea36f7 [python] optimize file io retry for OSS and S3 (#6919)
f259ea36f7 is described below

commit f259ea36f7c49b6b83828628e43c6ba184b80cc7
Author: XiaoHongbo <[email protected]>
AuthorDate: Tue Dec 30 10:20:16 2025 +0800

    [python] optimize file io retry for OSS and S3 (#6919)
---
 paimon-python/pypaimon/common/file_io.py | 31 +++++++++++++++++++++++++++++++
 1 file changed, 31 insertions(+)

diff --git a/paimon-python/pypaimon/common/file_io.py b/paimon-python/pypaimon/common/file_io.py
index 1ba9caa791..497d711a49 100644
--- a/paimon-python/pypaimon/common/file_io.py
+++ b/paimon-python/pypaimon/common/file_io.py
@@ -63,6 +63,31 @@ class FileIO:
         else:
             return uri.scheme, uri.netloc, f"{uri.netloc}{uri.path}"
 
+    @staticmethod
+    def _create_s3_retry_config(
+            max_attempts: int = 10,
+            request_timeout: int = 60,
+            connect_timeout: int = 60
+    ) -> Dict[str, Any]:
+        """
+        AwsStandardS3RetryStrategy and timeout parameters are only available
+        in PyArrow >= 8.0.0.
+        """
+        if parse(pyarrow.__version__) >= parse("8.0.0"):
+            config = {
+                'request_timeout': request_timeout,
+                'connect_timeout': connect_timeout
+            }
+            try:
+                from pyarrow.fs import AwsStandardS3RetryStrategy
+                retry_strategy = AwsStandardS3RetryStrategy(max_attempts=max_attempts)
+                config['retry_strategy'] = retry_strategy
+            except ImportError:
+                pass
+            return config
+        else:
+            return {}
+
     def _extract_oss_bucket(self, location) -> str:
         uri = urlparse(location)
         if uri.scheme and uri.scheme != "oss":
@@ -104,6 +129,9 @@ class FileIO:
             client_kwargs['endpoint_override'] = (oss_bucket + "." +
                                                   self.properties.get(OssOptions.OSS_ENDPOINT))
 
+        retry_config = self._create_s3_retry_config()
+        client_kwargs.update(retry_config)
+
         return S3FileSystem(**client_kwargs)
 
     def _initialize_s3_fs(self) -> FileSystem:
@@ -118,6 +146,9 @@ class FileIO:
             "force_virtual_addressing": True,
         }
 
+        retry_config = self._create_s3_retry_config()
+        client_kwargs.update(retry_config)
+
         return S3FileSystem(**client_kwargs)
 
     def _initialize_hdfs_fs(self, scheme: str, netloc: Optional[str]) -> FileSystem:

Reply via email to