This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 7a9a0da915 [python] Implement Python ObjectTable with read support 
(#7400)
7a9a0da915 is described below

commit 7a9a0da91555a1dcaa74bd0479cbfad67b905f9f
Author: shyjsarah <[email protected]>
AuthorDate: Wed Mar 11 14:11:00 2026 +0800

    [python] Implement Python ObjectTable with read support (#7400)
    
    This PR implements the Python version of ObjectTable, which provides
    metadata indexes for unstructured data objects (files) in a directory.
    The implementation mirrors the Java ObjectTableImpl design:
    
    - ObjectTable is a read-only table with a fixed schema: path, name,
      length, mtime, atime, owner
    - Scan produces exactly one split (the entire location directory)
    - Read recursively enumerates all files under the location and returns
      their metadata as a PyArrow Table
    - Supports projection (column selection) and limit (row count)
    - Write operations (new_batch_write_builder, new_stream_write_builder)
      raise NotImplementedError
    
    New files:
    - paimon-python/pypaimon/table/object/object_table.py — ObjectTable
    class
    - paimon-python/pypaimon/table/object/object_split.py — ObjectSplit
    dataclass
    - paimon-python/pypaimon/table/object/object_read_builder.py —
    ObjectReadBuilder
    - paimon-python/pypaimon/table/object/object_table_scan.py —
    ObjectTableScan
    - paimon-python/pypaimon/table/object/object_table_read.py —
    ObjectTableRead
    - paimon-python/pypaimon/table/object/__init__.py — module exports
    - paimon-python/pypaimon/tests/rest/rest_object_table_test.py — REST
    catalog tests for ObjectTable
    
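    Modified files:
    - paimon-python/pypaimon/catalog/rest/rest_catalog.py — added
    object-table type routing, _create_object_table() factory method, and
    ObjectTable to drop_partitions type check
    - paimon-python/pypaimon/filesystem/pvfs.py — added
    _merge_token_with_catalog_options() so that a configured DLF OSS
    endpoint overrides the OSS endpoint in the loaded table token before
    the OSS filesystem is created
    - paimon-python/pypaimon/schema/table_schema.py — fixed
    TableSchema.from_schema to handle an empty fields list (needed for
    ObjectTable's empty schema)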
---
 .../pypaimon/catalog/rest/rest_catalog.py          |  23 +-
 paimon-python/pypaimon/filesystem/pvfs.py          |  11 +-
 paimon-python/pypaimon/schema/table_schema.py      |   2 +-
 paimon-python/pypaimon/table/object/__init__.py    |  29 +++
 .../pypaimon/table/object/object_read_builder.py   |  50 +++++
 .../pypaimon/table/object/object_split.py          |  31 +++
 .../pypaimon/table/object/object_table.py          | 100 +++++++++
 .../pypaimon/table/object/object_table_read.py     | 159 +++++++++++++
 .../pypaimon/table/object/object_table_scan.py     |  37 +++
 .../pypaimon/tests/rest/rest_object_table_test.py  | 250 +++++++++++++++++++++
 10 files changed, 689 insertions(+), 3 deletions(-)

diff --git a/paimon-python/pypaimon/catalog/rest/rest_catalog.py 
b/paimon-python/pypaimon/catalog/rest/rest_catalog.py
index 943d88840e..bfb936d9a7 100644
--- a/paimon-python/pypaimon/catalog/rest/rest_catalog.py
+++ b/paimon-python/pypaimon/catalog/rest/rest_catalog.py
@@ -44,9 +44,11 @@ from pypaimon.snapshot.snapshot_commit import 
PartitionStatistics
 from pypaimon.table.file_store_table import FileStoreTable
 from pypaimon.table.format.format_table import FormatTable, Format
 from pypaimon.table.iceberg.iceberg_table import IcebergTable
+from pypaimon.table.object.object_table import ObjectTable
 
 FORMAT_TABLE_TYPE = "format-table"
 ICEBERG_TABLE_TYPE = "iceberg-table"
+OBJECT_TABLE_TYPE = "object-table"
 
 
 class RESTCatalog(Catalog):
@@ -253,7 +255,7 @@ class RESTCatalog(Catalog):
         if not partitions:
             raise ValueError("Partitions list cannot be empty.")
         table = self.get_table(identifier)
-        if isinstance(table, (FormatTable, IcebergTable)):
+        if isinstance(table, (FormatTable, IcebergTable, ObjectTable)):
             unsupported_type = type(table).__name__
             raise ValueError(
                 f"drop_partitions is not supported for table type 
'{unsupported_type}'. "
@@ -355,6 +357,8 @@ class RESTCatalog(Catalog):
         data_file_io = external_file_io if metadata.is_external else 
internal_file_io
         if table_type == ICEBERG_TABLE_TYPE:
             return self._create_iceberg_table(identifier, metadata, 
data_file_io)
+        if table_type == OBJECT_TABLE_TYPE:
+            return self._create_object_table(identifier, metadata, 
data_file_io)
         catalog_env = CatalogEnvironment(
             identifier=identifier,
             uuid=metadata.uuid,
@@ -411,6 +415,23 @@ class RESTCatalog(Catalog):
             uuid=metadata.uuid,
         )
 
+    def _create_object_table(self,
+                             identifier: Identifier,
+                             metadata: TableMetadata,
+                             file_io: Callable[[str], Any],
+                             ) -> ObjectTable:
+        schema = metadata.schema
+        location = schema.options.get(CoreOptions.PATH.key())
+        file_io = file_io(location)
+        return ObjectTable(
+            file_io=file_io,
+            identifier=identifier,
+            table_schema=schema,
+            location=location,
+            options=dict(schema.options),
+            comment=schema.comment,
+        )
+
     @staticmethod
     def create(file_io: FileIO,
                table_path: str,
diff --git a/paimon-python/pypaimon/filesystem/pvfs.py 
b/paimon-python/pypaimon/filesystem/pvfs.py
index a52a2972d2..be7e744b68 100644
--- a/paimon-python/pypaimon/filesystem/pvfs.py
+++ b/paimon-python/pypaimon/filesystem/pvfs.py
@@ -829,7 +829,8 @@ class PaimonVirtualFileSystem(fsspec.AbstractFileSystem):
                 rest_api = self.__rest_api(pvfs_table_identifier)
                 load_token_response: GetTableTokenResponse = 
rest_api.load_table_token(
                     Identifier.create(pvfs_table_identifier.database, 
pvfs_table_identifier.table))
-                fs = self._get_oss_filesystem(load_token_response.token)
+                merged_token = 
self._merge_token_with_catalog_options(load_token_response.token)
+                fs = self._get_oss_filesystem(Options(merged_token))
                 paimon_real_storage = PaimonRealStorage(
                     token=load_token_response.token,
                     expires_at_millis=load_token_response.expires_at_millis,
@@ -844,6 +845,14 @@ class PaimonVirtualFileSystem(fsspec.AbstractFileSystem):
         finally:
             write_lock.release()
 
+    def _merge_token_with_catalog_options(self, token: dict) -> dict:
+        """Merge token with catalog options, DLF OSS endpoint should override 
the standard OSS endpoint."""
+        merged_token = dict(token)
+        dlf_oss_endpoint = self.options.get(CatalogOptions.DLF_OSS_ENDPOINT)
+        if dlf_oss_endpoint and dlf_oss_endpoint.strip():
+            merged_token[OssOptions.OSS_ENDPOINT.key()] = dlf_oss_endpoint
+        return merged_token
+
     @staticmethod
     def _get_storage_type(path: str):
         if path.startswith("{}:/".format(StorageType.LOCAL.value)):
diff --git a/paimon-python/pypaimon/schema/table_schema.py 
b/paimon-python/pypaimon/schema/table_schema.py
index 5a26260651..53ddccfefc 100644
--- a/paimon-python/pypaimon/schema/table_schema.py
+++ b/paimon-python/pypaimon/schema/table_schema.py
@@ -79,7 +79,7 @@ class TableSchema:
         partition_keys: List[str] = schema.partition_keys
         primary_keys: List[str] = schema.primary_keys
         options: Dict[str, str] = schema.options
-        highest_field_id: int = max(field.id for field in fields)
+        highest_field_id: int = max((field.id for field in fields), default=0)
 
         return TableSchema(
             TableSchema.CURRENT_VERSION,
diff --git a/paimon-python/pypaimon/table/object/__init__.py 
b/paimon-python/pypaimon/table/object/__init__.py
new file mode 100644
index 0000000000..3fcbe35ae5
--- /dev/null
+++ b/paimon-python/pypaimon/table/object/__init__.py
@@ -0,0 +1,29 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from pypaimon.table.object.object_table import ObjectTable
+from pypaimon.table.object.object_split import ObjectSplit
+from pypaimon.table.object.object_read_builder import ObjectReadBuilder
+from pypaimon.table.object.object_table_scan import ObjectTableScan
+from pypaimon.table.object.object_table_read import ObjectTableRead
+
+__all__ = [
+    "ObjectTable",
+    "ObjectSplit",
+    "ObjectReadBuilder",
+    "ObjectTableScan",
+    "ObjectTableRead",
+]
diff --git a/paimon-python/pypaimon/table/object/object_read_builder.py 
b/paimon-python/pypaimon/table/object/object_read_builder.py
new file mode 100644
index 0000000000..89cdd4a0a6
--- /dev/null
+++ b/paimon-python/pypaimon/table/object/object_read_builder.py
@@ -0,0 +1,50 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from typing import List, Optional
+
+
+class ObjectReadBuilder:
+    """Read builder for ObjectTable.
+
+    Supports projection (column selection) and limit (row count).
+    """
+
+    def __init__(self, table):
+        from pypaimon.table.object.object_table import ObjectTable
+        self.table: ObjectTable = table
+        self._projection: Optional[List[str]] = None
+        self._limit: Optional[int] = None
+
+    def with_projection(self, projection: List[str]) -> "ObjectReadBuilder":
+        self._projection = projection
+        return self
+
+    def with_limit(self, limit: int) -> "ObjectReadBuilder":
+        self._limit = limit
+        return self
+
+    def new_scan(self):
+        from pypaimon.table.object.object_table_scan import ObjectTableScan
+        return ObjectTableScan(self.table)
+
+    def new_read(self):
+        from pypaimon.table.object.object_table_read import ObjectTableRead
+        return ObjectTableRead(
+            table=self.table,
+            projection=self._projection,
+            limit=self._limit,
+        )
diff --git a/paimon-python/pypaimon/table/object/object_split.py 
b/paimon-python/pypaimon/table/object/object_split.py
new file mode 100644
index 0000000000..2dd660626e
--- /dev/null
+++ b/paimon-python/pypaimon/table/object/object_split.py
@@ -0,0 +1,31 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from dataclasses import dataclass
+
+from pypaimon.common.file_io import FileIO
+
+
+@dataclass(frozen=True)
+class ObjectSplit:
+    """A single split for ObjectTable representing the entire location 
directory.
+
+    ObjectTable always produces exactly one split containing the file_io
+    and location. The actual file enumeration happens during the read phase.
+    """
+
+    file_io: FileIO
+    location: str
diff --git a/paimon-python/pypaimon/table/object/object_table.py 
b/paimon-python/pypaimon/table/object/object_table.py
new file mode 100644
index 0000000000..592835da4f
--- /dev/null
+++ b/paimon-python/pypaimon/table/object/object_table.py
@@ -0,0 +1,100 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from typing import Dict, List, Optional
+
+from pypaimon.common.file_io import FileIO
+from pypaimon.common.identifier import Identifier
+from pypaimon.schema.table_schema import TableSchema
+from pypaimon.table.table import Table
+
+
+# Fixed schema matching Java ObjectTable.SCHEMA
+OBJECT_TABLE_FIELD_NAMES = ["path", "name", "length", "mtime", "atime", 
"owner"]
+
+
+class ObjectTable(Table):
+    """An object table refers to a directory that contains multiple objects 
(files).
+
+    Object table provides metadata indexes for unstructured data objects in 
this
+    directory, allowing users to analyze unstructured data in Object Storage.
+
+    This is a read-only table. Write operations are not supported.
+    """
+
+    def __init__(
+        self,
+        file_io: FileIO,
+        identifier: Identifier,
+        table_schema: TableSchema,
+        location: str,
+        options: Optional[Dict[str, str]] = None,
+        comment: Optional[str] = None,
+    ):
+        self.file_io = file_io
+        self.identifier = identifier
+        self._table_schema = table_schema
+        self._location = location.rstrip("/")
+        self._options = options or dict(table_schema.options)
+        self.comment = comment
+        self.partition_keys: List[str] = []
+        self.primary_keys: List[str] = []
+
+    def name(self) -> str:
+        return self.identifier.get_table_name()
+
+    def full_name(self) -> str:
+        return self.identifier.get_full_name()
+
+    @property
+    def table_schema(self) -> TableSchema:
+        return self._table_schema
+
+    @table_schema.setter
+    def table_schema(self, value: TableSchema):
+        self._table_schema = value
+
+    def location(self) -> str:
+        return self._location
+
+    def options(self) -> Dict[str, str]:
+        return self._options
+
+    def copy(self, dynamic_options: Dict[str, str]) -> "ObjectTable":
+        new_options = dict(self._options)
+        new_options.update(dynamic_options or {})
+        return ObjectTable(
+            file_io=self.file_io,
+            identifier=self.identifier,
+            table_schema=self._table_schema,
+            location=self._location,
+            options=new_options,
+            comment=self.comment,
+        )
+
+    def new_read_builder(self):
+        from pypaimon.table.object.object_read_builder import ObjectReadBuilder
+        return ObjectReadBuilder(self)
+
+    def new_batch_write_builder(self):
+        raise NotImplementedError(
+            "ObjectTable is read-only and does not support batch write."
+        )
+
+    def new_stream_write_builder(self):
+        raise NotImplementedError(
+            "ObjectTable is read-only and does not support stream write."
+        )
diff --git a/paimon-python/pypaimon/table/object/object_table_read.py 
b/paimon-python/pypaimon/table/object/object_table_read.py
new file mode 100644
index 0000000000..382e764fca
--- /dev/null
+++ b/paimon-python/pypaimon/table/object/object_table_read.py
@@ -0,0 +1,159 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from typing import List, Optional
+from urllib.parse import urlparse
+
+import pandas
+import pyarrow
+import pyarrow.fs as pafs
+
+from pypaimon.table.object.object_split import ObjectSplit
+
+# PyArrow schema matching Java ObjectTable.SCHEMA
+OBJECT_TABLE_ARROW_SCHEMA = pyarrow.schema([
+    pyarrow.field("path", pyarrow.string(), nullable=False),
+    pyarrow.field("name", pyarrow.string(), nullable=False),
+    pyarrow.field("length", pyarrow.int64(), nullable=False),
+    pyarrow.field("mtime", pyarrow.int64(), nullable=False),
+    pyarrow.field("atime", pyarrow.int64(), nullable=False),
+    pyarrow.field("owner", pyarrow.string(), nullable=True),
+])
+
+
+def _normalize_path(path_str: str) -> str:
+    """Extract the filesystem path from a URI or plain path."""
+    parsed = urlparse(path_str)
+    if parsed.scheme:
+        return parsed.path
+    return path_str
+
+
+def _list_files_recursive(file_io, directory: str) -> list:
+    """Recursively list all files under a directory using FileIO.
+
+    Returns a list of (file_path, file_info) tuples.
+    """
+    results = []
+    try:
+        infos = file_io.list_status(directory)
+    except Exception:
+        return results
+
+    directory_rstrip = directory.rstrip("/")
+    for info in infos:
+        file_name = info.path.split("/")[-1] if "/" in info.path else info.path
+        full_path = info.path
+        if not full_path.startswith("/") and "://" not in full_path:
+            full_path = "{}/{}".format(directory_rstrip, file_name)
+
+        if info.type == pafs.FileType.Directory:
+            sub_results = _list_files_recursive(file_io, full_path)
+            results.extend(sub_results)
+        elif info.type == pafs.FileType.File:
+            results.append((full_path, info))
+
+    return results
+
+
+def _compute_relative_path(prefix: str, file_path: str) -> str:
+    """Compute relative path by removing the location prefix.
+
+    Matches Java ObjectTableImpl.toRow() logic.
+    """
+    norm_prefix = _normalize_path(prefix).rstrip("/")
+    norm_file = _normalize_path(file_path)
+
+    if norm_file.startswith(norm_prefix):
+        relative = norm_file[len(norm_prefix):]
+        if relative.startswith("/"):
+            relative = relative[1:]
+        return relative
+
+    return norm_file.split("/")[-1] if "/" in norm_file else norm_file
+
+
+class ObjectTableRead:
+    """Read implementation for ObjectTable.
+
+    Recursively lists all files under the location directory and returns
+    their metadata (path, name, length, mtime, atime, owner) as a PyArrow 
Table.
+    """
+
+    def __init__(
+        self,
+        table,
+        projection: Optional[List[str]] = None,
+        limit: Optional[int] = None,
+    ):
+        self.table = table
+        self.projection = projection
+        self.limit = limit
+
+    def to_arrow(self, splits: List[ObjectSplit]) -> pyarrow.Table:
+        paths = []
+        names = []
+        lengths = []
+        mtimes = []
+        atimes = []
+        owners = []
+
+        for split in splits:
+            file_entries = _list_files_recursive(split.file_io, split.location)
+            for file_path, file_info in file_entries:
+                if self.limit is not None and len(paths) >= self.limit:
+                    break
+
+                relative_path = _compute_relative_path(split.location, 
file_path)
+                file_name = file_path.split("/")[-1] if "/" in file_path else 
file_path
+                file_size = getattr(file_info, "size", 0) or 0
+                modification_time = getattr(file_info, "mtime_ns", 0) or 0
+                if modification_time > 0:
+                    modification_time = modification_time // 1_000_000
+                access_time = 0
+                owner = None
+
+                paths.append(relative_path)
+                names.append(file_name)
+                lengths.append(file_size)
+                mtimes.append(modification_time)
+                atimes.append(access_time)
+                owners.append(owner)
+
+            if self.limit is not None and len(paths) >= self.limit:
+                break
+
+        result = pyarrow.table(
+            {
+                "path": pyarrow.array(paths, type=pyarrow.string()),
+                "name": pyarrow.array(names, type=pyarrow.string()),
+                "length": pyarrow.array(lengths, type=pyarrow.int64()),
+                "mtime": pyarrow.array(mtimes, type=pyarrow.int64()),
+                "atime": pyarrow.array(atimes, type=pyarrow.int64()),
+                "owner": pyarrow.array(owners, type=pyarrow.string()),
+            },
+            schema=OBJECT_TABLE_ARROW_SCHEMA,
+        )
+
+        if self.projection:
+            existing = [c for c in self.projection if c in result.column_names]
+            if existing:
+                result = result.select(existing)
+
+        return result
+
+    def to_pandas(self, splits: List[ObjectSplit]) -> pandas.DataFrame:
+        return self.to_arrow(splits).to_pandas()
diff --git a/paimon-python/pypaimon/table/object/object_table_scan.py 
b/paimon-python/pypaimon/table/object/object_table_scan.py
new file mode 100644
index 0000000000..0e89ea87dd
--- /dev/null
+++ b/paimon-python/pypaimon/table/object/object_table_scan.py
@@ -0,0 +1,37 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from pypaimon.read.plan import Plan
+from pypaimon.table.object.object_split import ObjectSplit
+
+
+class ObjectTableScan:
+    """Scan for ObjectTable.
+
+    Always produces exactly one ObjectSplit containing the file_io and 
location.
+    File enumeration happens during the read phase, not the scan phase.
+    """
+
+    def __init__(self, table):
+        from pypaimon.table.object.object_table import ObjectTable
+        self.table: ObjectTable = table
+
+    def plan(self) -> Plan:
+        split = ObjectSplit(
+            file_io=self.table.file_io,
+            location=self.table.location(),
+        )
+        return Plan(_splits=[split])
diff --git a/paimon-python/pypaimon/tests/rest/rest_object_table_test.py 
b/paimon-python/pypaimon/tests/rest/rest_object_table_test.py
new file mode 100644
index 0000000000..c7d30e1e7e
--- /dev/null
+++ b/paimon-python/pypaimon/tests/rest/rest_object_table_test.py
@@ -0,0 +1,250 @@
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+
+import pyarrow as pa
+
+from pypaimon import PaimonVirtualFileSystem, Schema
+from pypaimon.table.object import ObjectTable
+from pypaimon.tests.rest.rest_base_test import RESTBaseTest
+
+
+class RESTObjectTableTest(RESTBaseTest):
+
+    def setUp(self):
+        super().setUp()
+        pvfs_options = {
+            'uri': self.options['uri'],
+            'warehouse': self.options['warehouse'],
+            'dlf.region': self.options['dlf.region'],
+            'token.provider': self.options['token.provider'],
+            'token': self.options['token'],
+        }
+        self.pvfs = PaimonVirtualFileSystem(pvfs_options)
+
+    def _create_object_table(self, table_name, extra_options=None):
+        """Helper to create an object table with the given name.
+
+        ObjectTable has a fixed schema and does not support custom fields.
+        Only options (including type=object-table) are needed.
+        """
+        options = {"type": "object-table"}
+        if extra_options:
+            options.update(extra_options)
+        schema = Schema(options=options)
+        self.rest_catalog.drop_table(table_name, True)
+        self.rest_catalog.create_table(table_name, schema, False)
+        return self.rest_catalog.get_table(table_name)
+
+    def _pvfs_table_path(self, table_name, sub_path=None):
+        """Build a pvfs:// path for the given table and optional sub-path."""
+        warehouse = self.options['warehouse']
+        base = "pvfs://{}/{}".format(warehouse, table_name.replace('.', '/'))
+        if sub_path:
+            return "{}/{}".format(base, sub_path)
+        return base
+
+    def _write_file_via_pvfs(self, table_name, filename, content):
+        """Write a file into the table's location using pvfs."""
+        path = self._pvfs_table_path(table_name, filename)
+        # Ensure parent directory exists when filename contains subdirectories
+        if '/' in filename:
+            parent_dir = self._pvfs_table_path(
+                table_name, filename.rsplit('/', 1)[0]
+            )
+            self.pvfs.makedirs(parent_dir, exist_ok=True)
+        with self.pvfs.open(path, 'wb') as f:
+            f.write(content)
+
+    def test_get_object_table(self):
+        table_name = "default.object_table_basic"
+        table = self._create_object_table(table_name)
+
+        self.assertIsInstance(table, ObjectTable)
+        self.assertEqual(table.name(), "object_table_basic")
+        self.assertEqual(table.full_name(), table_name)
+        self.assertEqual(table.partition_keys, [])
+        self.assertEqual(table.primary_keys, [])
+        self.assertEqual(table.options().get("type"), "object-table")
+
+    def test_object_table_read_files(self):
+        table_name = "default.object_table_read"
+        table = self._create_object_table(table_name)
+
+        # Write some test files into the table's location via pvfs
+        test_files = {
+            "file_a.txt": b"hello world",
+            "file_b.dat": b"some binary data here",
+        }
+        for filename, content in test_files.items():
+            self._write_file_via_pvfs(table_name, filename, content)
+
+        # Read the object table
+        read_builder = table.new_read_builder()
+        scan = read_builder.new_scan()
+        plan = scan.plan()
+        splits = plan.splits()
+
+        self.assertEqual(len(splits), 1)
+
+        table_read = read_builder.new_read()
+        result = table_read.to_arrow(splits)
+
+        self.assertIsInstance(result, pa.Table)
+        # Should contain the schema, snapshot and manifest files plus our test 
files
+        # Filter to only our test files
+        result_names = result.column("name").to_pylist()
+        for filename in test_files:
+            self.assertIn(filename, result_names)
+
+        # Verify file sizes for our test files
+        result_dict = {}
+        for i in range(result.num_rows):
+            name = result.column("name")[i].as_py()
+            length = result.column("length")[i].as_py()
+            result_dict[name] = length
+
+        for filename, content in test_files.items():
+            self.assertEqual(result_dict[filename], len(content))
+
+        # Verify schema columns
+        self.assertIn("path", result.column_names)
+        self.assertIn("name", result.column_names)
+        self.assertIn("length", result.column_names)
+        self.assertIn("mtime", result.column_names)
+        self.assertIn("atime", result.column_names)
+        self.assertIn("owner", result.column_names)
+
+    def test_object_table_read_with_subdirectories(self):
+        table_name = "default.object_table_subdir"
+        table = self._create_object_table(table_name)
+
+        # Write files via pvfs, including a nested subdirectory file
+        self._write_file_via_pvfs(table_name, "root_file.txt", b"root content")
+        self._write_file_via_pvfs(table_name, "subdir/nested_file.txt", 
b"nested content")
+
+        read_builder = table.new_read_builder()
+        splits = read_builder.new_scan().plan().splits()
+        result = read_builder.new_read().to_arrow(splits)
+
+        result_names = result.column("name").to_pylist()
+        self.assertIn("root_file.txt", result_names)
+        self.assertIn("nested_file.txt", result_names)
+
+        # Verify relative paths contain subdirectory
+        result_paths = result.column("path").to_pylist()
+        nested_paths = [p for p in result_paths if "nested_file.txt" in p]
+        self.assertTrue(len(nested_paths) > 0)
+        self.assertIn("subdir/", nested_paths[0])
+
+    def test_object_table_with_projection(self):
+        table_name = "default.object_table_projection"
+        table = self._create_object_table(table_name)
+
+        self._write_file_via_pvfs(table_name, "proj_test.txt", b"test data")
+
+        # Test projection with two columns
+        read_builder = table.new_read_builder()
+        read_builder.with_projection(["name", "length"])
+        splits = read_builder.new_scan().plan().splits()
+        result = read_builder.new_read().to_arrow(splits)
+
+        self.assertEqual(result.column_names, ["name", "length"])
+        self.assertNotIn("path", result.column_names)
+        self.assertNotIn("mtime", result.column_names)
+        self.assertNotIn("atime", result.column_names)
+        self.assertNotIn("owner", result.column_names)
+
+        # Verify data content for our test file
+        result_names = result.column("name").to_pylist()
+        self.assertIn("proj_test.txt", result_names)
+        idx = result_names.index("proj_test.txt")
+        self.assertEqual(result.column("length")[idx].as_py(), len(b"test 
data"))
+
+    def test_object_table_with_limit(self):
+        table_name = "default.object_table_limit"
+        table = self._create_object_table(table_name)
+
+        for i in range(5):
+            self._write_file_via_pvfs(
+                table_name, "file_{}.txt".format(i), "content 
{}".format(i).encode()
+            )
+
+        read_builder = table.new_read_builder()
+        read_builder.with_limit(2)
+        splits = read_builder.new_scan().plan().splits()
+        result = read_builder.new_read().to_arrow(splits)
+
+        self.assertEqual(result.num_rows, 2)
+
+    def test_object_table_options_and_copy(self):
+        table_name = "default.object_table_options"
+        table = self._create_object_table(
+            table_name, extra_options={"custom.key": "custom_value"}
+        )
+
+        self.assertEqual(table.options().get("custom.key"), "custom_value")
+        self.assertEqual(table.options().get("type"), "object-table")
+
+        # Test copy with dynamic options
+        copied = table.copy({"new.key": "new_value"})
+        self.assertIsInstance(copied, ObjectTable)
+        self.assertEqual(copied.options().get("custom.key"), "custom_value")
+        self.assertEqual(copied.options().get("new.key"), "new_value")
+
+        # Original should not be modified
+        self.assertIsNone(table.options().get("new.key"))
+
+        # Test copy with override
+        overridden = table.copy({"custom.key": "overridden"})
+        self.assertEqual(overridden.options().get("custom.key"), "overridden")
+        self.assertEqual(table.options().get("custom.key"), "custom_value")
+
+    def test_object_table_unsupported_write(self):
+        table_name = "default.object_table_no_write"
+        table = self._create_object_table(table_name)
+
+        with self.assertRaises(NotImplementedError):
+            table.new_batch_write_builder()
+        with self.assertRaises(NotImplementedError):
+            table.new_stream_write_builder()
+
+    def test_object_table_unsupported_drop_partitions(self):
+        table_name = "default.object_table_no_drop_partitions"
+        self._create_object_table(table_name)
+
+        with self.assertRaisesRegex(
+            ValueError,
+            "drop_partitions is not supported for table type 'ObjectTable'",
+        ):
+            self.rest_catalog.drop_partitions(
+                table_name,
+                [{"dt": "20250101"}],
+            )
+
+    def test_object_table_to_pandas(self):
+        table_name = "default.object_table_pandas"
+        table = self._create_object_table(table_name)
+
+        self._write_file_via_pvfs(table_name, "pandas_test.txt", b"pandas 
data")
+
+        read_builder = table.new_read_builder()
+        splits = read_builder.new_scan().plan().splits()
+        result_df = read_builder.new_read().to_pandas(splits)
+
+        self.assertIn("name", result_df.columns)
+        self.assertIn("pandas_test.txt", result_df["name"].values)


Reply via email to