This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.3
in repository https://gitbox.apache.org/repos/asf/paimon.git

commit 30efebed3507cd24e31036f884ccce21f45e1fdc
Author: umi <[email protected]>
AuthorDate: Fri Sep 26 21:50:07 2025 +0800

    [python] Fix OSSParam to access DLF (#6332)
---
 .../pypaimon/catalog/rest/rest_catalog.py          | 22 +++++++++-----
 .../pypaimon/catalog/rest/rest_token_file_io.py    |  8 ++---
 paimon-python/pypaimon/common/config.py            |  1 -
 paimon-python/pypaimon/common/file_io.py           | 33 +++++++++++++++++----
 .../pypaimon/tests/py36/ao_simple_test.py          | 34 ++++++++++++++++++++++
 5 files changed, 80 insertions(+), 18 deletions(-)

diff --git a/paimon-python/pypaimon/catalog/rest/rest_catalog.py b/paimon-python/pypaimon/catalog/rest/rest_catalog.py
index 5e36559a7b..53db2abbaa 100644
--- a/paimon-python/pypaimon/catalog/rest/rest_catalog.py
+++ b/paimon-python/pypaimon/catalog/rest/rest_catalog.py
@@ -19,6 +19,9 @@ from pathlib import Path
 from typing import Any, Callable, Dict, List, Optional, Union
 from urllib.parse import urlparse
 
+import pyarrow
+from packaging.version import parse
+
 from pypaimon.api.api_response import GetTableResponse, PagedList
 from pypaimon.api.options import Options
 from pypaimon.api.rest_api import RESTApi
@@ -200,17 +203,17 @@ class RESTCatalog(Catalog):
             uuid=response.get_id()
         )
 
-    def file_io_from_options(self, table_path: Path) -> FileIO:
-        return FileIO(str(table_path), self.context.options.data)
+    def file_io_from_options(self, table_path: str) -> FileIO:
+        return FileIO(table_path, self.context.options.data)
 
-    def file_io_for_data(self, table_path: Path, identifier: Identifier):
+    def file_io_for_data(self, table_path: str, identifier: Identifier):
         return RESTTokenFileIO(identifier, table_path, self.context.options.data) \
             if self.data_token_enabled else self.file_io_from_options(table_path)
 
     def load_table(self,
                    identifier: Identifier,
-                   internal_file_io: Callable[[Path], Any],
-                   external_file_io: Callable[[Path], Any],
+                   internal_file_io: Callable[[str], Any],
+                   external_file_io: Callable[[str], Any],
                    metadata_loader: Callable[[Identifier], TableMetadata],
                    ) -> FileStoreTable:
         metadata = metadata_loader(identifier)
@@ -223,9 +226,12 @@ class RESTCatalog(Catalog):
             supports_version_management=True  # REST catalogs support version management
         )
         path_parsed = urlparse(schema.options.get(CoreOptions.PATH))
