This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 7a9a0da915 [python] Implement Python ObjectTable with read support
(#7400)
7a9a0da915 is described below
commit 7a9a0da91555a1dcaa74bd0479cbfad67b905f9f
Author: shyjsarah <[email protected]>
AuthorDate: Wed Mar 11 14:11:00 2026 +0800
[python] Implement Python ObjectTable with read support (#7400)
This PR implements the Python version of ObjectTable, which provides
metadata indexes for unstructured data objects (files) in a directory.
The implementation mirrors the Java ObjectTableImpl design:
- ObjectTable is a read-only table with a fixed schema: path, name,
  length, mtime, atime, owner
- Scan produces exactly one split (the entire location directory)
- Read recursively enumerates all files under the location and returns
  their metadata as a PyArrow Table
- Supports projection (column selection) and limit (row count)
- Write operations (new_batch_write_builder, new_stream_write_builder)
  raise NotImplementedError
New files:
- paimon-python/pypaimon/table/object/object_table.py — ObjectTable
class
- paimon-python/pypaimon/table/object/object_split.py — ObjectSplit
dataclass
- paimon-python/pypaimon/table/object/object_read_builder.py —
ObjectReadBuilder
- paimon-python/pypaimon/table/object/object_table_scan.py —
ObjectTableScan
- paimon-python/pypaimon/table/object/object_table_read.py —
ObjectTableRead
- paimon-python/pypaimon/table/object/__init__.py — module exports
- paimon-python/pypaimon/tests/rest/rest_object_table_test.py — REST
catalog tests for ObjectTable
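Modified files:
- paimon-python/pypaimon/catalog/rest/rest_catalog.py — added
object-table type routing, _create_object_table() factory method, and
ObjectTable to drop_partitions type check
- paimon-python/pypaimon/filesystem/pvfs.py — added
_merge_token_with_catalog_options() so that a configured DLF OSS endpoint
overrides the OSS endpoint of the loaded table token when the OSS
filesystem is created
- paimon-python/pypaimon/schema/table_schema.py — fixed
TableSchema.from_schema to handle empty fields list (needed for
ObjectTable's empty schema)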
---
.../pypaimon/catalog/rest/rest_catalog.py | 23 +-
paimon-python/pypaimon/filesystem/pvfs.py | 11 +-
paimon-python/pypaimon/schema/table_schema.py | 2 +-
paimon-python/pypaimon/table/object/__init__.py | 29 +++
.../pypaimon/table/object/object_read_builder.py | 50 +++++
.../pypaimon/table/object/object_split.py | 31 +++
.../pypaimon/table/object/object_table.py | 100 +++++++++
.../pypaimon/table/object/object_table_read.py | 159 +++++++++++++
.../pypaimon/table/object/object_table_scan.py | 37 +++
.../pypaimon/tests/rest/rest_object_table_test.py | 250 +++++++++++++++++++++
10 files changed, 689 insertions(+), 3 deletions(-)
diff --git a/paimon-python/pypaimon/catalog/rest/rest_catalog.py
b/paimon-python/pypaimon/catalog/rest/rest_catalog.py
index 943d88840e..bfb936d9a7 100644
--- a/paimon-python/pypaimon/catalog/rest/rest_catalog.py
+++ b/paimon-python/pypaimon/catalog/rest/rest_catalog.py
@@ -44,9 +44,11 @@ from pypaimon.snapshot.snapshot_commit import
PartitionStatistics
from pypaimon.table.file_store_table import FileStoreTable
from pypaimon.table.format.format_table import FormatTable, Format
from pypaimon.table.iceberg.iceberg_table import IcebergTable
+from pypaimon.table.object.object_table import ObjectTable
FORMAT_TABLE_TYPE = "format-table"
ICEBERG_TABLE_TYPE = "iceberg-table"
+OBJECT_TABLE_TYPE = "object-table"
class RESTCatalog(Catalog):
@@ -253,7 +255,7 @@ class RESTCatalog(Catalog):
if not partitions:
raise ValueError("Partitions list cannot be empty.")
table = self.get_table(identifier)
- if isinstance(table, (FormatTable, IcebergTable)):
+ if isinstance(table, (FormatTable, IcebergTable, ObjectTable)):
unsupported_type = type(table).__name__
raise ValueError(
f"drop_partitions is not supported for table type
'{unsupported_type}'. "
@@ -355,6 +357,8 @@ class RESTCatalog(Catalog):
data_file_io = external_file_io if metadata.is_external else
internal_file_io
if table_type == ICEBERG_TABLE_TYPE:
return self._create_iceberg_table(identifier, metadata,
data_file_io)
+ if table_type == OBJECT_TABLE_TYPE:
+ return self._create_object_table(identifier, metadata,
data_file_io)
catalog_env = CatalogEnvironment(
identifier=identifier,
uuid=metadata.uuid,
@@ -411,6 +415,23 @@ class RESTCatalog(Catalog):
uuid=metadata.uuid,
)
+ def _create_object_table(self,
+ identifier: Identifier,
+ metadata: TableMetadata,
+ file_io: Callable[[str], Any],
+ ) -> ObjectTable:
+ schema = metadata.schema
+ location = schema.options.get(CoreOptions.PATH.key())
+ file_io = file_io(location)
+ return ObjectTable(
+ file_io=file_io,
+ identifier=identifier,
+ table_schema=schema,
+ location=location,
+ options=dict(schema.options),
+ comment=schema.comment,
+ )
+
@staticmethod
def create(file_io: FileIO,
table_path: str,
diff --git a/paimon-python/pypaimon/filesystem/pvfs.py
b/paimon-python/pypaimon/filesystem/pvfs.py
index a52a2972d2..be7e744b68 100644
--- a/paimon-python/pypaimon/filesystem/pvfs.py
+++ b/paimon-python/pypaimon/filesystem/pvfs.py
@@ -829,7 +829,8 @@ class PaimonVirtualFileSystem(fsspec.AbstractFileSystem):
rest_api = self.__rest_api(pvfs_table_identifier)
load_token_response: GetTableTokenResponse =
rest_api.load_table_token(
Identifier.create(pvfs_table_identifier.database,
pvfs_table_identifier.table))
- fs = self._get_oss_filesystem(load_token_response.token)
+ merged_token =
self._merge_token_with_catalog_options(load_token_response.token)
+ fs = self._get_oss_filesystem(Options(merged_token))
paimon_real_storage = PaimonRealStorage(
token=load_token_response.token,
expires_at_millis=load_token_response.expires_at_millis,
@@ -844,6 +845,14 @@ class PaimonVirtualFileSystem(fsspec.AbstractFileSystem):
finally:
write_lock.release()
+ def _merge_token_with_catalog_options(self, token: dict) -> dict:
+ """Merge token with catalog options, DLF OSS endpoint should override
the standard OSS endpoint."""
+ merged_token = dict(token)
+ dlf_oss_endpoint = self.options.get(CatalogOptions.DLF_OSS_ENDPOINT)
+ if dlf_oss_endpoint and dlf_oss_endpoint.strip():
+ merged_token[OssOptions.OSS_ENDPOINT.key()] = dlf_oss_endpoint
+ return merged_token
+
@staticmethod
def _get_storage_type(path: str):
if path.startswith("{}:/".format(StorageType.LOCAL.value)):
diff --git a/paimon-python/pypaimon/schema/table_schema.py
b/paimon-python/pypaimon/schema/table_schema.py
index 5a26260651..53ddccfefc 100644
--- a/paimon-python/pypaimon/schema/table_schema.py
+++ b/paimon-python/pypaimon/schema/table_schema.py
@@ -79,7 +79,7 @@ class TableSchema:
partition_keys: List[str] = schema.partition_keys
primary_keys: List[str] = schema.primary_keys
options: Dict[str, str] = schema.options
- highest_field_id: int = max(field.id for field in fields)
+ highest_field_id: int = max((field.id for field in fields), default=0)
return TableSchema(
TableSchema.CURRENT_VERSION,
diff --git a/paimon-python/pypaimon/table/object/__init__.py
b/paimon-python/pypaimon/table/object/__init__.py
new file mode 100644
index 0000000000..3fcbe35ae5
--- /dev/null
+++ b/paimon-python/pypaimon/table/object/__init__.py
@@ -0,0 +1,29 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from pypaimon.table.object.object_table import ObjectTable
+from pypaimon.table.object.object_split import ObjectSplit
+from pypaimon.table.object.object_read_builder import ObjectReadBuilder
+from pypaimon.table.object.object_table_scan import ObjectTableScan
+from pypaimon.table.object.object_table_read import ObjectTableRead
+
+__all__ = [
+ "ObjectTable",
+ "ObjectSplit",
+ "ObjectReadBuilder",
+ "ObjectTableScan",
+ "ObjectTableRead",
+]
diff --git a/paimon-python/pypaimon/table/object/object_read_builder.py
b/paimon-python/pypaimon/table/object/object_read_builder.py
new file mode 100644
index 0000000000..89cdd4a0a6
--- /dev/null
+++ b/paimon-python/pypaimon/table/object/object_read_builder.py
@@ -0,0 +1,50 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from typing import List, Optional
+
+
+class ObjectReadBuilder:
+ """Read builder for ObjectTable.
+
+ Supports projection (column selection) and limit (row count).
+ """
+
+ def __init__(self, table):
+ from pypaimon.table.object.object_table import ObjectTable
+ self.table: ObjectTable = table
+ self._projection: Optional[List[str]] = None
+ self._limit: Optional[int] = None
+
+ def with_projection(self, projection: List[str]) -> "ObjectReadBuilder":
+ self._projection = projection
+ return self
+
+ def with_limit(self, limit: int) -> "ObjectReadBuilder":
+ self._limit = limit
+ return self
+
+ def new_scan(self):
+ from pypaimon.table.object.object_table_scan import ObjectTableScan
+ return ObjectTableScan(self.table)
+
+ def new_read(self):
+ from pypaimon.table.object.object_table_read import ObjectTableRead
+ return ObjectTableRead(
+ table=self.table,
+ projection=self._projection,
+ limit=self._limit,
+ )
diff --git a/paimon-python/pypaimon/table/object/object_split.py
b/paimon-python/pypaimon/table/object/object_split.py
new file mode 100644
index 0000000000..2dd660626e
--- /dev/null
+++ b/paimon-python/pypaimon/table/object/object_split.py
@@ -0,0 +1,31 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from dataclasses import dataclass
+
+from pypaimon.common.file_io import FileIO
+
+
+@dataclass(frozen=True)
+class ObjectSplit:
+ """A single split for ObjectTable representing the entire location
directory.
+
+ ObjectTable always produces exactly one split containing the file_io
+ and location. The actual file enumeration happens during the read phase.
+ """
+
+ file_io: FileIO
+ location: str
diff --git a/paimon-python/pypaimon/table/object/object_table.py
b/paimon-python/pypaimon/table/object/object_table.py
new file mode 100644
index 0000000000..592835da4f
--- /dev/null
+++ b/paimon-python/pypaimon/table/object/object_table.py
@@ -0,0 +1,100 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from typing import Dict, List, Optional
+
+from pypaimon.common.file_io import FileIO
+from pypaimon.common.identifier import Identifier
+from pypaimon.schema.table_schema import TableSchema
+from pypaimon.table.table import Table
+
+
+# Fixed schema matching Java ObjectTable.SCHEMA
+OBJECT_TABLE_FIELD_NAMES = ["path", "name", "length", "mtime", "atime",
"owner"]
+
+
+class ObjectTable(Table):
+ """An object table refers to a directory that contains multiple objects
(files).
+
+ Object table provides metadata indexes for unstructured data objects in
this
+ directory, allowing users to analyze unstructured data in Object Storage.
+
+ This is a read-only table. Write operations are not supported.
+ """
+
+ def __init__(
+ self,
+ file_io: FileIO,
+ identifier: Identifier,
+ table_schema: TableSchema,
+ location: str,
+ options: Optional[Dict[str, str]] = None,
+ comment: Optional[str] = None,
+ ):
+ self.file_io = file_io
+ self.identifier = identifier
+ self._table_schema = table_schema
+ self._location = location.rstrip("/")
+ self._options = options or dict(table_schema.options)
+ self.comment = comment
+ self.partition_keys: List[str] = []
+ self.primary_keys: List[str] = []
+
+ def name(self) -> str:
+ return self.identifier.get_table_name()
+
+ def full_name(self) -> str:
+ return self.identifier.get_full_name()
+
+ @property
+ def table_schema(self) -> TableSchema:
+ return self._table_schema
+
+ @table_schema.setter
+ def table_schema(self, value: TableSchema):
+ self._table_schema = value
+
+ def location(self) -> str:
+ return self._location
+
+ def options(self) -> Dict[str, str]:
+ return self._options
+
+ def copy(self, dynamic_options: Dict[str, str]) -> "ObjectTable":
+ new_options = dict(self._options)
+ new_options.update(dynamic_options or {})
+ return ObjectTable(
+ file_io=self.file_io,
+ identifier=self.identifier,
+ table_schema=self._table_schema,
+ location=self._location,
+ options=new_options,
+ comment=self.comment,
+ )
+
+ def new_read_builder(self):
+ from pypaimon.table.object.object_read_builder import ObjectReadBuilder
+ return ObjectReadBuilder(self)
+
+ def new_batch_write_builder(self):
+ raise NotImplementedError(
+ "ObjectTable is read-only and does not support batch write."
+ )
+
+ def new_stream_write_builder(self):
+ raise NotImplementedError(
+ "ObjectTable is read-only and does not support stream write."
+ )
diff --git a/paimon-python/pypaimon/table/object/object_table_read.py
b/paimon-python/pypaimon/table/object/object_table_read.py
new file mode 100644
index 0000000000..382e764fca
--- /dev/null
+++ b/paimon-python/pypaimon/table/object/object_table_read.py
@@ -0,0 +1,159 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from typing import List, Optional
+from urllib.parse import urlparse
+
+import pandas
+import pyarrow
+import pyarrow.fs as pafs
+
+from pypaimon.table.object.object_split import ObjectSplit
+
+# PyArrow schema matching Java ObjectTable.SCHEMA
+OBJECT_TABLE_ARROW_SCHEMA = pyarrow.schema([
+ pyarrow.field("path", pyarrow.string(), nullable=False),
+ pyarrow.field("name", pyarrow.string(), nullable=False),
+ pyarrow.field("length", pyarrow.int64(), nullable=False),
+ pyarrow.field("mtime", pyarrow.int64(), nullable=False),
+ pyarrow.field("atime", pyarrow.int64(), nullable=False),
+ pyarrow.field("owner", pyarrow.string(), nullable=True),
+])
+
+
+def _normalize_path(path_str: str) -> str:
+ """Extract the filesystem path from a URI or plain path."""
+ parsed = urlparse(path_str)
+ if parsed.scheme:
+ return parsed.path
+ return path_str
+
+
+def _list_files_recursive(file_io, directory: str) -> list:
+ """Recursively list all files under a directory using FileIO.
+
+ Returns a list of (file_path, file_info) tuples.
+ """
+ results = []
+ try:
+ infos = file_io.list_status(directory)
+ except Exception:
+ return results
+
+ directory_rstrip = directory.rstrip("/")
+ for info in infos:
+ file_name = info.path.split("/")[-1] if "/" in info.path else info.path
+ full_path = info.path
+ if not full_path.startswith("/") and "://" not in full_path:
+ full_path = "{}/{}".format(directory_rstrip, file_name)
+
+ if info.type == pafs.FileType.Directory:
+ sub_results = _list_files_recursive(file_io, full_path)
+ results.extend(sub_results)
+ elif info.type == pafs.FileType.File:
+ results.append((full_path, info))
+
+ return results
+
+
+def _compute_relative_path(prefix: str, file_path: str) -> str:
+ """Compute relative path by removing the location prefix.
+
+ Matches Java ObjectTableImpl.toRow() logic.
+ """
+ norm_prefix = _normalize_path(prefix).rstrip("/")
+ norm_file = _normalize_path(file_path)
+
+ if norm_file.startswith(norm_prefix):
+ relative = norm_file[len(norm_prefix):]
+ if relative.startswith("/"):
+ relative = relative[1:]
+ return relative
+
+ return norm_file.split("/")[-1] if "/" in norm_file else norm_file
+
+
+class ObjectTableRead:
+ """Read implementation for ObjectTable.
+
+ Recursively lists all files under the location directory and returns
+ their metadata (path, name, length, mtime, atime, owner) as a PyArrow
Table.
+ """
+
+ def __init__(
+ self,
+ table,
+ projection: Optional[List[str]] = None,
+ limit: Optional[int] = None,
+ ):
+ self.table = table
+ self.projection = projection
+ self.limit = limit
+
+ def to_arrow(self, splits: List[ObjectSplit]) -> pyarrow.Table:
+ paths = []
+ names = []
+ lengths = []
+ mtimes = []
+ atimes = []
+ owners = []
+
+ for split in splits:
+ file_entries = _list_files_recursive(split.file_io, split.location)
+ for file_path, file_info in file_entries:
+ if self.limit is not None and len(paths) >= self.limit:
+ break
+
+ relative_path = _compute_relative_path(split.location,
file_path)
+ file_name = file_path.split("/")[-1] if "/" in file_path else
file_path
+ file_size = getattr(file_info, "size", 0) or 0
+ modification_time = getattr(file_info, "mtime_ns", 0) or 0
+ if modification_time > 0:
+ modification_time = modification_time // 1_000_000
+ access_time = 0
+ owner = None
+
+ paths.append(relative_path)
+ names.append(file_name)
+ lengths.append(file_size)
+ mtimes.append(modification_time)
+ atimes.append(access_time)
+ owners.append(owner)
+
+ if self.limit is not None and len(paths) >= self.limit:
+ break
+
+ result = pyarrow.table(
+ {
+ "path": pyarrow.array(paths, type=pyarrow.string()),
+ "name": pyarrow.array(names, type=pyarrow.string()),
+ "length": pyarrow.array(lengths, type=pyarrow.int64()),
+ "mtime": pyarrow.array(mtimes, type=pyarrow.int64()),
+ "atime": pyarrow.array(atimes, type=pyarrow.int64()),
+ "owner": pyarrow.array(owners, type=pyarrow.string()),
+ },
+ schema=OBJECT_TABLE_ARROW_SCHEMA,
+ )
+
+ if self.projection:
+ existing = [c for c in self.projection if c in result.column_names]
+ if existing:
+ result = result.select(existing)
+
+ return result
+
+ def to_pandas(self, splits: List[ObjectSplit]) -> pandas.DataFrame:
+ return self.to_arrow(splits).to_pandas()
diff --git a/paimon-python/pypaimon/table/object/object_table_scan.py
b/paimon-python/pypaimon/table/object/object_table_scan.py
new file mode 100644
index 0000000000..0e89ea87dd
--- /dev/null
+++ b/paimon-python/pypaimon/table/object/object_table_scan.py
@@ -0,0 +1,37 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from pypaimon.read.plan import Plan
+from pypaimon.table.object.object_split import ObjectSplit
+
+
+class ObjectTableScan:
+ """Scan for ObjectTable.
+
+ Always produces exactly one ObjectSplit containing the file_io and
location.
+ File enumeration happens during the read phase, not the scan phase.
+ """
+
+ def __init__(self, table):
+ from pypaimon.table.object.object_table import ObjectTable
+ self.table: ObjectTable = table
+
+ def plan(self) -> Plan:
+ split = ObjectSplit(
+ file_io=self.table.file_io,
+ location=self.table.location(),
+ )
+ return Plan(_splits=[split])
diff --git a/paimon-python/pypaimon/tests/rest/rest_object_table_test.py
b/paimon-python/pypaimon/tests/rest/rest_object_table_test.py
new file mode 100644
index 0000000000..c7d30e1e7e
--- /dev/null
+++ b/paimon-python/pypaimon/tests/rest/rest_object_table_test.py
@@ -0,0 +1,250 @@
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+
+import pyarrow as pa
+
+from pypaimon import PaimonVirtualFileSystem, Schema
+from pypaimon.table.object import ObjectTable
+from pypaimon.tests.rest.rest_base_test import RESTBaseTest
+
+
+class RESTObjectTableTest(RESTBaseTest):
+
+ def setUp(self):
+ super().setUp()
+ pvfs_options = {
+ 'uri': self.options['uri'],
+ 'warehouse': self.options['warehouse'],
+ 'dlf.region': self.options['dlf.region'],
+ 'token.provider': self.options['token.provider'],
+ 'token': self.options['token'],
+ }
+ self.pvfs = PaimonVirtualFileSystem(pvfs_options)
+
+ def _create_object_table(self, table_name, extra_options=None):
+ """Helper to create an object table with the given name.
+
+ ObjectTable has a fixed schema and does not support custom fields.
+ Only options (including type=object-table) are needed.
+ """
+ options = {"type": "object-table"}
+ if extra_options:
+ options.update(extra_options)
+ schema = Schema(options=options)
+ self.rest_catalog.drop_table(table_name, True)
+ self.rest_catalog.create_table(table_name, schema, False)
+ return self.rest_catalog.get_table(table_name)
+
+ def _pvfs_table_path(self, table_name, sub_path=None):
+ """Build a pvfs:// path for the given table and optional sub-path."""
+ warehouse = self.options['warehouse']
+ base = "pvfs://{}/{}".format(warehouse, table_name.replace('.', '/'))
+ if sub_path:
+ return "{}/{}".format(base, sub_path)
+ return base
+
+ def _write_file_via_pvfs(self, table_name, filename, content):
+ """Write a file into the table's location using pvfs."""
+ path = self._pvfs_table_path(table_name, filename)
+ # Ensure parent directory exists when filename contains subdirectories
+ if '/' in filename:
+ parent_dir = self._pvfs_table_path(
+ table_name, filename.rsplit('/', 1)[0]
+ )
+ self.pvfs.makedirs(parent_dir, exist_ok=True)
+ with self.pvfs.open(path, 'wb') as f:
+ f.write(content)
+
+ def test_get_object_table(self):
+ table_name = "default.object_table_basic"
+ table = self._create_object_table(table_name)
+
+ self.assertIsInstance(table, ObjectTable)
+ self.assertEqual(table.name(), "object_table_basic")
+ self.assertEqual(table.full_name(), table_name)
+ self.assertEqual(table.partition_keys, [])
+ self.assertEqual(table.primary_keys, [])
+ self.assertEqual(table.options().get("type"), "object-table")
+
+ def test_object_table_read_files(self):
+ table_name = "default.object_table_read"
+ table = self._create_object_table(table_name)
+
+ # Write some test files into the table's location via pvfs
+ test_files = {
+ "file_a.txt": b"hello world",
+ "file_b.dat": b"some binary data here",
+ }
+ for filename, content in test_files.items():
+ self._write_file_via_pvfs(table_name, filename, content)
+
+ # Read the object table
+ read_builder = table.new_read_builder()
+ scan = read_builder.new_scan()
+ plan = scan.plan()
+ splits = plan.splits()
+
+ self.assertEqual(len(splits), 1)
+
+ table_read = read_builder.new_read()
+ result = table_read.to_arrow(splits)
+
+ self.assertIsInstance(result, pa.Table)
+ # Should contain the schema, snapshot and manifest files plus our test
files
+ # Filter to only our test files
+ result_names = result.column("name").to_pylist()
+ for filename in test_files:
+ self.assertIn(filename, result_names)
+
+ # Verify file sizes for our test files
+ result_dict = {}
+ for i in range(result.num_rows):
+ name = result.column("name")[i].as_py()
+ length = result.column("length")[i].as_py()
+ result_dict[name] = length
+
+ for filename, content in test_files.items():
+ self.assertEqual(result_dict[filename], len(content))
+
+ # Verify schema columns
+ self.assertIn("path", result.column_names)
+ self.assertIn("name", result.column_names)
+ self.assertIn("length", result.column_names)
+ self.assertIn("mtime", result.column_names)
+ self.assertIn("atime", result.column_names)
+ self.assertIn("owner", result.column_names)
+
+ def test_object_table_read_with_subdirectories(self):
+ table_name = "default.object_table_subdir"
+ table = self._create_object_table(table_name)
+
+ # Write files via pvfs, including a nested subdirectory file
+ self._write_file_via_pvfs(table_name, "root_file.txt", b"root content")
+ self._write_file_via_pvfs(table_name, "subdir/nested_file.txt",
b"nested content")
+
+ read_builder = table.new_read_builder()
+ splits = read_builder.new_scan().plan().splits()
+ result = read_builder.new_read().to_arrow(splits)
+
+ result_names = result.column("name").to_pylist()
+ self.assertIn("root_file.txt", result_names)
+ self.assertIn("nested_file.txt", result_names)
+
+ # Verify relative paths contain subdirectory
+ result_paths = result.column("path").to_pylist()
+ nested_paths = [p for p in result_paths if "nested_file.txt" in p]
+ self.assertTrue(len(nested_paths) > 0)
+ self.assertIn("subdir/", nested_paths[0])
+
+ def test_object_table_with_projection(self):
+ table_name = "default.object_table_projection"
+ table = self._create_object_table(table_name)
+
+ self._write_file_via_pvfs(table_name, "proj_test.txt", b"test data")
+
+ # Test projection with two columns
+ read_builder = table.new_read_builder()
+ read_builder.with_projection(["name", "length"])
+ splits = read_builder.new_scan().plan().splits()
+ result = read_builder.new_read().to_arrow(splits)
+
+ self.assertEqual(result.column_names, ["name", "length"])
+ self.assertNotIn("path", result.column_names)
+ self.assertNotIn("mtime", result.column_names)
+ self.assertNotIn("atime", result.column_names)
+ self.assertNotIn("owner", result.column_names)
+
+ # Verify data content for our test file
+ result_names = result.column("name").to_pylist()
+ self.assertIn("proj_test.txt", result_names)
+ idx = result_names.index("proj_test.txt")
+ self.assertEqual(result.column("length")[idx].as_py(), len(b"test
data"))
+
+ def test_object_table_with_limit(self):
+ table_name = "default.object_table_limit"
+ table = self._create_object_table(table_name)
+
+ for i in range(5):
+ self._write_file_via_pvfs(
+ table_name, "file_{}.txt".format(i), "content
{}".format(i).encode()
+ )
+
+ read_builder = table.new_read_builder()
+ read_builder.with_limit(2)
+ splits = read_builder.new_scan().plan().splits()
+ result = read_builder.new_read().to_arrow(splits)
+
+ self.assertEqual(result.num_rows, 2)
+
+ def test_object_table_options_and_copy(self):
+ table_name = "default.object_table_options"
+ table = self._create_object_table(
+ table_name, extra_options={"custom.key": "custom_value"}
+ )
+
+ self.assertEqual(table.options().get("custom.key"), "custom_value")
+ self.assertEqual(table.options().get("type"), "object-table")
+
+ # Test copy with dynamic options
+ copied = table.copy({"new.key": "new_value"})
+ self.assertIsInstance(copied, ObjectTable)
+ self.assertEqual(copied.options().get("custom.key"), "custom_value")
+ self.assertEqual(copied.options().get("new.key"), "new_value")
+
+ # Original should not be modified
+ self.assertIsNone(table.options().get("new.key"))
+
+ # Test copy with override
+ overridden = table.copy({"custom.key": "overridden"})
+ self.assertEqual(overridden.options().get("custom.key"), "overridden")
+ self.assertEqual(table.options().get("custom.key"), "custom_value")
+
+ def test_object_table_unsupported_write(self):
+ table_name = "default.object_table_no_write"
+ table = self._create_object_table(table_name)
+
+ with self.assertRaises(NotImplementedError):
+ table.new_batch_write_builder()
+ with self.assertRaises(NotImplementedError):
+ table.new_stream_write_builder()
+
+ def test_object_table_unsupported_drop_partitions(self):
+ table_name = "default.object_table_no_drop_partitions"
+ self._create_object_table(table_name)
+
+ with self.assertRaisesRegex(
+ ValueError,
+ "drop_partitions is not supported for table type 'ObjectTable'",
+ ):
+ self.rest_catalog.drop_partitions(
+ table_name,
+ [{"dt": "20250101"}],
+ )
+
+ def test_object_table_to_pandas(self):
+ table_name = "default.object_table_pandas"
+ table = self._create_object_table(table_name)
+
+ self._write_file_via_pvfs(table_name, "pandas_test.txt", b"pandas
data")
+
+ read_builder = table.new_read_builder()
+ splits = read_builder.new_scan().plan().splits()
+ result_df = read_builder.new_read().to_pandas(splits)
+
+ self.assertIn("name", result_df.columns)
+ self.assertIn("pandas_test.txt", result_df["name"].values)