This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 4c7d9de849 [python] support format table (#7154)
4c7d9de849 is described below

commit 4c7d9de849810f9746cdafed6e55a1891c41b3c4
Author: XiaoHongbo <[email protected]>
AuthorDate: Fri Jan 30 14:43:04 2026 +0800

    [python] support format table (#7154)
---
 .../pypaimon/catalog/rest/rest_catalog.py          |  35 +-
 paimon-python/pypaimon/read/push_down_utils.py     |  19 +-
 paimon-python/pypaimon/table/format/__init__.py    |  36 ++
 .../table/format/format_batch_write_builder.py     |  55 ++
 .../pypaimon/table/format/format_commit_message.py |  26 +
 .../pypaimon/table/format/format_data_split.py     |  30 +
 .../pypaimon/table/format/format_read_builder.py   |  82 +++
 .../pypaimon/table/format/format_table.py          | 100 ++++
 .../pypaimon/table/format/format_table_commit.py   |  66 +++
 .../pypaimon/table/format/format_table_read.py     | 277 ++++++++++
 .../pypaimon/table/format/format_table_scan.py     | 130 +++++
 .../pypaimon/table/format/format_table_write.py    | 253 +++++++++
 .../pypaimon/tests/rest/rest_format_table_test.py  | 613 +++++++++++++++++++++
 13 files changed, 1719 insertions(+), 3 deletions(-)
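
For reviewers, a minimal usage sketch of the API added by this commit, distilled
from rest_format_table_test.py below (the catalog variable and the table name are
placeholders; the tests use the RESTCatalog set up by RESTBaseTest):

    import pandas as pd
    import pyarrow as pa
    from pypaimon import Schema

    # Declaring "type" = "format-table" makes the catalog return a FormatTable.
    pa_schema = pa.schema([("a", pa.int32()), ("b", pa.int32())])
    schema = Schema.from_pyarrow_schema(
        pa_schema,
        options={"type": "format-table", "file.format": "parquet"},
    )
    catalog.create_table("default.demo_format_table", schema, False)
    table = catalog.get_table("default.demo_format_table")

    # Write plain data files directly under the table location; the commit itself
    # produces no snapshot/manifest for format tables.
    write_builder = table.new_batch_write_builder()
    writer = write_builder.new_write()
    committer = write_builder.new_commit()
    writer.write_pandas(pd.DataFrame({"a": [1, 2], "b": [10, 20]}))
    committer.commit(writer.prepare_commit())

    # Read back: the scan lists data files as splits, the read materializes them.
    read_builder = table.new_read_builder()
    splits = read_builder.new_scan().plan().splits()
    df = read_builder.new_read().to_pandas(splits)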

diff --git a/paimon-python/pypaimon/catalog/rest/rest_catalog.py b/paimon-python/pypaimon/catalog/rest/rest_catalog.py
index 41a3061fb9..425146fc68 100644
--- a/paimon-python/pypaimon/catalog/rest/rest_catalog.py
+++ b/paimon-python/pypaimon/catalog/rest/rest_catalog.py
@@ -42,6 +42,10 @@ from pypaimon.schema.table_schema import TableSchema
 from pypaimon.snapshot.snapshot import Snapshot
 from pypaimon.snapshot.snapshot_commit import PartitionStatistics
 from pypaimon.table.file_store_table import FileStoreTable
+from pypaimon.table.format.format_table import FormatTable, Format
+
+
+FORMAT_TABLE_TYPE = "format-table"
 
 
 class RESTCatalog(Catalog):
@@ -180,7 +184,7 @@ class RESTCatalog(Catalog):
         except ForbiddenException as e:
             raise DatabaseNoPermissionException(database_name) from e
 
-    def get_table(self, identifier: Union[str, Identifier]) -> FileStoreTable:
+    def get_table(self, identifier: Union[str, Identifier]):
         if not isinstance(identifier, Identifier):
             identifier = Identifier.from_string(identifier)
         return self.load_table(
@@ -263,9 +267,12 @@ class RESTCatalog(Catalog):
                    internal_file_io: Callable[[str], Any],
                    external_file_io: Callable[[str], Any],
                    metadata_loader: Callable[[Identifier], TableMetadata],
-                   ) -> FileStoreTable:
+                   ):
         metadata = metadata_loader(identifier)
         schema = metadata.schema
+        table_type = schema.options.get(CoreOptions.TYPE.key(), "").strip().lower()
+        if table_type == FORMAT_TABLE_TYPE:
+            return self._create_format_table(identifier, metadata, internal_file_io, external_file_io)
         data_file_io = external_file_io if metadata.is_external else internal_file_io
         catalog_env = CatalogEnvironment(
             identifier=identifier,
@@ -281,6 +288,30 @@ class RESTCatalog(Catalog):
                             catalog_env)
         return table
 
+    def _create_format_table(self,
+                             identifier: Identifier,
+                             metadata: TableMetadata,
+                             internal_file_io: Callable[[str], Any],
+                             external_file_io: Callable[[str], Any],
+                             ) -> FormatTable:
+        schema = metadata.schema
+        location = schema.options.get(CoreOptions.PATH.key())
+        if not location:
+            raise ValueError("Format table schema must have path option")
+        data_file_io = external_file_io if metadata.is_external else internal_file_io
+        file_io = data_file_io(location)
+        file_format = schema.options.get(CoreOptions.FILE_FORMAT.key(), "parquet")
+        fmt = Format.parse(file_format)
+        return FormatTable(
+            file_io=file_io,
+            identifier=identifier,
+            table_schema=schema,
+            location=location,
+            format=fmt,
+            options=dict(schema.options),
+            comment=schema.comment,
+        )
+
     @staticmethod
     def create(file_io: FileIO,
                table_path: str,
diff --git a/paimon-python/pypaimon/read/push_down_utils.py b/paimon-python/pypaimon/read/push_down_utils.py
index f812341149..7ad7e53acc 100644
--- a/paimon-python/pypaimon/read/push_down_utils.py
+++ b/paimon-python/pypaimon/read/push_down_utils.py
@@ -16,12 +16,29 @@
 # limitations under the License.
 ################################################################################
 
-from typing import Dict, List, Set
+from typing import Dict, List, Optional, Set
 
 from pypaimon.common.predicate import Predicate
 from pypaimon.common.predicate_builder import PredicateBuilder
 
 
+def extract_partition_spec_from_predicate(
+    predicate: Predicate, partition_keys: List[str]
+) -> Optional[Dict[str, str]]:
+    if not predicate or not partition_keys:
+        return None
+    parts = _split_and(predicate)
+    spec: Dict[str, str] = {}
+    for p in parts:
+        if p.method != "equal" or p.field is None or p.literals is None or 
len(p.literals) != 1:
+            continue
+        if p.field in partition_keys:
+            spec[p.field] = str(p.literals[0])
+    if set(spec.keys()) == set(partition_keys):
+        return spec
+    return None
+
+
 def trim_and_transform_predicate(input_predicate: Predicate, all_fields: List[str], trimmed_keys: List[str]):
     new_predicate = trim_predicate_by_fields(input_predicate, trimmed_keys)
     part_to_index = {element: idx for idx, element in enumerate(trimmed_keys)}
diff --git a/paimon-python/pypaimon/table/format/__init__.py b/paimon-python/pypaimon/table/format/__init__.py
new file mode 100644
index 0000000000..228f165248
--- /dev/null
+++ b/paimon-python/pypaimon/table/format/__init__.py
@@ -0,0 +1,36 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from pypaimon.table.format.format_data_split import FormatDataSplit
+from pypaimon.table.format.format_table import FormatTable, Format
+from pypaimon.table.format.format_read_builder import FormatReadBuilder
+from pypaimon.table.format.format_table_scan import FormatTableScan
+from pypaimon.table.format.format_table_read import FormatTableRead
+from pypaimon.table.format.format_batch_write_builder import FormatBatchWriteBuilder
+from pypaimon.table.format.format_table_write import FormatTableWrite
+from pypaimon.table.format.format_table_commit import FormatTableCommit
+
+__all__ = [
+    "FormatDataSplit",
+    "FormatTable",
+    "Format",
+    "FormatReadBuilder",
+    "FormatTableScan",
+    "FormatTableRead",
+    "FormatBatchWriteBuilder",
+    "FormatTableWrite",
+    "FormatTableCommit",
+]
diff --git a/paimon-python/pypaimon/table/format/format_batch_write_builder.py b/paimon-python/pypaimon/table/format/format_batch_write_builder.py
new file mode 100644
index 0000000000..31d865020a
--- /dev/null
+++ b/paimon-python/pypaimon/table/format/format_batch_write_builder.py
@@ -0,0 +1,55 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from typing import Optional
+
+from pypaimon.table.format.format_table import FormatTable
+from pypaimon.table.format.format_table_commit import FormatTableCommit
+from pypaimon.table.format.format_table_write import FormatTableWrite
+
+
+class FormatBatchWriteBuilder:
+    def __init__(self, table: FormatTable):
+        self.table = table
+        self._overwrite = False
+        self._static_partition: Optional[dict] = None
+
+    def overwrite(self, static_partition: Optional[dict] = None) -> "FormatBatchWriteBuilder":
+        self._overwrite = True
+        self._validate_static_partition(static_partition)
+        self._static_partition = static_partition if static_partition is not None else {}
+        return self
+
+    def _validate_static_partition(self, static_partition: Optional[dict]) -> None:
+        if not static_partition:
+            return
+        if not self.table.partition_keys:
+            raise ValueError(
+                "Format table is not partitioned, static partition values are 
not allowed."
+            )
+        for key in static_partition:
+            if key not in self.table.partition_keys:
+                raise ValueError(f"Unknown static partition column: {key}")
+
+    def new_write(self) -> FormatTableWrite:
+        return FormatTableWrite(
+            self.table,
+            overwrite=self._overwrite,
+            static_partitions=self._static_partition,
+        )
+
+    def new_commit(self) -> FormatTableCommit:
+        return FormatTableCommit(table=self.table)
diff --git a/paimon-python/pypaimon/table/format/format_commit_message.py b/paimon-python/pypaimon/table/format/format_commit_message.py
new file mode 100644
index 0000000000..c9a253ef58
--- /dev/null
+++ b/paimon-python/pypaimon/table/format/format_commit_message.py
@@ -0,0 +1,26 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from dataclasses import dataclass
+from typing import List
+
+
+@dataclass
+class FormatTableCommitMessage:
+    written_paths: List[str]
+
+    def is_empty(self) -> bool:
+        return not self.written_paths
diff --git a/paimon-python/pypaimon/table/format/format_data_split.py b/paimon-python/pypaimon/table/format/format_data_split.py
new file mode 100644
index 0000000000..8536a18025
--- /dev/null
+++ b/paimon-python/pypaimon/table/format/format_data_split.py
@@ -0,0 +1,30 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from dataclasses import dataclass
+from typing import Dict, Optional, Any
+
+
+@dataclass(frozen=True)
+class FormatDataSplit:
+    """Split for format table: one file (or future: byte range) per split."""
+
+    file_path: str
+    file_size: int
+    partition: Optional[Dict[str, Any]] = None  # partition column name -> value
+
+    def data_path(self) -> str:
+        return self.file_path
diff --git a/paimon-python/pypaimon/table/format/format_read_builder.py b/paimon-python/pypaimon/table/format/format_read_builder.py
new file mode 100644
index 0000000000..ea501d56f0
--- /dev/null
+++ b/paimon-python/pypaimon/table/format/format_read_builder.py
@@ -0,0 +1,82 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from typing import List, Optional
+
+from pypaimon.common.predicate import Predicate
+from pypaimon.common.predicate_builder import PredicateBuilder
+from pypaimon.read.push_down_utils import extract_partition_spec_from_predicate
+from pypaimon.schema.data_types import DataField
+from pypaimon.table.format.format_table import FormatTable
+from pypaimon.table.format.format_table_scan import FormatTableScan
+from pypaimon.table.format.format_table_read import FormatTableRead
+
+
+class FormatReadBuilder:
+    def __init__(self, table: FormatTable):
+        self.table = table
+        self._projection: Optional[List[str]] = None
+        self._limit: Optional[int] = None
+        self._partition_filter: Optional[dict] = None
+
+    def with_filter(self, predicate: Predicate) -> "FormatReadBuilder":
+        ok = (
+            self._partition_filter is None
+            and self.table.partition_keys
+            and predicate
+        )
+        if ok:
+            spec = extract_partition_spec_from_predicate(
+                predicate, self.table.partition_keys
+            )
+            if spec is not None:
+                self._partition_filter = spec
+        return self
+
+    def with_projection(self, projection: List[str]) -> "FormatReadBuilder":
+        self._projection = projection
+        return self
+
+    def with_limit(self, limit: int) -> "FormatReadBuilder":
+        self._limit = limit
+        return self
+
+    def with_partition_filter(
+        self, partition_spec: Optional[dict]
+    ) -> "FormatReadBuilder":
+        self._partition_filter = partition_spec
+        return self
+
+    def new_scan(self) -> FormatTableScan:
+        return FormatTableScan(
+            self.table,
+            partition_filter=self._partition_filter,
+        )
+
+    def new_read(self) -> FormatTableRead:
+        return FormatTableRead(
+            table=self.table,
+            projection=self._projection,
+            limit=self._limit,
+        )
+
+    def new_predicate_builder(self) -> PredicateBuilder:
+        return PredicateBuilder(self.read_type())
+
+    def read_type(self) -> List[DataField]:
+        if self._projection:
+            return [f for f in self.table.fields if f.name in self._projection]
+        return list(self.table.fields)
diff --git a/paimon-python/pypaimon/table/format/format_table.py b/paimon-python/pypaimon/table/format/format_table.py
new file mode 100644
index 0000000000..564bd08625
--- /dev/null
+++ b/paimon-python/pypaimon/table/format/format_table.py
@@ -0,0 +1,100 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from enum import Enum
+from typing import Dict, List, Optional
+
+from pypaimon.common.file_io import FileIO
+from pypaimon.common.identifier import Identifier
+from pypaimon.schema.table_schema import TableSchema
+from pypaimon.table.table import Table
+
+
+class Format(str, Enum):
+    ORC = "orc"
+    PARQUET = "parquet"
+    CSV = "csv"
+    TEXT = "text"
+    JSON = "json"
+
+    @classmethod
+    def parse(cls, file_format: str) -> "Format":
+        s = (file_format or "parquet").strip().upper()
+        try:
+            return cls[s]
+        except KeyError:
+            raise ValueError(
+                f"Format table unsupported file format: {file_format}. "
+                f"Supported: {[f.name for f in cls]}"
+            )
+
+
+class FormatTable(Table):
+    def __init__(
+        self,
+        file_io: FileIO,
+        identifier: Identifier,
+        table_schema: TableSchema,
+        location: str,
+        format: Format,
+        options: Optional[Dict[str, str]] = None,
+        comment: Optional[str] = None,
+    ):
+        self.file_io = file_io
+        self.identifier = identifier
+        self._table_schema = table_schema
+        self._location = location.rstrip("/")
+        self._format = format
+        self._options = options or dict(table_schema.options)
+        self.comment = comment
+        self.fields = table_schema.fields
+        self.field_names = [f.name for f in self.fields]
+        self.partition_keys = table_schema.partition_keys or []
+        self.primary_keys: List[str] = []  # format table has no primary key
+
+    def name(self) -> str:
+        return self.identifier.get_table_name()
+
+    def full_name(self) -> str:
+        return self.identifier.get_full_name()
+
+    @property
+    def table_schema(self) -> TableSchema:
+        return self._table_schema
+
+    @table_schema.setter
+    def table_schema(self, value: TableSchema):
+        self._table_schema = value
+
+    def location(self) -> str:
+        return self._location
+
+    def format(self) -> Format:
+        return self._format
+
+    def options(self) -> Dict[str, str]:
+        return self._options
+
+    def new_read_builder(self):
+        from pypaimon.table.format.format_read_builder import FormatReadBuilder
+        return FormatReadBuilder(self)
+
+    def new_batch_write_builder(self):
+        from pypaimon.table.format.format_batch_write_builder import FormatBatchWriteBuilder
+        return FormatBatchWriteBuilder(self)
+
+    def new_stream_write_builder(self):
+        raise NotImplementedError("Format table does not support stream 
write.")
diff --git a/paimon-python/pypaimon/table/format/format_table_commit.py b/paimon-python/pypaimon/table/format/format_table_commit.py
new file mode 100644
index 0000000000..d869744590
--- /dev/null
+++ b/paimon-python/pypaimon/table/format/format_table_commit.py
@@ -0,0 +1,66 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from typing import List
+
+import pyarrow.fs as pafs
+
+from pypaimon.table.format.format_table import FormatTable
+from pypaimon.table.format.format_table_scan import _is_data_file_name
+from pypaimon.table.format.format_commit_message import FormatTableCommitMessage
+
+
+def _delete_data_files_in_path(file_io, path: str) -> None:
+    try:
+        infos = file_io.list_status(path)
+    except Exception:
+        return
+    for info in infos:
+        if info.type == pafs.FileType.Directory:
+            _delete_data_files_in_path(file_io, info.path)
+        elif info.type == pafs.FileType.File:
+            name = info.path.split("/")[-1] if "/" in info.path else info.path
+            if _is_data_file_name(name):
+                try:
+                    file_io.delete(info.path, False)
+                except Exception:
+                    pass
+
+
+class FormatTableCommit:
+    """Commit for format table. Overwrite is applied in FormatTableWrite at 
write time."""
+
+    def __init__(self, table: FormatTable):
+        self.table = table
+        self._committed = False
+
+    def commit(self, commit_messages: List[FormatTableCommitMessage]) -> None:
+        if self._committed:
+            raise RuntimeError("FormatTableCommit supports only one commit.")
+        self._committed = True
+        return
+
+    def abort(self, commit_messages: List[FormatTableCommitMessage]) -> None:
+        for msg in commit_messages:
+            for path in msg.written_paths:
+                try:
+                    if self.table.file_io.exists(path):
+                        self.table.file_io.delete(path, False)
+                except Exception:
+                    pass
+
+    def close(self) -> None:
+        pass
diff --git a/paimon-python/pypaimon/table/format/format_table_read.py b/paimon-python/pypaimon/table/format/format_table_read.py
new file mode 100644
index 0000000000..11ce3faf5b
--- /dev/null
+++ b/paimon-python/pypaimon/table/format/format_table_read.py
@@ -0,0 +1,277 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from typing import Any, Dict, Iterator, List, Optional
+
+import pandas
+import pyarrow
+
+from pypaimon.schema.data_types import PyarrowFieldParser
+from pypaimon.table.format.format_data_split import FormatDataSplit
+from pypaimon.table.format.format_table import FormatTable, Format
+
+
+def _text_format_schema_column(table: FormatTable) -> Optional[str]:
+    """TEXT format: single data column name from schema (non-partition)."""
+    if table.format() != Format.TEXT or not table.fields:
+        return None
+    data_names = [
+        f.name for f in table.fields if f.name not in table.partition_keys
+    ]
+    return (
+        data_names[0]
+        if data_names
+        else (table.field_names[0] if table.field_names else None)
+    )
+
+
+def _read_file_to_arrow(
+    file_io: Any,
+    split: FormatDataSplit,
+    fmt: Format,
+    partition_spec: Optional[Dict[str, str]],
+    read_fields: Optional[List[str]],
+    partition_key_types: Optional[Dict[str, pyarrow.DataType]] = None,
+    text_column_name: Optional[str] = None,
+    text_line_delimiter: str = "\n",
+) -> pyarrow.Table:
+    path = split.data_path()
+    csv_read_options = None
+    if fmt == Format.CSV and hasattr(pyarrow, "csv"):
+        csv_read_options = pyarrow.csv.ReadOptions(block_size=1 << 20)
+    try:
+        with file_io.new_input_stream(path) as stream:
+            chunks = []
+            while True:
+                chunk = stream.read()
+                if not chunk:
+                    break
+                chunks.append(
+                    chunk if isinstance(chunk, bytes) else bytes(chunk)
+                )
+            data = b"".join(chunks)
+    except Exception as e:
+        raise RuntimeError(f"Failed to read {path}") from e
+
+    if not data or len(data) == 0:
+        return pyarrow.table({})
+
+    if fmt == Format.PARQUET:
+        import io
+        data = (
+            bytes(data) if not isinstance(data, bytes) else data
+        )
+        if len(data) < 4 or data[:4] != b"PAR1":
+            return pyarrow.table({})
+        try:
+            tbl = pyarrow.parquet.read_table(io.BytesIO(data))
+        except pyarrow.ArrowInvalid:
+            return pyarrow.table({})
+    elif fmt == Format.CSV:
+        if hasattr(pyarrow, "csv"):
+            tbl = pyarrow.csv.read_csv(
+                pyarrow.BufferReader(data),
+                read_options=csv_read_options,
+            )
+        else:
+            import io
+            df = pandas.read_csv(io.BytesIO(data))
+            tbl = pyarrow.Table.from_pandas(df)
+    elif fmt == Format.JSON:
+        import json
+        text = data.decode("utf-8") if isinstance(data, bytes) else data
+        records = []
+        for line in text.strip().split("\n"):
+            line = line.strip()
+            if line:
+                records.append(json.loads(line))
+        if not records:
+            return pyarrow.table({})
+        tbl = pyarrow.Table.from_pylist(records)
+    elif fmt == Format.ORC:
+        import io
+        data = bytes(data) if not isinstance(data, bytes) else data
+        if hasattr(pyarrow, "orc"):
+            try:
+                tbl = pyarrow.orc.read_table(io.BytesIO(data))
+            except Exception:
+                return pyarrow.table({})
+        else:
+            raise ValueError(
+                "Format table read for ORC requires PyArrow with ORC support "
+                "(pyarrow.orc)"
+            )
+    elif fmt == Format.TEXT:
+        text = data.decode("utf-8") if isinstance(data, bytes) else data
+        lines = (
+            text.rstrip(text_line_delimiter).split(text_line_delimiter)
+            if text
+            else []
+        )
+        if not lines:
+            return pyarrow.table({})
+        part_keys = set(partition_spec.keys()) if partition_spec else set()
+        col_name = text_column_name if text_column_name else "value"
+        if read_fields:
+            for f in read_fields:
+                if f not in part_keys:
+                    col_name = f
+                    break
+        tbl = pyarrow.table({col_name: lines})
+    else:
+        raise ValueError(f"Format {fmt} read not implemented in Python")
+
+    if partition_spec:
+        for k, v in partition_spec.items():
+            if k in tbl.column_names:
+                continue
+            pa_type = (
+                partition_key_types.get(k, pyarrow.string())
+                if partition_key_types
+                else pyarrow.string()
+            )
+            arr = pyarrow.array([v] * tbl.num_rows, type=pyarrow.string())
+            if pa_type != pyarrow.string():
+                arr = arr.cast(pa_type)
+            tbl = tbl.append_column(k, arr)
+
+    if read_fields and tbl.num_columns > 0:
+        existing = [c for c in read_fields if c in tbl.column_names]
+        if existing:
+            tbl = tbl.select(existing)
+    return tbl
+
+
+def _partition_key_types(
+    table: FormatTable,
+) -> Optional[Dict[str, pyarrow.DataType]]:
+    """Build partition column name -> PyArrow type from table schema."""
+    if not table.partition_keys:
+        return None
+    result = {}
+    for f in table.fields:
+        if f.name in table.partition_keys:
+            pa_field = PyarrowFieldParser.from_paimon_field(f)
+            result[f.name] = pa_field.type
+    return result if result else None
+
+
+class FormatTableRead:
+
+    def __init__(
+        self,
+        table: FormatTable,
+        projection: Optional[List[str]] = None,
+        limit: Optional[int] = None,
+    ):
+        self.table = table
+        self.projection = projection
+        self.limit = limit
+
+    def to_arrow(
+        self,
+        splits: List[FormatDataSplit],
+    ) -> pyarrow.Table:
+        read_fields = self.projection
+        fmt = self.table.format()
+        partition_key_types = _partition_key_types(self.table)
+        text_col = (
+            _text_format_schema_column(self.table)
+            if fmt == Format.TEXT
+            else None
+        )
+        text_delim = (
+            self.table.options().get("text.line-delimiter", "\n")
+            if fmt == Format.TEXT
+            else "\n"
+        )
+        tables = []
+        nrows = 0
+        for split in splits:
+            t = _read_file_to_arrow(
+                self.table.file_io,
+                split,
+                fmt,
+                split.partition,
+                read_fields,
+                partition_key_types,
+                text_column_name=text_col,
+                text_line_delimiter=text_delim,
+            )
+            if t.num_rows > 0:
+                tables.append(t)
+                nrows += t.num_rows
+                if self.limit is not None and nrows >= self.limit:
+                    if nrows > self.limit:
+                        excess = nrows - self.limit
+                        last = tables[-1]
+                        tables[-1] = last.slice(0, last.num_rows - excess)
+                    break
+        if not tables:
+            fields = self.table.fields
+            if read_fields:
+                fields = [
+                    f for f in self.table.fields if f.name in read_fields
+                ]
+            schema = PyarrowFieldParser.from_paimon_schema(fields)
+            return pyarrow.Table.from_pydict(
+                {n: [] for n in schema.names},
+                schema=schema,
+            )
+        out = pyarrow.concat_tables(tables)
+        if self.limit is not None and out.num_rows > self.limit:
+            out = out.slice(0, self.limit)
+        return out
+
+    def to_pandas(self, splits: List[FormatDataSplit]) -> pandas.DataFrame:
+        return self.to_arrow(splits).to_pandas()
+
+    def to_iterator(
+        self,
+        splits: List[FormatDataSplit],
+    ) -> Iterator[Any]:
+        partition_key_types = _partition_key_types(self.table)
+        fmt = self.table.format()
+        text_col = (
+            _text_format_schema_column(self.table)
+            if fmt == Format.TEXT
+            else None
+        )
+        text_delim = (
+            self.table.options().get("text.line-delimiter", "\n")
+            if fmt == Format.TEXT
+            else "\n"
+        )
+        n_yielded = 0
+        for split in splits:
+            if self.limit is not None and n_yielded >= self.limit:
+                break
+            t = _read_file_to_arrow(
+                self.table.file_io,
+                split,
+                fmt,
+                split.partition,
+                self.projection,
+                partition_key_types,
+                text_column_name=text_col,
+                text_line_delimiter=text_delim,
+            )
+            for batch in t.to_batches():
+                for i in range(batch.num_rows):
+                    if self.limit is not None and n_yielded >= self.limit:
+                        return
+                    yield batch.slice(i, 1)
+                    n_yielded += 1
diff --git a/paimon-python/pypaimon/table/format/format_table_scan.py b/paimon-python/pypaimon/table/format/format_table_scan.py
new file mode 100644
index 0000000000..9ce2d9ea26
--- /dev/null
+++ b/paimon-python/pypaimon/table/format/format_table_scan.py
@@ -0,0 +1,130 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from typing import Dict, List, Optional
+
+import pyarrow.fs as pafs
+
+from pypaimon.common.file_io import FileIO
+from pypaimon.read.plan import Plan
+from pypaimon.table.format.format_data_split import FormatDataSplit
+from pypaimon.table.format.format_table import FormatTable
+
+
+def _is_data_file_name(name: str) -> bool:
+    if name is None:
+        return False
+    return not name.startswith(".") and not name.startswith("_")
+
+
+def _is_reserved_dir_name(name: str) -> bool:
+    if not name:
+        return True
+    if name.startswith(".") or name.startswith("_"):
+        return True
+    if name.lower() in ("schema", "_schema"):
+        return True
+    return False
+
+
+def _list_data_files_recursive(
+    file_io: FileIO,
+    path: str,
+    partition_keys: List[str],
+    partition_only_value: bool,
+    rel_path_parts: Optional[List[str]] = None,
+) -> List[FormatDataSplit]:
+    splits: List[FormatDataSplit] = []
+    rel_path_parts = rel_path_parts or []
+    try:
+        infos = file_io.list_status(path)
+    except Exception:
+        return splits
+    if not infos:
+        return splits
+    path_rstrip = path.rstrip("/")
+    for info in infos:
+        name = info.path.split("/")[-1] if "/" in info.path else info.path
+        full_path = f"{path_rstrip}/{name}" if path_rstrip else name
+        if info.path.startswith("/") or info.path.startswith("file:"):
+            full_path = info.path
+        if info.type == pafs.FileType.Directory:
+            if _is_reserved_dir_name(name):
+                continue
+            part_value = name
+            if not partition_only_value and "=" in name:
+                part_value = name.split("=", 1)[1]
+            child_parts = rel_path_parts + [part_value]
+            if len(child_parts) <= len(partition_keys):
+                sub_splits = _list_data_files_recursive(
+                    file_io,
+                    full_path,
+                    partition_keys,
+                    partition_only_value,
+                    child_parts,
+                )
+                splits.extend(sub_splits)
+        elif info.type == pafs.FileType.File and _is_data_file_name(name):
+            size = getattr(info, "size", None) or 0
+            part_spec: Optional[Dict[str, str]] = None
+            if partition_keys and len(rel_path_parts) >= len(partition_keys):
+                part_spec = dict(
+                    zip(
+                        partition_keys,
+                        rel_path_parts[: len(partition_keys)],
+                    )
+                )
+            splits.append(
+                FormatDataSplit(
+                    file_path=full_path,
+                    file_size=size,
+                    partition=part_spec,
+                )
+            )
+    return splits
+
+
+class FormatTableScan:
+
+    def __init__(
+        self,
+        table: FormatTable,
+        partition_filter: Optional[Dict[str, str]] = None,
+    ):
+        self.table = table
+        self.partition_filter = partition_filter  # optional equality filter
+
+    def plan(self) -> Plan:
+        partition_only_value = self.table.options().get(
+            "format-table.partition-path-only-value", "false"
+        ).lower() == "true"
+        splits = _list_data_files_recursive(
+            self.table.file_io,
+            self.table.location(),
+            self.table.partition_keys,
+            partition_only_value,
+        )
+        if self.partition_filter:
+            filtered = []
+            for s in splits:
+                match = s.partition and all(
+                    str(s.partition.get(k)) == str(v)
+                    for k, v in self.partition_filter.items()
+                )
+                if match:
+                    filtered.append(s)
+            splits = filtered
+        return Plan(_splits=splits)
diff --git a/paimon-python/pypaimon/table/format/format_table_write.py b/paimon-python/pypaimon/table/format/format_table_write.py
new file mode 100644
index 0000000000..eb45b718d5
--- /dev/null
+++ b/paimon-python/pypaimon/table/format/format_table_write.py
@@ -0,0 +1,253 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import io
+import uuid
+from collections import defaultdict
+from typing import Dict, List, Optional
+
+import pyarrow
+
+from pypaimon.schema.data_types import PyarrowFieldParser
+from pypaimon.table.format.format_commit_message import (
+    FormatTableCommitMessage,
+)
+from pypaimon.table.format.format_table import (
+    Format,
+    FormatTable,
+)
+
+
+def _partition_path(
+    partition_spec: dict, partition_keys: List[str], only_value: bool
+) -> str:
+    parts = []
+    for k in partition_keys:
+        v = partition_spec.get(k)
+        if v is None:
+            break
+        parts.append(str(v) if only_value else f"{k}={v}")
+    return "/".join(parts)
+
+
+def _validate_partition_columns(
+    partition_keys: List[str],
+    data: pyarrow.RecordBatch,
+) -> None:
+    """Raise if partition key missing from data (wrong column indexing)."""
+    names = set(data.schema.names) if data.schema else set()
+    missing = [k for k in partition_keys if k not in names]
+    if missing:
+        raise ValueError(
+            f"Partition column(s) missing from input data: {missing}. "
+            f"Data columns: {list(names)}. "
+            "Ensure partition keys exist in the Arrow schema."
+        )
+
+
+def _partition_from_row(
+    row: pyarrow.RecordBatch,
+    partition_keys: List[str],
+    row_index: int,
+) -> tuple:
+    out = []
+    for k in partition_keys:
+        col = row.column(row.schema.get_field_index(k))
+        val = col[row_index]
+        is_none = val is None or (
+            hasattr(val, "as_py") and val.as_py() is None
+        )
+        if is_none:
+            out.append(None)
+        else:
+            out.append(val.as_py() if hasattr(val, "as_py") else val)
+    return tuple(out)
+
+
+class FormatTableWrite:
+    """Batch write for format table: Arrow/Pandas to partition dirs."""
+
+    def __init__(
+        self,
+        table: FormatTable,
+        overwrite: bool = False,
+        static_partitions: Optional[Dict[str, str]] = None,
+    ):
+        self.table = table
+        self._overwrite = overwrite
+        self._static_partitions = (
+            static_partitions if static_partitions is not None else {}
+        )
+        self._written_paths: List[str] = []
+        self._overwritten_dirs: set = set()
+        opt = table.options().get(
+            "format-table.partition-path-only-value", "false"
+        )
+        self._partition_only_value = opt.lower() == "true"
+        self._file_format = table.format()
+        self._data_file_prefix = "data-"
+        self._suffix = {
+            "parquet": ".parquet",
+            "csv": ".csv",
+            "json": ".json",
+            "orc": ".orc",
+            "text": ".txt",
+        }.get(self._file_format.value, ".parquet")
+
+    def write_arrow(self, data: pyarrow.Table) -> None:
+        for batch in data.to_batches():
+            self.write_arrow_batch(batch)
+
+    def write_arrow_batch(self, data: pyarrow.RecordBatch) -> None:
+        partition_keys = self.table.partition_keys
+        if not partition_keys:
+            part_spec = {}
+            self._write_single_batch(data, part_spec)
+            return
+        _validate_partition_columns(partition_keys, data)
+        # Group rows by partition
+        parts_to_indices = defaultdict(list)
+        for i in range(data.num_rows):
+            part = _partition_from_row(data, partition_keys, i)
+            parts_to_indices[part].append(i)
+        for part_tuple, indices in parts_to_indices.items():
+            part_spec = dict(zip(partition_keys, part_tuple))
+            sub = data.take(pyarrow.array(indices))
+            self._write_single_batch(sub, part_spec)
+
+    def write_pandas(self, df) -> None:
+        pa_schema = PyarrowFieldParser.from_paimon_schema(self.table.fields)
+        batch = pyarrow.RecordBatch.from_pandas(df, schema=pa_schema)
+        self.write_arrow_batch(batch)
+
+    def _write_single_batch(
+        self,
+        data: pyarrow.RecordBatch,
+        partition_spec: dict,
+    ) -> None:
+        if data.num_rows == 0:
+            return
+        location = self.table.location()
+        partition_only_value = self._partition_only_value
+        part_path = _partition_path(
+            partition_spec,
+            self.table.partition_keys,
+            partition_only_value,
+        )
+        if part_path:
+            dir_path = f"{location}/{part_path}"
+        else:
+            dir_path = location
+        # When overwrite: clear partition dir only once per write session
+        overwrite_this = (
+            self._overwrite
+            and dir_path not in self._overwritten_dirs
+            and self.table.file_io.exists(dir_path)
+        )
+        if overwrite_this:
+            should_delete = (
+                not self._static_partitions
+                or all(
+                    str(partition_spec.get(k)) == str(v)
+                    for k, v in self._static_partitions.items()
+                )
+            )
+            if should_delete:
+                from pypaimon.table.format.format_table_commit import (
+                    _delete_data_files_in_path,
+                )
+                _delete_data_files_in_path(self.table.file_io, dir_path)
+                self._overwritten_dirs.add(dir_path)
+        self.table.file_io.check_or_mkdirs(dir_path)
+        file_name = f"{self._data_file_prefix}{uuid.uuid4().hex}{self._suffix}"
+        path = f"{dir_path}/{file_name}"
+
+        fmt = self._file_format
+        tbl = pyarrow.Table.from_batches([data])
+        if fmt == Format.PARQUET:
+            buf = io.BytesIO()
+            pyarrow.parquet.write_table(tbl, buf, compression="zstd")
+            raw = buf.getvalue()
+        elif fmt == Format.CSV:
+            if hasattr(pyarrow, "csv"):
+                buf = io.BytesIO()
+                pyarrow.csv.write_csv(tbl, buf)
+                raw = buf.getvalue()
+            else:
+                buf = io.StringIO()
+                tbl.to_pandas().to_csv(buf, index=False)
+                raw = buf.getvalue().encode("utf-8")
+        elif fmt == Format.JSON:
+            import json
+            lines = []
+            for i in range(tbl.num_rows):
+                row = {
+                    tbl.column_names[j]: tbl.column(j)[i].as_py()
+                    for j in range(tbl.num_columns)
+                }
+                lines.append(json.dumps(row) + "\n")
+            raw = "".join(lines).encode("utf-8")
+        elif fmt == Format.ORC:
+            if hasattr(pyarrow, "orc"):
+                buf = io.BytesIO()
+                pyarrow.orc.write_table(tbl, buf)
+                raw = buf.getvalue()
+            else:
+                raise ValueError(
+                    "Format table write for ORC requires PyArrow with ORC "
+                    "support (pyarrow.orc)"
+                )
+        elif fmt == Format.TEXT:
+            partition_keys = self.table.partition_keys
+            if partition_keys:
+                data_cols = [
+                    c for c in tbl.column_names if c not in partition_keys
+                ]
+                tbl = tbl.select(data_cols)
+            pa_f0 = tbl.schema.field(0).type
+            if tbl.num_columns != 1 or not pyarrow.types.is_string(pa_f0):
+                raise ValueError(
+                    "TEXT format only supports a single string column, "
+                    f"got {tbl.num_columns} columns"
+                )
+            line_delimiter = self.table.options().get(
+                "text.line-delimiter", "\n"
+            )
+            lines = []
+            col = tbl.column(0)
+            for i in range(tbl.num_rows):
+                val = col[i]
+                py_val = val.as_py() if hasattr(val, "as_py") else val
+                line = "" if py_val is None else str(py_val)
+                lines.append(line + line_delimiter)
+            raw = "".join(lines).encode("utf-8")
+        else:
+            raise ValueError(f"Format table write not implemented for {fmt}")
+
+        with self.table.file_io.new_output_stream(path) as out:
+            out.write(raw)
+
+        self._written_paths.append(path)
+
+    def prepare_commit(self) -> List[FormatTableCommitMessage]:
+        return [
+            FormatTableCommitMessage(
+                written_paths=list(self._written_paths)
+            )
+        ]
+
+    def close(self) -> None:
+        pass
diff --git a/paimon-python/pypaimon/tests/rest/rest_format_table_test.py b/paimon-python/pypaimon/tests/rest/rest_format_table_test.py
new file mode 100644
index 0000000000..d081ae6cd0
--- /dev/null
+++ b/paimon-python/pypaimon/tests/rest/rest_format_table_test.py
@@ -0,0 +1,613 @@
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+import unittest
+
+import pandas as pd
+import pyarrow as pa
+from parameterized import parameterized
+
+from pypaimon import Schema
+from pypaimon.catalog.catalog_exception import TableNotExistException
+from pypaimon.table.format import FormatTable
+from pypaimon.tests.rest.rest_base_test import RESTBaseTest
+
+
+def _format_table_read_write_formats():
+    formats = [("parquet",), ("csv",), ("json",)]
+    if hasattr(pa, "orc"):
+        formats.append(("orc",))
+    return formats
+
+
+class RESTFormatTableTest(RESTBaseTest):
+
+    @parameterized.expand(_format_table_read_write_formats())
+    def test_format_table_read_write(self, file_format):
+        pa_schema = pa.schema([
+            ("a", pa.int32()),
+            ("b", pa.int32()),
+            ("c", pa.int32()),
+        ])
+        schema = Schema.from_pyarrow_schema(
+            pa_schema,
+            options={"type": "format-table", "file.format": file_format},
+        )
+        table_name = f"default.format_table_rw_{file_format}"
+        try:
+            self.rest_catalog.drop_table(table_name, True)
+        except Exception:
+            pass
+        self.rest_catalog.create_table(table_name, schema, False)
+        table = self.rest_catalog.get_table(table_name)
+        self.assertIsInstance(table, FormatTable)
+        self.assertEqual(table.format().value, file_format)
+        opts = table.options()
+        self.assertIsInstance(opts, dict)
+        self.assertEqual(opts.get("file.format"), file_format)
+
+        write_builder = table.new_batch_write_builder()
+        table_write = write_builder.new_write()
+        table_commit = write_builder.new_commit()
+        df = pd.DataFrame({
+            "a": [10, 10],
+            "b": [1, 2],
+            "c": [1, 2],
+        })
+        table_write.write_pandas(df)
+        table_commit.commit(table_write.prepare_commit())
+        table_write.close()
+        table_commit.close()
+
+        read_builder = table.new_read_builder()
+        splits = read_builder.new_scan().plan().splits()
+        table_read = read_builder.new_read()
+        actual = table_read.to_pandas(splits).sort_values(by="b").reset_index(drop=True)
+        expected = pa.Table.from_pydict(
+            {"a": [10, 10], "b": [1, 2], "c": [1, 2]},
+            schema=pa_schema,
+        ).to_pandas()
+        for col in expected.columns:
+            if col in actual.columns and actual[col].dtype != expected[col].dtype:
+                actual[col] = actual[col].astype(expected[col].dtype)
+        pd.testing.assert_frame_equal(actual, expected)
+
+    def test_format_table_text_read_write(self):
+        pa_schema = pa.schema([("value", pa.string())])
+        schema = Schema.from_pyarrow_schema(
+            pa_schema,
+            options={"type": "format-table", "file.format": "text"},
+        )
+        table_name = "default.format_table_rw_text"
+        try:
+            self.rest_catalog.drop_table(table_name, True)
+        except Exception:
+            pass
+        self.rest_catalog.create_table(table_name, schema, False)
+        table = self.rest_catalog.get_table(table_name)
+        self.assertIsInstance(table, FormatTable)
+        self.assertEqual(table.format().value, "text")
+        opts = table.options()
+        self.assertIsInstance(opts, dict)
+        self.assertEqual(opts.get("file.format"), "text")
+
+        write_builder = table.new_batch_write_builder()
+        table_write = write_builder.new_write()
+        table_commit = write_builder.new_commit()
+        df = pd.DataFrame({"value": ["hello", "world"]})
+        table_write.write_pandas(df)
+        table_commit.commit(table_write.prepare_commit())
+        table_write.close()
+        table_commit.close()
+
+        read_builder = table.new_read_builder()
+        splits = read_builder.new_scan().plan().splits()
+        table_read = read_builder.new_read()
+        actual = table_read.to_pandas(splits).sort_values(by="value").reset_index(drop=True)
+        expected = pd.DataFrame({"value": ["hello", "world"]})
+        pd.testing.assert_frame_equal(actual, expected)
+
+    def test_format_table_text_read_write_with_nulls(self):
+        pa_schema = pa.schema([("value", pa.string())])
+        schema = Schema.from_pyarrow_schema(
+            pa_schema,
+            options={"type": "format-table", "file.format": "text"},
+        )
+        table_name = "default.format_table_rw_text_nulls"
+        try:
+            self.rest_catalog.drop_table(table_name, True)
+        except Exception:
+            pass
+        self.rest_catalog.create_table(table_name, schema, False)
+        table = self.rest_catalog.get_table(table_name)
+        write_builder = table.new_batch_write_builder()
+        table_write = write_builder.new_write()
+        table_commit = write_builder.new_commit()
+        df = pd.DataFrame({"value": ["hello", None, "world"]})
+        table_write.write_pandas(df)
+        table_commit.commit(table_write.prepare_commit())
+        table_write.close()
+        table_commit.close()
+
+        read_builder = table.new_read_builder()
+        splits = read_builder.new_scan().plan().splits()
+        table_read = read_builder.new_read()
+        actual = table_read.to_pandas(splits)
+        self.assertEqual(actual.shape[0], 3)
+        # Nulls are written as empty string; read back as ""
+        self.assertEqual(set(actual["value"].fillna("").astype(str)), {"", 
"hello", "world"})
+        self.assertIn("", actual["value"].values)
+
+    def test_format_table_text_partitioned_read_write(self):
+        pa_schema = pa.schema([
+            ("value", pa.string()),
+            ("dt", pa.int32()),
+        ])
+        schema = Schema.from_pyarrow_schema(
+            pa_schema,
+            partition_keys=["dt"],
+            options={"type": "format-table", "file.format": "text"},
+        )
+        table_name = "default.format_table_rw_text_partitioned"
+        try:
+            self.rest_catalog.drop_table(table_name, True)
+        except Exception:
+            pass
+        self.rest_catalog.create_table(table_name, schema, False)
+        table = self.rest_catalog.get_table(table_name)
+        self.assertIsInstance(table, FormatTable)
+        self.assertEqual(table.format().value, "text")
+
+        write_builder = table.new_batch_write_builder()
+        tw = write_builder.new_write()
+        tc = write_builder.new_commit()
+        tw.write_pandas(pd.DataFrame({"value": ["a", "b"], "dt": [1, 1]}))
+        tw.write_pandas(pd.DataFrame({"value": ["c"], "dt": [2]}))
+        tc.commit(tw.prepare_commit())
+        tw.close()
+        tc.close()
+
+        read_builder = table.new_read_builder()
+        splits = read_builder.new_scan().plan().splits()
+        actual = read_builder.new_read().to_pandas(splits).sort_values(by=["dt", "value"]).reset_index(drop=True)
+        self.assertEqual(actual.shape[0], 3)
+        self.assertEqual(actual["value"].tolist(), ["a", "b", "c"])
+        self.assertEqual(actual["dt"].tolist(), [1, 1, 2])
+
+    def test_format_table_read_with_limit_to_iterator(self):
+        pa_schema = pa.schema([
+            ("a", pa.int32()),
+            ("b", pa.int32()),
+        ])
+        schema = Schema.from_pyarrow_schema(
+            pa_schema,
+            options={"type": "format-table", "file.format": "parquet"},
+        )
+        table_name = "default.format_table_limit_iterator"
+        try:
+            self.rest_catalog.drop_table(table_name, True)
+        except Exception:
+            pass
+        self.rest_catalog.create_table(table_name, schema, False)
+        table = self.rest_catalog.get_table(table_name)
+        write_builder = table.new_batch_write_builder()
+        tw = write_builder.new_write()
+        tc = write_builder.new_commit()
+        tw.write_pandas(pd.DataFrame({"a": [1, 2, 3, 4], "b": [10, 20, 30, 
40]}))
+        tc.commit(tw.prepare_commit())
+        tw.close()
+        tc.close()
+
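+        # Plan the splits first, then apply the row limit through a fresh read builder used for both reads.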
+        splits = table.new_read_builder().new_scan().plan().splits()
+        limit = 2
+        read_builder = table.new_read_builder().with_limit(limit)
+        table_read = read_builder.new_read()
+
+        df = table_read.to_pandas(splits)
+        self.assertEqual(len(df), limit, "to_pandas must respect with_limit(2)")
+
+        batches = list(table_read.to_iterator(splits))
+        self.assertEqual(len(batches), limit, "to_iterator must respect with_limit(2)")
+
+    @parameterized.expand(_format_table_read_write_formats())
+    def test_format_table_partitioned_overwrite(self, file_format):
+        pa_schema = pa.schema([
+            ("a", pa.int32()),
+            ("b", pa.int32()),
+            ("c", pa.int32()),
+        ])
+        schema = Schema.from_pyarrow_schema(
+            pa_schema,
+            partition_keys=["c"],
+            options={"type": "format-table", "file.format": file_format},
+        )
+        table_name = f"default.format_table_partitioned_overwrite_{file_format}"
+        self.rest_catalog.drop_table(table_name, True)
+        self.rest_catalog.create_table(table_name, schema, False)
+        table = self.rest_catalog.get_table(table_name)
+
+        write_builder = table.new_batch_write_builder()
+        tw = write_builder.new_write()
+        tc = write_builder.new_commit()
+        tw.write_pandas(pd.DataFrame({"a": [10, 10], "b": [10, 20], "c": [1, 1]}))
+        tc.commit(tw.prepare_commit())
+        tw.close()
+        tc.close()
+
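+        # Overwrite partition c=1 with new rows; the first commit's rows for c=1 should be replaced.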
+        tw = table.new_batch_write_builder().overwrite({"c": 1}).new_write()
+        tc = table.new_batch_write_builder().overwrite({"c": 1}).new_commit()
+        tw.write_pandas(pd.DataFrame({"a": [12, 12], "b": [100, 200], "c": [1, 1]}))
+        tc.commit(tw.prepare_commit())
+        tw.close()
+        tc.close()
+
+        read_builder = table.new_read_builder()
+        splits = read_builder.new_scan().plan().splits()
+        actual = read_builder.new_read().to_pandas(splits).sort_values(by="b")
+        self.assertEqual(len(actual), 2)
+        self.assertEqual(actual["b"].tolist(), [100, 200])
+
+    def test_format_table_overwrite_only_specified_partition(self):
+        pa_schema = pa.schema([
+            ("a", pa.int32()),
+            ("b", pa.int32()),
+            ("c", pa.int32()),
+        ])
+        schema = Schema.from_pyarrow_schema(
+            pa_schema,
+            partition_keys=["c"],
+            options={"type": "format-table", "file.format": "parquet"},
+        )
+        table_name = "default.format_table_overwrite_one_partition"
+        try:
+            self.rest_catalog.drop_table(table_name, True)
+        except Exception:
+            pass
+        self.rest_catalog.create_table(table_name, schema, False)
+        table = self.rest_catalog.get_table(table_name)
+
+        wb = table.new_batch_write_builder()
+        tw = wb.new_write()
+        tc = wb.new_commit()
+        tw.write_pandas(pd.DataFrame({"a": [10, 10], "b": [10, 20], "c": [1, 1]}))
+        tw.write_pandas(pd.DataFrame({"a": [30, 30], "b": [30, 40], "c": [2, 2]}))
+        tc.commit(tw.prepare_commit())
+        tw.close()
+        tc.close()
+
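+        # Overwrite only partition c=1; rows in partition c=2 must survive untouched.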
+        tw = table.new_batch_write_builder().overwrite({"c": 1}).new_write()
+        tc = table.new_batch_write_builder().overwrite({"c": 1}).new_commit()
+        tw.write_pandas(pd.DataFrame({"a": [12, 12], "b": [100, 200], "c": [1, 1]}))
+        tc.commit(tw.prepare_commit())
+        tw.close()
+        tc.close()
+
+        actual = table.new_read_builder().new_read().to_pandas(
+            table.new_read_builder().new_scan().plan().splits()
+        ).sort_values(by=["c", "b"])
+        self.assertEqual(len(actual), 4)
+        self.assertEqual(actual["b"].tolist(), [100, 200, 30, 40])
+        self.assertEqual(actual["c"].tolist(), [1, 1, 2, 2])
+        c1 = actual[actual["c"] == 1]["b"].tolist()
+        c2 = actual[actual["c"] == 2]["b"].tolist()
+        self.assertEqual(c1, [100, 200], "partition c=1 must be overwritten")
+        self.assertEqual(c2, [30, 40], "partition c=2 must be unchanged")
+
+    def test_format_table_overwrite_multiple_batches_same_partition(self):
+        pa_schema = pa.schema([
+            ("a", pa.int32()),
+            ("b", pa.int32()),
+        ])
+        schema = Schema.from_pyarrow_schema(
+            pa_schema,
+            options={"type": "format-table", "file.format": "parquet"},
+        )
+        table_name = "default.format_table_overwrite_multi_batch"
+        try:
+            self.rest_catalog.drop_table(table_name, True)
+        except Exception:
+            pass
+        self.rest_catalog.create_table(table_name, schema, False)
+        table = self.rest_catalog.get_table(table_name)
+
+        wb = table.new_batch_write_builder()
+        tw = wb.new_write()
+        tc = wb.new_commit()
+        tw.write_pandas(pd.DataFrame({"a": [1, 2], "b": [10, 20]}))
+        tc.commit(tw.prepare_commit())
+        tw.close()
+        tc.close()
+
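+        # A single overwrite commit containing two write_pandas batches must keep both batches.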
+        tw = wb.overwrite().new_write()
+        tc = wb.overwrite().new_commit()
+        tw.write_pandas(pd.DataFrame({"a": [3, 4], "b": [30, 40]}))
+        tw.write_pandas(pd.DataFrame({"a": [5, 6], "b": [50, 60]}))
+        tc.commit(tw.prepare_commit())
+        tw.close()
+        tc.close()
+
+        actual = table.new_read_builder().new_read().to_pandas(
+            table.new_read_builder().new_scan().plan().splits()
+        ).sort_values(by="b")
+        self.assertEqual(len(actual), 4, "overwrite + 2 write_pandas same partition must keep all 4 rows")
+        self.assertEqual(actual["b"].tolist(), [30, 40, 50, 60])
+
+    @parameterized.expand(_format_table_read_write_formats())
+    def test_format_table_partitioned_read_write(self, file_format):
+        pa_schema = pa.schema([
+            ("a", pa.int32()),
+            ("b", pa.int32()),
+            ("dt", pa.int32()),
+        ])
+        schema = Schema.from_pyarrow_schema(
+            pa_schema,
+            partition_keys=["dt"],
+            options={"type": "format-table", "file.format": file_format},
+        )
+        table_name = f"default.format_table_partitioned_rw_{file_format}"
+        self.rest_catalog.drop_table(table_name, True)
+        self.rest_catalog.create_table(table_name, schema, False)
+        table = self.rest_catalog.get_table(table_name)
+        self.assertIsInstance(table, FormatTable)
+
+        wb = table.new_batch_write_builder()
+        tw = wb.new_write()
+        tc = wb.new_commit()
+        tw.write_pandas(pd.DataFrame({"a": [1, 2], "b": [10, 20], "dt": [10, 10]}))
+        tc.commit(tw.prepare_commit())
+        tw.close()
+        tc.close()
+
+        tw = wb.new_write()
+        tc = wb.new_commit()
+        tw.write_pandas(pd.DataFrame({"a": [3, 4], "b": [30, 40], "dt": [11, 11]}))
+        tc.commit(tw.prepare_commit())
+        tw.close()
+        tc.close()
+
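+        # First read everything, then read only the dt=10 partition via a partition filter.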
+        rb = table.new_read_builder()
+        splits_all = rb.new_scan().plan().splits()
+        actual_all = rb.new_read().to_pandas(splits_all).sort_values(by="b")
+        self.assertEqual(len(actual_all), 4)
+        self.assertEqual(sorted(actual_all["b"].tolist()), [10, 20, 30, 40])
+
+        rb_dt10 = table.new_read_builder().with_partition_filter({"dt": "10"})
+        splits_dt10 = rb_dt10.new_scan().plan().splits()
+        actual_dt10 = rb_dt10.new_read().to_pandas(splits_dt10).sort_values(by="b")
+        self.assertEqual(len(actual_dt10), 2)
+        self.assertEqual(actual_dt10["b"].tolist(), [10, 20])
+        # Partition column must match schema type (int32), not string
+        self.assertEqual(actual_dt10["dt"].tolist(), [10, 10])
+        self.assertEqual(actual_all["dt"].tolist(), [10, 10, 11, 11])
+
+    def test_format_table_partition_column_returns_schema_type(self):
+        """Partition columns must be returned with schema type (e.g. int32), 
not always string."""
+        pa_schema = pa.schema([
+            ("a", pa.int32()),
+            ("b", pa.int32()),
+            ("dt", pa.int32()),
+        ])
+        schema = Schema.from_pyarrow_schema(
+            pa_schema,
+            partition_keys=["dt"],
+            options={"type": "format-table", "file.format": "parquet"},
+        )
+        table_name = "default.format_table_partition_schema_type"
+        try:
+            self.rest_catalog.drop_table(table_name, True)
+        except Exception:
+            pass
+        self.rest_catalog.create_table(table_name, schema, False)
+        table = self.rest_catalog.get_table(table_name)
+        wb = table.new_batch_write_builder()
+        tw = wb.new_write()
+        tc = wb.new_commit()
+        tw.write_pandas(pd.DataFrame({"a": [1, 2], "b": [10, 20], "dt": [1, 1]}))
+        tw.write_pandas(pd.DataFrame({"a": [3, 4], "b": [30, 40], "dt": [2, 2]}))
+        tc.commit(tw.prepare_commit())
+        tw.close()
+        tc.close()
+
+        rb = table.new_read_builder().with_partition_filter({"dt": "1"})
+        splits = rb.new_scan().plan().splits()
+        actual = rb.new_read().to_pandas(splits).sort_values(by="b")
+        self.assertEqual(len(actual), 2)
+        self.assertEqual(actual["b"].tolist(), [10, 20])
+        # Must be int list, not string list; fails if partition column is hardcoded as string
+        self.assertEqual(actual["dt"].tolist(), [1, 1])
+        self.assertTrue(
+            actual["dt"].dtype in (pd.Int32Dtype(), "int32", "int64"),
+            "dt must be int type per schema, not string",
+        )
+
+    def test_format_table_with_filter_extracts_partition_like_java(self):
+        pa_schema = pa.schema([
+            ("a", pa.int32()),
+            ("b", pa.int32()),
+            ("dt", pa.int32()),
+        ])
+        schema = Schema.from_pyarrow_schema(
+            pa_schema,
+            partition_keys=["dt"],
+            options={"type": "format-table", "file.format": "parquet"},
+        )
+        table_name = "default.format_table_with_filter_assert"
+        try:
+            self.rest_catalog.drop_table(table_name, True)
+        except Exception:
+            pass
+        self.rest_catalog.create_table(table_name, schema, False)
+        table = self.rest_catalog.get_table(table_name)
+        wb = table.new_batch_write_builder()
+        tw = wb.new_write()
+        tc = wb.new_commit()
+        tw.write_pandas(pd.DataFrame({"a": [1, 2], "b": [10, 20], "dt": [10, 10]}))
+        tw.write_pandas(pd.DataFrame({"a": [3, 4], "b": [30, 40], "dt": [11, 11]}))
+        tc.commit(tw.prepare_commit())
+        tw.close()
+        tc.close()
+
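+        # A partition-equality predicate passed to with_filter is expected to prune splits the same way with_partition_filter does.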
+        predicate_eq_dt10 = table.new_read_builder().new_predicate_builder().equal("dt", 10)
+        splits_by_partition_filter = (
+            table.new_read_builder().with_partition_filter({"dt": "10"}).new_scan().plan().splits()
+        )
+        splits_by_with_filter = (
+            table.new_read_builder().with_filter(predicate_eq_dt10).new_scan().plan().splits()
+        )
+        self.assertEqual(
+            len(splits_by_with_filter), len(splits_by_partition_filter),
+            "with_filter(partition equality) must behave like 
with_partition_filter (Java-aligned)",
+        )
+        actual_from_filter = (
+            table.new_read_builder().with_filter(predicate_eq_dt10).new_read().to_pandas(splits_by_with_filter)
+        )
+        self.assertEqual(len(actual_from_filter), 2)
+        self.assertEqual(actual_from_filter["b"].tolist(), [10, 20])
+
+        splits_partition_then_filter = (
+            table.new_read_builder()
+            .with_partition_filter({"dt": "10"})
+            .with_filter(predicate_eq_dt10)
+            .new_scan()
+            .plan()
+            .splits()
+        )
+        self.assertEqual(
+            len(splits_partition_then_filter), len(splits_by_partition_filter),
+            "with_filter must not overwrite a previously set partition filter",
+        )
+        actual = (
+            table.new_read_builder()
+            .with_partition_filter({"dt": "10"})
+            .with_filter(predicate_eq_dt10)
+            .new_read()
+            .to_pandas(splits_partition_then_filter)
+        )
+        self.assertEqual(len(actual), 2)
+        self.assertEqual(actual["b"].tolist(), [10, 20])
+
+        predicate_non_partition = table.new_read_builder().new_predicate_builder().equal("a", 1)
+        splits_no_filter = table.new_read_builder().new_scan().plan().splits()
+        splits_with_non_partition_predicate = (
+            table.new_read_builder().with_filter(predicate_non_partition).new_scan().plan().splits()
+        )
+        self.assertEqual(
+            len(splits_with_non_partition_predicate), len(splits_no_filter),
+            "with_filter(non-partition predicate) must not change scan when no 
partition spec extracted",
+        )
+
+    @parameterized.expand(_format_table_read_write_formats())
+    def test_format_table_full_overwrite(self, file_format):
+        pa_schema = pa.schema([
+            ("a", pa.int32()),
+            ("b", pa.int32()),
+        ])
+        schema = Schema.from_pyarrow_schema(
+            pa_schema,
+            options={"type": "format-table", "file.format": file_format},
+        )
+        table_name = f"default.format_table_full_overwrite_{file_format}"
+        self.rest_catalog.drop_table(table_name, True)
+        self.rest_catalog.create_table(table_name, schema, False)
+        table = self.rest_catalog.get_table(table_name)
+
+        wb = table.new_batch_write_builder()
+        tw = wb.new_write()
+        tc = wb.new_commit()
+        tw.write_pandas(pd.DataFrame({"a": [1, 2], "b": [10, 20]}))
+        tc.commit(tw.prepare_commit())
+        tw.close()
+        tc.close()
+
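+        # overwrite() without a partition spec replaces the entire table contents.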
+        tw = wb.overwrite().new_write()
+        tc = wb.overwrite().new_commit()
+        tw.write_pandas(pd.DataFrame({"a": [3], "b": [30]}))
+        tc.commit(tw.prepare_commit())
+        tw.close()
+        tc.close()
+
+        rb = table.new_read_builder()
+        splits = rb.new_scan().plan().splits()
+        actual = rb.new_read().to_pandas(splits)
+        self.assertEqual(len(actual), 1)
+        self.assertEqual(actual["b"].tolist(), [30])
+
+    @parameterized.expand(_format_table_read_write_formats())
+    def test_format_table_split_read(self, file_format):
+        pa_schema = pa.schema([
+            ("id", pa.int32()),
+            ("name", pa.string()),
+            ("score", pa.float64()),
+        ])
+        schema = Schema.from_pyarrow_schema(
+            pa_schema,
+            options={
+                "type": "format-table",
+                "file.format": file_format,
+                "source.split.target-size": "54",
+            },
+        )
+        table_name = f"default.format_table_split_read_{file_format}"
+        self.rest_catalog.drop_table(table_name, True)
+        self.rest_catalog.create_table(table_name, schema, False)
+        table = self.rest_catalog.get_table(table_name)
+
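+        # Write five 10-row batches as separate commits; with the small source.split.target-size the scan is expected to produce multiple splits.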
+        size = 50
+        for i in range(0, size, 10):
+            batch = pd.DataFrame({
+                "id": list(range(i, min(i + 10, size))),
+                "name": [f"User{j}" for j in range(i, min(i + 10, size))],
+                "score": [85.5 + (j % 15) for j in range(i, min(i + 10, 
size))],
+            })
+            wb = table.new_batch_write_builder()
+            tw = wb.new_write()
+            tc = wb.new_commit()
+            tw.write_pandas(batch)
+            tc.commit(tw.prepare_commit())
+            tw.close()
+            tc.close()
+
+        rb = table.new_read_builder()
+        splits = rb.new_scan().plan().splits()
+        actual = rb.new_read().to_pandas(splits).sort_values(by="id")
+        self.assertEqual(len(actual), size)
+        self.assertEqual(actual["id"].tolist(), list(range(size)))
+
+    @parameterized.expand(_format_table_read_write_formats())
+    def test_format_table_catalog(self, file_format):
+        pa_schema = pa.schema([
+            ("str", pa.string()),
+            ("int", pa.int32()),
+        ])
+        schema = Schema.from_pyarrow_schema(
+            pa_schema,
+            options={"type": "format-table", "file.format": file_format},
+        )
+        table_name = f"default.format_table_catalog_{file_format}"
+        self.rest_catalog.drop_table(table_name, True)
+        self.rest_catalog.create_table(table_name, schema, False)
+        self.assertIn(f"format_table_catalog_{file_format}", 
self.rest_catalog.list_tables("default"))
+        table = self.rest_catalog.get_table(table_name)
+        self.assertIsInstance(table, FormatTable)
+
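+        # After dropping the table, get_table must raise TableNotExistException.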
+        self.rest_catalog.drop_table(table_name, False)
+        with self.assertRaises(TableNotExistException):
+            self.rest_catalog.get_table(table_name)
+
+
+if __name__ == "__main__":
+    unittest.main()