-        path = Path(path_parsed.path) if path_parsed.scheme is None else Path(schema.options.get(CoreOptions.PATH))
-        table_path = path_parsed.netloc + "/" + path_parsed.path \
-            if path_parsed.scheme == "file" else path_parsed.path[1:]
+        path = path_parsed.path if path_parsed.scheme is None else schema.options.get(CoreOptions.PATH)
+        if path_parsed.scheme == "file":
+            table_path = path_parsed.path
+        else:
+            table_path = path_parsed.netloc + path_parsed.path \
+                if parse(pyarrow.__version__) >= parse("7.0.0") else path_parsed.path[1:]
         table = self.create(data_file_io(path),
                             Path(table_path),
                             schema,
diff --git a/paimon-python/pypaimon/catalog/rest/rest_token_file_io.py b/paimon-python/pypaimon/catalog/rest/rest_token_file_io.py
index b9671c8ae9..a65e96695c 100644
--- a/paimon-python/pypaimon/catalog/rest/rest_token_file_io.py
+++ b/paimon-python/pypaimon/catalog/rest/rest_token_file_io.py
@@ -31,7 +31,7 @@ from pypaimon.common.identifier import Identifier
 
 class RESTTokenFileIO(FileIO):
 
-    def __init__(self, identifier: Identifier, path: Path,
+    def __init__(self, identifier: Identifier, path: str,
                  catalog_options: Optional[dict] = None):
         self.identifier = identifier
         self.path = path
@@ -39,12 +39,12 @@ class RESTTokenFileIO(FileIO):
         self.api_instance: Optional[RESTApi] = None
         self.lock = threading.Lock()
         self.log = logging.getLogger(__name__)
-        super().__init__(str(path), catalog_options)
+        super().__init__(path, catalog_options)
 
-    def _initialize_oss_fs(self) -> FileSystem:
+    def _initialize_oss_fs(self, path) -> FileSystem:
         self.try_to_refresh_token()
         self.properties.update(self.token.token)
-        return super()._initialize_oss_fs()
+        return super()._initialize_oss_fs(path)
 
     def new_output_stream(self, path: Path):
         return self.filesystem.open_output_stream(str(path))
diff --git a/paimon-python/pypaimon/common/config.py b/paimon-python/pypaimon/common/config.py
index b3c9a673b3..0478c207bb 100644
--- a/paimon-python/pypaimon/common/config.py
+++ b/paimon-python/pypaimon/common/config.py
@@ -21,7 +21,6 @@ class OssOptions:
     OSS_SECURITY_TOKEN = "fs.oss.securityToken"
     OSS_ENDPOINT = "fs.oss.endpoint"
     OSS_REGION = "fs.oss.region"
-    OSS_BUCKET = "fs.oss.bucket"
 
 
 class S3Options:
diff --git a/paimon-python/pypaimon/common/file_io.py b/paimon-python/pypaimon/common/file_io.py
index 809d65338c..6f5dfc6a2a 100644
--- a/paimon-python/pypaimon/common/file_io.py
+++ b/paimon-python/pypaimon/common/file_io.py
@@ -31,12 +31,12 @@ from pypaimon.common.config import OssOptions, S3Options
 
 
 class FileIO:
-    def __init__(self, warehouse: str, catalog_options: dict):
+    def __init__(self, path: str, catalog_options: dict):
         self.properties = catalog_options
         self.logger = logging.getLogger(__name__)
-        scheme, netloc, path = self.parse_location(warehouse)
+        scheme, netloc, _ = self.parse_location(path)
         if scheme in {"oss"}:
-            self.filesystem = self._initialize_oss_fs()
+            self.filesystem = self._initialize_oss_fs(path)
         elif scheme in {"s3", "s3a", "s3n"}:
             self.filesystem = self._initialize_s3_fs()
         elif scheme in {"hdfs", "viewfs"}:
@@ -56,7 +56,29 @@ class FileIO:
         else:
             return uri.scheme, uri.netloc, f"{uri.netloc}{uri.path}"
 
-    def _initialize_oss_fs(self) -> FileSystem:
+    def _extract_oss_bucket(self, location) -> str:
+        uri = urlparse(location)
+        if uri.scheme and uri.scheme != "oss":
+            raise ValueError("Not an OSS URI: {}".format(location))
+
+        netloc = uri.netloc or ""
+        # parse oss://access_id:secret_key@Endpoint/bucket/path/to/object
+        if (getattr(uri, "username", None) or getattr(uri, "password", None)) or ("@" in netloc):
+            first_segment = uri.path.lstrip("/").split("/", 1)[0]
+            if not first_segment:
+                raise ValueError("Invalid OSS URI without bucket: {}".format(location))
+            return first_segment
+
+        # parse oss://bucket/... or oss://bucket.endpoint/...
+        host = getattr(uri, "hostname", None) or netloc
+        if not host:
+            raise ValueError("Invalid OSS URI without host: {}".format(location))
+        bucket = host.split(".", 1)[0]
+        if not bucket:
+            raise ValueError("Invalid OSS URI without bucket: {}".format(location))
+        return bucket
+
+    def _initialize_oss_fs(self, path) -> FileSystem:
         from pyarrow.fs import S3FileSystem
 
         client_kwargs = {
@@ -71,7 +93,8 @@ class FileIO:
             client_kwargs['force_virtual_addressing'] = True
             client_kwargs['endpoint_override'] = self.properties.get(OssOptions.OSS_ENDPOINT)
         else:
-            client_kwargs['endpoint_override'] = (self.properties.get(OssOptions.OSS_BUCKET) + "." +
+            oss_bucket = self._extract_oss_bucket(path)
+            client_kwargs['endpoint_override'] = (oss_bucket + "." +
                                                   
self.properties.get(OssOptions.OSS_ENDPOINT))
 
         return S3FileSystem(**client_kwargs)
diff --git a/paimon-python/pypaimon/tests/py36/ao_simple_test.py b/paimon-python/pypaimon/tests/py36/ao_simple_test.py
index 584ba87587..e2a61df301 100644
--- a/paimon-python/pypaimon/tests/py36/ao_simple_test.py
+++ b/paimon-python/pypaimon/tests/py36/ao_simple_test.py
@@ -15,11 +15,15 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 See the License for the specific language governing permissions and
 limitations under the License.
 """
+from unittest.mock import patch
+
 import pyarrow as pa
 
 from pypaimon import Schema
 from pypaimon.catalog.catalog_exception import TableNotExistException, TableAlreadyExistException, \
     DatabaseNotExistException, DatabaseAlreadyExistException
+from pypaimon.common.config import OssOptions
+from pypaimon.common.file_io import FileIO
 from pypaimon.tests.py36.pyarrow_compat import table_sort_by
 from pypaimon.tests.rest.rest_base_test import RESTBaseTest
 
@@ -385,3 +389,33 @@ class AOSimpleTest(RESTBaseTest):
             self.rest_catalog.drop_database("db1", True)
         except DatabaseNotExistException:
             self.fail("drop_database with ignore_if_exists=True should not raise DatabaseNotExistException")
+
+    def test_initialize_oss_fs_pyarrow_lt_7(self):
+        props = {
+            OssOptions.OSS_ACCESS_KEY_ID: "AKID",
+            OssOptions.OSS_ACCESS_KEY_SECRET: "SECRET",
+            OssOptions.OSS_SECURITY_TOKEN: "TOKEN",
+            OssOptions.OSS_REGION: "cn-hangzhou",
+            OssOptions.OSS_ENDPOINT: "oss-cn-hangzhou.aliyuncs.com",
+        }
+
+        with patch("pypaimon.common.file_io.pyarrow.__version__", "6.0.0"), \
+                patch("pyarrow.fs.S3FileSystem") as mock_s3fs:
+            FileIO("oss://oss-bucket/paimon-database/paimon-table", props)
+            mock_s3fs.assert_called_once_with(access_key="AKID",
+                                              secret_key="SECRET",
+                                              session_token="TOKEN",
+                                              region="cn-hangzhou",
+                                              endpoint_override="oss-bucket." + props[OssOptions.OSS_ENDPOINT])
+            FileIO("oss://oss-bucket.endpoint/paimon-database/paimon-table", props)
+            mock_s3fs.assert_called_with(access_key="AKID",
+                                         secret_key="SECRET",
+                                         session_token="TOKEN",
+                                         region="cn-hangzhou",
+                                         endpoint_override="oss-bucket." + props[OssOptions.OSS_ENDPOINT])
+            FileIO("oss://access_id:secret_key@Endpoint/oss-bucket/paimon-database/paimon-table", props)
+            mock_s3fs.assert_called_with(access_key="AKID",
+                                         secret_key="SECRET",
+                                         session_token="TOKEN",
+                                         region="cn-hangzhou",
+                                         endpoint_override="oss-bucket." + props[OssOptions.OSS_ENDPOINT])

Reply via email to