This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch release-1.3 in repository https://gitbox.apache.org/repos/asf/paimon.git
commit 30efebed3507cd24e31036f884ccce21f45e1fdc Author: umi <[email protected]> AuthorDate: Fri Sep 26 21:50:07 2025 +0800 [python] Fix OSSParam to access DLF (#6332) --- .../pypaimon/catalog/rest/rest_catalog.py | 22 +++++++++----- .../pypaimon/catalog/rest/rest_token_file_io.py | 8 ++--- paimon-python/pypaimon/common/config.py | 1 - paimon-python/pypaimon/common/file_io.py | 33 +++++++++++++++++---- .../pypaimon/tests/py36/ao_simple_test.py | 34 ++++++++++++++++++++++ 5 files changed, 80 insertions(+), 18 deletions(-) diff --git a/paimon-python/pypaimon/catalog/rest/rest_catalog.py b/paimon-python/pypaimon/catalog/rest/rest_catalog.py index 5e36559a7b..53db2abbaa 100644 --- a/paimon-python/pypaimon/catalog/rest/rest_catalog.py +++ b/paimon-python/pypaimon/catalog/rest/rest_catalog.py @@ -19,6 +19,9 @@ from pathlib import Path from typing import Any, Callable, Dict, List, Optional, Union from urllib.parse import urlparse +import pyarrow +from packaging.version import parse + from pypaimon.api.api_response import GetTableResponse, PagedList from pypaimon.api.options import Options from pypaimon.api.rest_api import RESTApi @@ -200,17 +203,17 @@ class RESTCatalog(Catalog): uuid=response.get_id() ) - def file_io_from_options(self, table_path: Path) -> FileIO: - return FileIO(str(table_path), self.context.options.data) + def file_io_from_options(self, table_path: str) -> FileIO: + return FileIO(table_path, self.context.options.data) - def file_io_for_data(self, table_path: Path, identifier: Identifier): + def file_io_for_data(self, table_path: str, identifier: Identifier): return RESTTokenFileIO(identifier, table_path, self.context.options.data) \ if self.data_token_enabled else self.file_io_from_options(table_path) def load_table(self, identifier: Identifier, - internal_file_io: Callable[[Path], Any], - external_file_io: Callable[[Path], Any], + internal_file_io: Callable[[str], Any], + external_file_io: Callable[[str], Any], metadata_loader: Callable[[Identifier], TableMetadata], ) -> FileStoreTable: metadata = metadata_loader(identifier) @@ -223,9 +226,12 @@ class RESTCatalog(Catalog): supports_version_management=True # REST catalogs support version management ) path_parsed = urlparse(schema.options.get(CoreOptions.PATH)) - path = Path(path_parsed.path) if path_parsed.scheme is None else Path(schema.options.get(CoreOptions.PATH)) - table_path = path_parsed.netloc + "/" + path_parsed.path \ - if path_parsed.scheme == "file" else path_parsed.path[1:] + path = path_parsed.path if path_parsed.scheme is None else schema.options.get(CoreOptions.PATH) + if path_parsed.scheme == "file": + table_path = path_parsed.path + else: + table_path = path_parsed.netloc + path_parsed.path \ + if parse(pyarrow.__version__) >= parse("7.0.0") else path_parsed.path[1:] table = self.create(data_file_io(path), Path(table_path), schema, diff --git a/paimon-python/pypaimon/catalog/rest/rest_token_file_io.py b/paimon-python/pypaimon/catalog/rest/rest_token_file_io.py index b9671c8ae9..a65e96695c 100644 --- a/paimon-python/pypaimon/catalog/rest/rest_token_file_io.py +++ b/paimon-python/pypaimon/catalog/rest/rest_token_file_io.py @@ -31,7 +31,7 @@ from pypaimon.common.identifier import Identifier class RESTTokenFileIO(FileIO): - def __init__(self, identifier: Identifier, path: Path, + def __init__(self, identifier: Identifier, path: str, catalog_options: Optional[dict] = None): self.identifier = identifier self.path = path @@ -39,12 +39,12 @@ class RESTTokenFileIO(FileIO): self.api_instance: Optional[RESTApi] = None self.lock = threading.Lock() self.log = logging.getLogger(__name__) - super().__init__(str(path), catalog_options) + super().__init__(path, catalog_options) - def _initialize_oss_fs(self) -> FileSystem: + def _initialize_oss_fs(self, path) -> FileSystem: self.try_to_refresh_token() self.properties.update(self.token.token) - return super()._initialize_oss_fs() + return super()._initialize_oss_fs(path) def new_output_stream(self, path: Path): return self.filesystem.open_output_stream(str(path)) diff --git a/paimon-python/pypaimon/common/config.py b/paimon-python/pypaimon/common/config.py index b3c9a673b3..0478c207bb 100644 --- a/paimon-python/pypaimon/common/config.py +++ b/paimon-python/pypaimon/common/config.py @@ -21,7 +21,6 @@ class OssOptions: OSS_SECURITY_TOKEN = "fs.oss.securityToken" OSS_ENDPOINT = "fs.oss.endpoint" OSS_REGION = "fs.oss.region" - OSS_BUCKET = "fs.oss.bucket" class S3Options: diff --git a/paimon-python/pypaimon/common/file_io.py b/paimon-python/pypaimon/common/file_io.py index 809d65338c..6f5dfc6a2a 100644 --- a/paimon-python/pypaimon/common/file_io.py +++ b/paimon-python/pypaimon/common/file_io.py @@ -31,12 +31,12 @@ from pypaimon.common.config import OssOptions, S3Options class FileIO: - def __init__(self, warehouse: str, catalog_options: dict): + def __init__(self, path: str, catalog_options: dict): self.properties = catalog_options self.logger = logging.getLogger(__name__) - scheme, netloc, path = self.parse_location(warehouse) + scheme, netloc, _ = self.parse_location(path) if scheme in {"oss"}: - self.filesystem = self._initialize_oss_fs() + self.filesystem = self._initialize_oss_fs(path) elif scheme in {"s3", "s3a", "s3n"}: self.filesystem = self._initialize_s3_fs() elif scheme in {"hdfs", "viewfs"}: @@ -56,7 +56,29 @@ class FileIO: else: return uri.scheme, uri.netloc, f"{uri.netloc}{uri.path}" - def _initialize_oss_fs(self) -> FileSystem: + def _extract_oss_bucket(self, location) -> str: + uri = urlparse(location) + if uri.scheme and uri.scheme != "oss": + raise ValueError("Not an OSS URI: {}".format(location)) + + netloc = uri.netloc or "" + # parse oss://access_id:secret_key@Endpoint/bucket/path/to/object + if (getattr(uri, "username", None) or getattr(uri, "password", None)) or ("@" in netloc): + first_segment = uri.path.lstrip("/").split("/", 1)[0] + if not first_segment: + raise ValueError("Invalid OSS URI without bucket: {}".format(location)) + return first_segment + + # parse oss://bucket/... or oss://bucket.endpoint/... + host = getattr(uri, "hostname", None) or netloc + if not host: + raise ValueError("Invalid OSS URI without host: {}".format(location)) + bucket = host.split(".", 1)[0] + if not bucket: + raise ValueError("Invalid OSS URI without bucket: {}".format(location)) + return bucket + + def _initialize_oss_fs(self, path) -> FileSystem: from pyarrow.fs import S3FileSystem client_kwargs = { @@ -71,7 +93,8 @@ class FileIO: client_kwargs['force_virtual_addressing'] = True client_kwargs['endpoint_override'] = self.properties.get(OssOptions.OSS_ENDPOINT) else: - client_kwargs['endpoint_override'] = (self.properties.get(OssOptions.OSS_BUCKET) + "." + + oss_bucket = self._extract_oss_bucket(path) + client_kwargs['endpoint_override'] = (oss_bucket + "." + self.properties.get(OssOptions.OSS_ENDPOINT)) return S3FileSystem(**client_kwargs) diff --git a/paimon-python/pypaimon/tests/py36/ao_simple_test.py b/paimon-python/pypaimon/tests/py36/ao_simple_test.py index 584ba87587..e2a61df301 100644 --- a/paimon-python/pypaimon/tests/py36/ao_simple_test.py +++ b/paimon-python/pypaimon/tests/py36/ao_simple_test.py @@ -15,11 +15,15 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. """ +from unittest.mock import patch + import pyarrow as pa from pypaimon import Schema from pypaimon.catalog.catalog_exception import TableNotExistException, TableAlreadyExistException, \ DatabaseNotExistException, DatabaseAlreadyExistException +from pypaimon.common.config import OssOptions +from pypaimon.common.file_io import FileIO from pypaimon.tests.py36.pyarrow_compat import table_sort_by from pypaimon.tests.rest.rest_base_test import RESTBaseTest @@ -385,3 +389,33 @@ class AOSimpleTest(RESTBaseTest): self.rest_catalog.drop_database("db1", True) except DatabaseNotExistException: self.fail("drop_database with ignore_if_exists=True should not raise DatabaseNotExistException") + + def test_initialize_oss_fs_pyarrow_lt_7(self): + props = { + OssOptions.OSS_ACCESS_KEY_ID: "AKID", + OssOptions.OSS_ACCESS_KEY_SECRET: "SECRET", + OssOptions.OSS_SECURITY_TOKEN: "TOKEN", + OssOptions.OSS_REGION: "cn-hangzhou", + OssOptions.OSS_ENDPOINT: "oss-cn-hangzhou.aliyuncs.com", + } + + with patch("pypaimon.common.file_io.pyarrow.__version__", "6.0.0"), \ + patch("pyarrow.fs.S3FileSystem") as mock_s3fs: + FileIO("oss://oss-bucket/paimon-database/paimon-table", props) + mock_s3fs.assert_called_once_with(access_key="AKID", + secret_key="SECRET", + session_token="TOKEN", + region="cn-hangzhou", + endpoint_override="oss-bucket." + props[OssOptions.OSS_ENDPOINT]) + FileIO("oss://oss-bucket.endpoint/paimon-database/paimon-table", props) + mock_s3fs.assert_called_with(access_key="AKID", + secret_key="SECRET", + session_token="TOKEN", + region="cn-hangzhou", + endpoint_override="oss-bucket." + props[OssOptions.OSS_ENDPOINT]) + FileIO("oss://access_id:secret_key@Endpoint/oss-bucket/paimon-database/paimon-table", props) + mock_s3fs.assert_called_with(access_key="AKID", + secret_key="SECRET", + session_token="TOKEN", + region="cn-hangzhou", + endpoint_override="oss-bucket." + props[OssOptions.OSS_ENDPOINT])
