This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new b413290ef4 [python] Add ROW file format read and write support (#8014)
b413290ef4 is described below

commit b413290ef40f2c41667b652112fd6cc283f44767
Author: Jingsong Lee <[email protected]>
AuthorDate: Thu May 28 21:19:41 2026 +0800

    [python] Add ROW file format read and write support (#8014)
    
    Implement the ROW binary file format in the Python SDK, matching the
    Java implementation for cross-language interoperability. The format uses
    block-level ZSTD compression with delta-varint block indexing and a
    32-byte footer.
---
 .../test/java/org/apache/paimon/JavaPyE2ETest.java |  62 +++
 .../apache/paimon/format/row/RowFormatWriter.java  |   1 -
 paimon-python/dev/run_mixed_tests.sh               |  58 ++-
 .../pypaimon/catalog/rest/rest_token_file_io.py    |   3 +
 paimon-python/pypaimon/common/file_io.py           |   3 +
 .../pypaimon/common/options/core_options.py        |   1 +
 .../pypaimon/filesystem/caching_file_io.py         |   3 +
 paimon-python/pypaimon/filesystem/local_file_io.py |  21 +
 .../pypaimon/filesystem/pyarrow_file_io.py         |  15 +
 .../pypaimon/read/reader/format_row_reader.py      | 469 ++++++++++++++++++
 paimon-python/pypaimon/read/split_read.py          |  28 +-
 .../pypaimon/tests/e2e/java_py_read_write_test.py  |  49 ++
 .../tests/test_format_row_reader_writer.py         | 534 +++++++++++++++++++++
 .../pypaimon/tests/test_format_row_table.py        | 503 +++++++++++++++++++
 .../pypaimon/write/writer/data_blob_writer.py      |   2 +
 .../pypaimon/write/writer/data_vector_writer.py    |   2 +
 paimon-python/pypaimon/write/writer/data_writer.py |   2 +
 .../pypaimon/write/writer/format_row_writer.py     | 408 ++++++++++++++++
 18 files changed, 2160 insertions(+), 4 deletions(-)

diff --git a/paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java 
b/paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java
index 86cea365c7..379331a1c2 100644
--- a/paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java
@@ -997,6 +997,68 @@ public class JavaPyE2ETest {
         return GenericRow.ofKind(rowKind, values[0], values[1], values[2]);
     }
 
+    /**
+     * Java writes a ROW-format append-only table for Python to read (Java→Python E2E).
+     *
+     * <p>The table is created with {@code file.format = row} and {@code bucket = -1}, six rows
+     * are written and committed, and the rows are read back with the Java reader as a sanity
+     * check. Only runs when the system property {@code run.e2e.tests} is {@code true}.
+     */
+    @Test
+    @EnabledIfSystemProperty(named = "run.e2e.tests", matches = "true")
+    public void testJavaWriteRowAppendTable() throws Exception {
+        Identifier identifier = identifier("mixed_test_append_tablej_row");
+        // Start from a clean slate so the row-count assertion below is deterministic.
+        catalog.dropTable(identifier, true);
+        Schema schema =
+                Schema.newBuilder()
+                        .column("id", DataTypes.INT())
+                        .column("name", DataTypes.STRING())
+                        .column("value", DataTypes.DOUBLE())
+                        .option("file.format", "row")
+                        .option("bucket", "-1")
+                        .build();
+
+        catalog.createTable(identifier, schema, false);
+        FileStoreTable table = (FileStoreTable) catalog.getTable(identifier);
+
+        BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder();
+        try (BatchTableWrite write = writeBuilder.newWrite();
+                BatchTableCommit commit = writeBuilder.newCommit()) {
+            write.write(GenericRow.of(1, BinaryString.fromString("Apple"), 
1.5));
+            write.write(GenericRow.of(2, BinaryString.fromString("Banana"), 
0.8));
+            write.write(GenericRow.of(3, BinaryString.fromString("Carrot"), 
0.6));
+            write.write(GenericRow.of(4, BinaryString.fromString("Broccoli"), 
1.2));
+            write.write(GenericRow.of(5, BinaryString.fromString("Chicken"), 
5.0));
+            write.write(GenericRow.of(6, BinaryString.fromString("Beef"), 
8.0));
+            commit.commit(write.prepareCommit());
+        }
+
+        // Read the table back on the Java side; only the row count is verified here.
+        List<Split> splits = new 
ArrayList<>(table.newSnapshotReader().read().dataSplits());
+        TableRead read = table.newRead();
+        List<String> res =
+                getResult(
+                        read,
+                        splits,
+                        row -> DataFormatTestUtil.toStringNoRowKind(row, 
table.rowType()));
+        assertThat(res).hasSize(6);
+        LOG.info("testJavaWriteRowAppendTable: wrote and read back {} 
ROW-format rows", res.size());
+    }
+
+    /**
+     * Java reads a ROW-format append-only table written by Python (Python→Java E2E).
+     *
+     * <p>The table {@code mixed_test_append_tablep_row} is not created here; it must already
+     * exist in the catalog when this test runs. Only the number of rows read (six) is asserted.
+     * Only runs when the system property {@code run.e2e.tests} is {@code true}.
+     */
+    @Test
+    @EnabledIfSystemProperty(named = "run.e2e.tests", matches = "true")
+    public void testReadRowAppendTable() throws Exception {
+        Identifier identifier = identifier("mixed_test_append_tablep_row");
+        Table table = catalog.getTable(identifier);
+        FileStoreTable fileStoreTable = (FileStoreTable) table;
+        List<Split> splits =
+                new 
ArrayList<>(fileStoreTable.newSnapshotReader().read().dataSplits());
+        TableRead read = fileStoreTable.newRead();
+        List<String> res =
+                getResult(
+                        read,
+                        splits,
+                        row -> DataFormatTestUtil.toStringNoRowKind(row, 
table.rowType()));
+        assertThat(res).hasSize(6);
+        LOG.info(
+                "testReadRowAppendTable: Java read {} ROW-format rows written 
by Python",
+                res.size());
+    }
+
     /** Java writes a VARIANT-column table for Python to read (Java→Python 
E2E). */
     @Test
     @EnabledIfSystemProperty(named = "run.e2e.tests", matches = "true")
diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/row/RowFormatWriter.java 
b/paimon-format/src/main/java/org/apache/paimon/format/row/RowFormatWriter.java
index ff9ee1112b..7fcd750744 100644
--- 
a/paimon-format/src/main/java/org/apache/paimon/format/row/RowFormatWriter.java
+++ 
b/paimon-format/src/main/java/org/apache/paimon/format/row/RowFormatWriter.java
@@ -89,7 +89,6 @@ public class RowFormatWriter implements FormatWriter {
         footer.writeTo(out);
 
         out.flush();
-        out.close();
     }
 
     private void flushBlock() throws IOException {
diff --git a/paimon-python/dev/run_mixed_tests.sh 
b/paimon-python/dev/run_mixed_tests.sh
index 7cc32eedee..6e7a7e00b5 100755
--- a/paimon-python/dev/run_mixed_tests.sh
+++ b/paimon-python/dev/run_mixed_tests.sh
@@ -612,6 +612,48 @@ run_py_variant_write_java_read_test() {
     fi
 }
 
+# Run the ROW format interoperability test in four stages:
+#   1. Java writes a ROW table    (JavaPyE2ETest#testJavaWriteRowAppendTable)
+#   2. Python reads it            (JavaPyReadWriteTest::test_read_row_append_table)
+#   3. Python writes a ROW table  (JavaPyReadWriteTest::test_py_write_row_append_table)
+#   4. Java reads it              (JavaPyE2ETest#testReadRowAppendTable)
+# Returns 0 when every stage passes and 1 at the first failure; later stages are skipped.
+run_row_format_test() {
+    echo -e "${YELLOW}=== Running ROW Format Test (Java Write → Python Read, Python Write → Java Read) ===${NC}"
+
+    # Abort instead of running Maven from whatever directory we happen to be in.
+    cd "$PROJECT_ROOT" || { echo -e "${RED}✗ Cannot enter $PROJECT_ROOT${NC}"; return 1; }
+
+    echo "Running Maven test for JavaPyE2ETest.testJavaWriteRowAppendTable..."
+    if ! mvn test -Dtest=org.apache.paimon.JavaPyE2ETest#testJavaWriteRowAppendTable -pl paimon-core -q -Drun.e2e.tests=true; then
+        echo -e "${RED}✗ Java ROW write test failed${NC}"
+        return 1
+    fi
+    echo -e "${GREEN}✓ Java ROW write test completed successfully${NC}"
+
+    # The pytest node ids below are relative to this directory.
+    cd "$PAIMON_PYTHON_DIR" || { echo -e "${RED}✗ Cannot enter $PAIMON_PYTHON_DIR${NC}"; return 1; }
+    echo "Running Python test for JavaPyReadWriteTest.test_read_row_append_table..."
+    if ! python -m pytest java_py_read_write_test.py::JavaPyReadWriteTest::test_read_row_append_table -v; then
+        echo -e "${RED}✗ Python ROW read test failed${NC}"
+        return 1
+    fi
+    echo -e "${GREEN}✓ Python ROW read test completed successfully${NC}"
+
+    echo ""
+
+    echo "Running Python test for JavaPyReadWriteTest.test_py_write_row_append_table..."
+    if ! python -m pytest java_py_read_write_test.py::JavaPyReadWriteTest::test_py_write_row_append_table -v; then
+        echo -e "${RED}✗ Python ROW write test failed${NC}"
+        return 1
+    fi
+    echo -e "${GREEN}✓ Python ROW write test completed successfully${NC}"
+
+    echo ""
+
+    cd "$PROJECT_ROOT" || { echo -e "${RED}✗ Cannot enter $PROJECT_ROOT${NC}"; return 1; }
+    echo "Running Maven test for JavaPyE2ETest.testReadRowAppendTable..."
+    if ! mvn test -Dtest=org.apache.paimon.JavaPyE2ETest#testReadRowAppendTable -pl paimon-core -q -Drun.e2e.tests=true; then
+        echo -e "${RED}✗ Java ROW read test failed${NC}"
+        return 1
+    fi
+    echo -e "${GREEN}✓ Java ROW read test completed successfully${NC}"
+    return 0
+}
+
 # Main execution
 main() {
     local java_write_result=0
@@ -635,6 +677,7 @@ main() {
     local multi_vector_dedicated_py_write_result=0
     local java_variant_write_py_read_result=0
     local py_variant_write_java_read_result=0
+    local row_format_result=0
 
     # Detect Python version
     PYTHON_VERSION=$(python -c "import sys; 
print(f'{sys.version_info.major}.{sys.version_info.minor}')" 2>/dev/null || 
echo "unknown")
@@ -815,6 +858,13 @@ main() {
 
     echo ""
 
+    # Run ROW format test (Java write + Python read + Python write + Java read)
+    if ! run_row_format_test; then
+        row_format_result=1
+    fi
+
+    echo ""
+
     echo -e "${YELLOW}=== Test Results Summary ===${NC}"
 
     if [[ $java_write_result -eq 0 ]]; then
@@ -943,12 +993,18 @@ main() {
         echo -e "${RED}✗ VARIANT Type Test (Python Write, Java Read): 
FAILED${NC}"
     fi
 
+    if [[ $row_format_result -eq 0 ]]; then
+        echo -e "${GREEN}✓ ROW Format Test (Java Write ↔ Python Read/Write): 
PASSED${NC}"
+    else
+        echo -e "${RED}✗ ROW Format Test (Java Write ↔ Python Read/Write): 
FAILED${NC}"
+    fi
+
     echo ""
 
     # Clean up warehouse directory after all tests
     cleanup_warehouse
 
-    if [[ $java_write_result -eq 0 && $python_read_result -eq 0 && 
$python_write_result -eq 0 && $java_read_result -eq 0 && $pk_dv_result -eq 0 && 
$btree_index_result -eq 0 && $compressed_text_result -eq 0 && 
$tantivy_fulltext_result -eq 0 && $lumina_vector_result -eq 0 && 
$lumina_vector_btree_result -eq 0 && $compact_conflict_result -eq 0 && 
$blob_alter_compact_result -eq 0 && $data_evolution_result -eq 0 && 
$data_evolution_py_write_result -eq 0 && $java_variant_write_py_read_result -eq 
[...]
+    if [[ $java_write_result -eq 0 && $python_read_result -eq 0 && 
$python_write_result -eq 0 && $java_read_result -eq 0 && $pk_dv_result -eq 0 && 
$btree_index_result -eq 0 && $compressed_text_result -eq 0 && 
$tantivy_fulltext_result -eq 0 && $lumina_vector_result -eq 0 && 
$lumina_vector_btree_result -eq 0 && $compact_conflict_result -eq 0 && 
$blob_alter_compact_result -eq 0 && $data_evolution_result -eq 0 && 
$data_evolution_py_write_result -eq 0 && $java_variant_write_py_read_result -eq 
[...]
         echo -e "${GREEN}🎉 All tests passed! Java-Python interoperability 
verified.${NC}"
         return 0
     else
diff --git a/paimon-python/pypaimon/catalog/rest/rest_token_file_io.py 
b/paimon-python/pypaimon/catalog/rest/rest_token_file_io.py
index 777dd6fef1..42dabb268c 100644
--- a/paimon-python/pypaimon/catalog/rest/rest_token_file_io.py
+++ b/paimon-python/pypaimon/catalog/rest/rest_token_file_io.py
@@ -191,6 +191,9 @@ class RESTTokenFileIO(FileIO):
     def write_blob(self, path: str, data, **kwargs):
         return self.file_io().write_blob(path, data, **kwargs)
 
+    def write_row(self, path: str, data, fields=None, zstd_level: int = 1, 
**kwargs):
+        """Forward a ROW-format write to the FileIO returned by ``self.file_io()``."""
+        return self.file_io().write_row(path, data, fields, zstd_level, 
**kwargs)
+
     @property
     def uri_reader_factory(self):
         if self._uri_reader_factory_cache is None:
diff --git a/paimon-python/pypaimon/common/file_io.py 
b/paimon-python/pypaimon/common/file_io.py
index 6f9758965b..172ee2a66e 100644
--- a/paimon-python/pypaimon/common/file_io.py
+++ b/paimon-python/pypaimon/common/file_io.py
@@ -257,6 +257,9 @@ class FileIO(ABC):
     def write_vortex(self, path: str, data, **kwargs):
         raise NotImplementedError("write_vortex must be implemented by FileIO 
subclasses")
 
+    def write_row(self, path: str, data, fields=None, zstd_level: int = 1, 
**kwargs):
+        """Write ``data`` to ``path`` in ROW format; the base class always raises.
+
+        Raises:
+            NotImplementedError: Always, until a subclass overrides this method.
+        """
+        raise NotImplementedError("write_row must be implemented by FileIO 
subclasses")
+
     def close(self):
         pass
 
diff --git a/paimon-python/pypaimon/common/options/core_options.py 
b/paimon-python/pypaimon/common/options/core_options.py
index b2f0e01916..1cc33f6df0 100644
--- a/paimon-python/pypaimon/common/options/core_options.py
+++ b/paimon-python/pypaimon/common/options/core_options.py
@@ -64,6 +64,7 @@ class CoreOptions:
     FILE_FORMAT_BLOB: str = "blob"
     FILE_FORMAT_LANCE: str = "lance"
     FILE_FORMAT_VORTEX: str = "vortex"
+    FILE_FORMAT_ROW: str = "row"
 
     # Basic options
     AUTO_CREATE: ConfigOption[bool] = (
diff --git a/paimon-python/pypaimon/filesystem/caching_file_io.py 
b/paimon-python/pypaimon/filesystem/caching_file_io.py
index 778614c56c..10605be5f9 100644
--- a/paimon-python/pypaimon/filesystem/caching_file_io.py
+++ b/paimon-python/pypaimon/filesystem/caching_file_io.py
@@ -414,6 +414,9 @@ class CachingFileIO(FileIO):
     def write_vortex(self, *args, **kwargs):
         return self._delegate.write_vortex(*args, **kwargs)
 
+    def write_row(self, *args, **kwargs):
+        """Pass a ROW-format write straight through to the wrapped ``_delegate``."""
+        return self._delegate.write_row(*args, **kwargs)
+
     def __getattr__(self, name):
         return getattr(self._delegate, name)
 
diff --git a/paimon-python/pypaimon/filesystem/local_file_io.py 
b/paimon-python/pypaimon/filesystem/local_file_io.py
index 120f2c6b3a..8385790885 100644
--- a/paimon-python/pypaimon/filesystem/local_file_io.py
+++ b/paimon-python/pypaimon/filesystem/local_file_io.py
@@ -412,6 +412,27 @@ class LocalFileIO(FileIO):
             self.delete_quietly(path)
             raise RuntimeError(f"Failed to write Vortex file {path}: {e}") 
from e
 
+    def write_row(self, path: str, data: pyarrow.Table, fields=None, zstd_level: int = 1, **kwargs):
+        """Write ``data`` to a local ROW-format file at ``path``.
+
+        When ``fields`` is ``None`` the Paimon schema is derived from the
+        Arrow schema of ``data``.  Missing parent directories are created.
+        On any failure the target path is deleted quietly and the error is
+        re-raised as ``RuntimeError``.
+        """
+        try:
+            from pypaimon.write.writer.format_row_writer import FormatRowWriter
+            from pypaimon.schema.data_types import PyarrowFieldParser
+
+            if fields is None:
+                fields = PyarrowFieldParser.to_paimon_schema(data.schema)
+
+            target = self._to_file(path)
+            directory = target.parent
+            if directory and not directory.exists():
+                directory.mkdir(parents=True, exist_ok=True)
+
+            with open(target, 'wb') as sink:
+                row_writer = FormatRowWriter(sink, fields, zstd_level=zstd_level)
+                row_writer.write_table(data)
+                row_writer.close()
+        except Exception as err:
+            self.delete_quietly(path)
+            raise RuntimeError(f"Failed to write row file {path}: {err}") from err
+
     def write_blob(self, path: str, data: pyarrow.Table, **kwargs):
         try:
             if data.num_columns != 1:
diff --git a/paimon-python/pypaimon/filesystem/pyarrow_file_io.py 
b/paimon-python/pypaimon/filesystem/pyarrow_file_io.py
index e7315f3eff..74d43579a2 100644
--- a/paimon-python/pypaimon/filesystem/pyarrow_file_io.py
+++ b/paimon-python/pypaimon/filesystem/pyarrow_file_io.py
@@ -673,6 +673,21 @@ class PyArrowFileIO(FileIO):
             self.delete_quietly(path)
             raise RuntimeError(f"Failed to write Vortex file {path}: {e}") 
from e
 
+    def write_row(self, path: str, data: pyarrow.Table, fields=None, zstd_level: int = 1, **kwargs):
+        """Write ``data`` to ``path`` in ROW format through ``new_output_stream``.
+
+        When ``fields`` is ``None`` the Paimon schema is derived from the
+        Arrow schema of ``data``.  On any failure the target path is deleted
+        quietly and the error is re-raised as ``RuntimeError``.
+        """
+        try:
+            from pypaimon.write.writer.format_row_writer import FormatRowWriter
+
+            if fields is None:
+                fields = PyarrowFieldParser.to_paimon_schema(data.schema)
+
+            with self.new_output_stream(path) as sink:
+                row_writer = FormatRowWriter(sink, fields, zstd_level=zstd_level)
+                row_writer.write_table(data)
+                row_writer.close()
+        except Exception as err:
+            self.delete_quietly(path)
+            raise RuntimeError(f"Failed to write row file {path}: {err}") from err
+
     def write_blob(self, path: str, data: pyarrow.Table, **kwargs):
         try:
             if data.num_columns != 1:
diff --git a/paimon-python/pypaimon/read/reader/format_row_reader.py 
b/paimon-python/pypaimon/read/reader/format_row_reader.py
new file mode 100644
index 0000000000..34a9c663a9
--- /dev/null
+++ b/paimon-python/pypaimon/read/reader/format_row_reader.py
@@ -0,0 +1,469 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import struct
+from decimal import Decimal
+from typing import Any, List, Optional
+
+import pyarrow as pa
+import pyarrow.dataset as ds
+from pyarrow import RecordBatch
+
+from pypaimon.common.delta_varint_compressor import DeltaVarintCompressor
+from pypaimon.common.file_io import FileIO
+from pypaimon.read.reader.iface.record_batch_reader import RecordBatchReader
+from pypaimon.schema.data_types import (
+    ArrayType, DataField, MapType, MultisetType, PyarrowFieldParser, RowType, 
VectorType, AtomicType
+)
+
+FOOTER_SIZE = 32
+MAGIC = 0x524F5753  # "ROWS"
+VERSION = 1
+
+
+class FormatRowReader(RecordBatchReader):
+    """Read a ROW-format file block by block as Arrow record batches.
+
+    Layout as consumed by this reader: ZSTD-compressed blocks stored back to
+    back starting at offset 0, a block index, and a 32-byte little-endian
+    footer at the very end of the file.  Each call to ``read_arrow_batch``
+    returns the rows of one block that survive projection, row selection and
+    the optional predicate; blocks that yield no rows are skipped.
+
+    Args:
+        file_io: FileIO used to query the file size and open input streams.
+        file_path: Path of the ROW file.
+        read_fields: Names of the columns to return, in output order.
+        full_fields: All fields stored in the file, in storage order.
+        push_down_predicate: Filter applied to every decoded block through a
+            pyarrow dataset scanner, or ``None`` for no filtering.
+        batch_size: Stored on the reader; batch boundaries follow file blocks.
+        row_indices: Optional file-level row positions to read, processed in
+            the given order.  ``None`` reads every row.
+    """
+
+    def __init__(self, file_io: FileIO, file_path: str, read_fields: List[str],
+                 full_fields: List[DataField], push_down_predicate: Any,
+                 batch_size: int = 1024, row_indices: Optional[List[int]] = None):
+        self._file_io = file_io
+        self._file_path = file_path
+        self._push_down_predicate = push_down_predicate
+        self._batch_size = batch_size
+
+        self._file_size = file_io.get_file_size(file_path)
+
+        full_fields_map = {field.name: field for field in full_fields}
+        self._projected_fields = [full_fields_map[name] for name in read_fields]
+        self._all_fields = full_fields
+        self._schema = PyarrowFieldParser.from_paimon_schema(self._projected_fields)
+
+        # Storage position of each projected field (first field with that
+        # name), computed once instead of once per decoded block.
+        first_index_by_name = {}
+        for storage_idx, field in enumerate(full_fields):
+            first_index_by_name.setdefault(field.name, storage_idx)
+        projected_indices = [first_index_by_name[f.name] for f in self._projected_fields]
+        # storage index -> output column index
+        self._proj_col_map = {
+            field_idx: col_idx for col_idx, field_idx in enumerate(projected_indices)
+        }
+
+        self._block_compressed_sizes: List[int] = []
+        self._block_uncompressed_sizes: List[int] = []
+        self._block_row_starts: List[int] = []
+        self._block_offsets: List[int] = []
+        self._total_row_count = 0
+        self._block_count = 0
+        self._current_block_idx = 0
+
+        self._row_indices = row_indices
+        self._row_indices_pos = 0
+
+        self._read_metadata()
+
+        if self._row_indices is not None:
+            self._blocks_to_read = self._compute_blocks_for_indices()
+            self._blocks_to_read_pos = 0
+        else:
+            self._blocks_to_read = None
+
+    def _compute_blocks_for_indices(self) -> List[tuple]:
+        """Group row_indices by block. Returns list of (block_idx, [local_row_offsets]).
+
+        Consecutive indices that fall into the same block share one entry.
+
+        Raises:
+            IndexError: If an index lies before the first block's start row.
+        """
+        import bisect
+        result = []
+        row_starts = self._block_row_starts
+        for idx in self._row_indices:
+            block_idx = bisect.bisect_right(row_starts, idx) - 1
+            if block_idx < 0:
+                # Without this check row_starts[-1] would silently select the last block.
+                raise IndexError(f"Row index {idx} is outside of row file {self._file_path}")
+            local_row = idx - row_starts[block_idx]
+            if result and result[-1][0] == block_idx:
+                result[-1][1].append(local_row)
+            else:
+                result.append((block_idx, [local_row]))
+        return result
+
+    def read_arrow_batch(self) -> Optional[RecordBatch]:
+        """Return the next non-empty batch, or ``None`` once the file is exhausted."""
+        if self._row_indices is not None:
+            return self._read_batch_indexed()
+        return self._read_batch_sequential()
+
+    def _read_batch_sequential(self) -> Optional[RecordBatch]:
+        """Decode blocks in file order until one yields rows."""
+        if not self._projected_fields:
+            # Nothing to return; avoid decompressing every block for no output.
+            return None
+        # Iterate instead of recursing: a long run of blocks that are fully
+        # filtered out must not exhaust the interpreter's recursion limit.
+        while self._current_block_idx < self._block_count:
+            block_data = self._read_and_decompress_block(self._current_block_idx)
+            self._current_block_idx += 1
+            batch = self._columns_to_batch(self._decode_block(block_data))
+            if batch is not None:
+                return batch
+        return None
+
+    def _read_batch_indexed(self) -> Optional[RecordBatch]:
+        """Decode only the blocks and rows selected by ``row_indices``."""
+        if not self._projected_fields:
+            return None
+        while self._blocks_to_read_pos < len(self._blocks_to_read):
+            block_idx, local_rows = self._blocks_to_read[self._blocks_to_read_pos]
+            self._blocks_to_read_pos += 1
+            block_data = self._read_and_decompress_block(block_idx)
+            batch = self._columns_to_batch(self._decode_block(block_data, row_filter=local_rows))
+            if batch is not None:
+                return batch
+        return None
+
+    def _columns_to_batch(self, columns: List[List]) -> Optional[RecordBatch]:
+        """Build a record batch from decoded columns and apply the predicate.
+
+        Returns ``None`` when the block has no rows or the predicate removes all of them.
+        """
+        if all(len(col) == 0 for col in columns):
+            return None
+
+        pydict = {field.name: columns[i] for i, field in enumerate(self._projected_fields)}
+        table = pa.Table.from_pydict(pydict, self._schema)
+
+        if self._push_down_predicate is not None:
+            dataset = ds.InMemoryDataset(table)
+            scanner = dataset.scanner(filter=self._push_down_predicate)
+            table = scanner.to_table().combine_chunks()
+
+        if table.num_rows == 0:
+            return None
+        return table.to_batches()[0]
+
+    def close(self):
+        """No-op: every read opens and closes its own input stream."""
+        pass
+
+    def _read_metadata(self):
+        """Read and validate the footer, then load the block index.
+
+        Footer fields (little-endian, byte offset within the 32-byte footer):
+        0 = int64 total row count, 8 = int32 block count, 12 = int64 index
+        offset, 20 = int32 index length, 24 = uint8 version, 28 = uint32 magic.
+
+        Raises:
+            IOError: If the footer or index cannot be read completely, or the
+                magic number or version does not match.
+        """
+        if self._file_size < FOOTER_SIZE:
+            # Seeking to a negative offset would fail with an unrelated error.
+            raise IOError("Invalid row file: cannot read footer")
+
+        with self._file_io.new_input_stream(self._file_path) as f:
+            f.seek(self._file_size - FOOTER_SIZE)
+            footer_bytes = f.read(FOOTER_SIZE)
+
+        if len(footer_bytes) != FOOTER_SIZE:
+            raise IOError("Invalid row file: cannot read footer")
+
+        magic = struct.unpack_from('<I', footer_bytes, 28)[0]
+        if magic != MAGIC:
+            raise IOError(f"Invalid row file magic: expected 0x{MAGIC:08X}, got 0x{magic:08X}")
+
+        version = footer_bytes[24]
+        if version != VERSION:
+            raise IOError(f"Unsupported row file version: {version}")
+
+        self._total_row_count = struct.unpack_from('<q', footer_bytes, 0)[0]
+        self._block_count = struct.unpack_from('<i', footer_bytes, 8)[0]
+        index_offset = struct.unpack_from('<q', footer_bytes, 12)[0]
+        index_length = struct.unpack_from('<i', footer_bytes, 20)[0]
+
+        with self._file_io.new_input_stream(self._file_path) as f:
+            f.seek(index_offset)
+            index_bytes = f.read(index_length)
+
+        if len(index_bytes) != index_length:
+            raise IOError("Invalid row file: cannot read block index")
+
+        self._parse_block_index(index_bytes)
+
+    def _parse_block_index(self, index_data: bytes):
+        """Decode the block index into per-block size, row-start and offset lists.
+
+        The index holds three sections in order (compressed sizes, uncompressed
+        sizes, first-row numbers), each a var-int byte length followed by a
+        delta-varint payload of that length.
+        """
+        pos = 0
+
+        len1, consumed = _decode_var_int(index_data, pos)
+        pos += consumed
+        self._block_compressed_sizes = DeltaVarintCompressor.decompress(index_data[pos:pos + len1])
+        pos += len1
+
+        len2, consumed = _decode_var_int(index_data, pos)
+        pos += consumed
+        self._block_uncompressed_sizes = DeltaVarintCompressor.decompress(index_data[pos:pos + len2])
+        pos += len2
+
+        len3, consumed = _decode_var_int(index_data, pos)
+        pos += consumed
+        self._block_row_starts = DeltaVarintCompressor.decompress(index_data[pos:pos + len3])
+
+        # Blocks are contiguous from offset 0, so offsets are running sums of the sizes.
+        offset = 0
+        self._block_offsets = []
+        for size in self._block_compressed_sizes:
+            self._block_offsets.append(offset)
+            offset += size
+
+    def _read_and_decompress_block(self, block_idx: int) -> bytes:
+        """Read block ``block_idx`` from the file and return its ZSTD-decompressed bytes."""
+        import zstandard as zstd
+
+        offset = self._block_offsets[block_idx]
+        compressed_size = self._block_compressed_sizes[block_idx]
+
+        with self._file_io.new_input_stream(self._file_path) as f:
+            f.seek(offset)
+            compressed_data = f.read(compressed_size)
+
+        decompressor = zstd.ZstdDecompressor()
+        uncompressed_size = self._block_uncompressed_sizes[block_idx]
+        return decompressor.decompress(compressed_data, max_output_size=uncompressed_size)
+
+    def _decode_block(self, block_data: bytes,
+                      row_filter: Optional[List[int]] = None) -> List[List]:
+        """Decode a decompressed block into one Python list per projected column.
+
+        The block ends with an int32 row count preceded by one int32 start
+        offset per row.  Each row starts with a null bitmap of
+        ``(arity + 7) // 8`` bytes (a set bit means NULL) followed by the
+        values of the non-null fields in storage order.
+
+        Args:
+            block_data: Decompressed block bytes.
+            row_filter: Block-local row numbers to decode; ``None`` decodes every row.
+        """
+        data_len = len(block_data)
+        row_count = struct.unpack_from('<i', block_data, data_len - 4)[0]
+        offset_array_start = data_len - 4 - row_count * 4
+
+        all_fields = self._all_fields
+        arity = len(all_fields)
+        header_size = (arity + 7) // 8
+        proj_col_map = self._proj_col_map
+        columns = [[] for _ in self._projected_fields]
+
+        rows_to_read = row_filter if row_filter is not None else range(row_count)
+
+        for row_idx in rows_to_read:
+            row_offset = struct.unpack_from('<i', block_data, offset_array_start + row_idx * 4)[0]
+            decoder = _RowDecoder(block_data, row_offset)
+
+            null_bitmap = block_data[decoder.pos:decoder.pos + header_size]
+            decoder.pos += header_size
+
+            for field_idx in range(arity):
+                col_idx = proj_col_map.get(field_idx)
+                is_null = (null_bitmap[field_idx // 8] & (1 << (field_idx % 8))) != 0
+                if is_null:
+                    if col_idx is not None:
+                        columns[col_idx].append(None)
+                else:
+                    # Fields are stored sequentially, so unprojected values
+                    # must still be decoded to advance the cursor.
+                    value = _read_field(decoder, all_fields[field_idx].type)
+                    if col_idx is not None:
+                        columns[col_idx].append(value)
+
+        return columns
+
+
+class _RowDecoder:
+    """Forward-only cursor that decodes little-endian primitives from a byte buffer.
+
+    ``data`` is the buffer and ``pos`` the offset of the next unread byte;
+    every ``read_*`` method advances ``pos`` past what it consumed.
+    """
+
+    __slots__ = ('data', 'pos')
+
+    def __init__(self, data: bytes, pos: int = 0):
+        self.data = data
+        self.pos = pos
+
+    def _unpack(self, fmt: str, width: int):
+        """Unpack one ``struct`` value of ``width`` bytes at the cursor and advance."""
+        value, = struct.unpack_from(fmt, self.data, self.pos)
+        self.pos += width
+        return value
+
+    def read_boolean(self) -> bool:
+        """Read one byte; any non-zero value is ``True``."""
+        flag = self.data[self.pos] != 0
+        self.pos += 1
+        return flag
+
+    def read_byte(self) -> int:
+        """Read a signed 8-bit integer."""
+        return self._unpack('<b', 1)
+
+    def read_short(self) -> int:
+        """Read a signed 16-bit integer."""
+        return self._unpack('<h', 2)
+
+    def read_int(self) -> int:
+        """Read a signed 32-bit integer."""
+        return self._unpack('<i', 4)
+
+    def read_long(self) -> int:
+        """Read a signed 64-bit integer."""
+        return self._unpack('<q', 8)
+
+    def read_float(self) -> float:
+        """Read a 32-bit float."""
+        return self._unpack('<f', 4)
+
+    def read_double(self) -> float:
+        """Read a 64-bit float."""
+        return self._unpack('<d', 8)
+
+    def read_var_int(self) -> int:
+        """Read a var-int: 7 payload bits per byte, low group first, high bit = more."""
+        value = 0
+        shift = 0
+        while True:
+            byte = self.data[self.pos]
+            self.pos += 1
+            value |= (byte & 0x7F) << shift
+            if not byte & 0x80:
+                return value
+            shift += 7
+
+    def read_string(self) -> str:
+        """Read a var-int byte length followed by that many UTF-8 bytes."""
+        length = self.read_var_int()
+        end = self.pos + length
+        text = self.data[self.pos:end].decode('utf-8')
+        self.pos += length
+        return text
+
+    def read_bytes(self) -> bytes:
+        """Read a var-int byte length followed by that many raw bytes."""
+        length = self.read_var_int()
+        end = self.pos + length
+        chunk = self.data[self.pos:end]
+        self.pos += length
+        return bytes(chunk)
+
+
+def _decode_var_int(data: bytes, offset: int) -> tuple:
+    """Decode a var-int starting at ``data[offset]``.
+
+    Each byte contributes its low 7 bits, least-significant group first; a
+    clear high bit marks the last byte.
+
+    Returns:
+        ``(value, number_of_bytes_consumed)``.
+    """
+    value = 0
+    cursor = offset
+    bit_shift = 0
+    while True:
+        byte = data[cursor]
+        cursor += 1
+        value |= (byte & 0x7F) << bit_shift
+        if not byte & 0x80:
+            return value, cursor - offset
+        bit_shift += 7
+
+
+def _unscaled_to_decimal(unscaled: int, scale: int) -> Decimal:
+    """Build ``unscaled * 10**-scale`` exactly.
+
+    Constructing from a string is not subject to the decimal context, whereas
+    dividing two Decimals rounds to the context precision (28 digits by
+    default) and would corrupt values with more significant digits.
+    """
+    return Decimal(f"{unscaled}e-{scale}")
+
+
+def _read_field(decoder: _RowDecoder, data_type) -> Any:
+    """Decode one non-null value of ``data_type`` at the decoder's cursor.
+
+    Returns:
+        A Python value per type: bool, int, float, str, bytes or ``Decimal``
+        for atomic types; an int for TIMESTAMP (milliseconds when the
+        precision is at most 3, otherwise microseconds); a dict with
+        ``value``/``metadata`` bytes for VARIANT; a list for ARRAY and VECTOR;
+        a list of pairs for MAP and MULTISET; a dict for ROW.
+
+    Raises:
+        ValueError: If the type is not supported.
+    """
+    if isinstance(data_type, AtomicType):
+        type_name = data_type.type.upper()
+        if type_name == 'BOOLEAN':
+            return decoder.read_boolean()
+        elif type_name == 'TINYINT':
+            return decoder.read_byte()
+        elif type_name == 'SMALLINT':
+            return decoder.read_short()
+        elif type_name in ('INT', 'INTEGER', 'DATE'):
+            return decoder.read_int()
+        elif type_name.startswith('TIME') and not type_name.startswith('TIMESTAMP'):
+            # TIME and TIME(p) are stored as a 32-bit integer.
+            return decoder.read_int()
+        elif type_name == 'BIGINT':
+            return decoder.read_long()
+        elif type_name == 'FLOAT':
+            return decoder.read_float()
+        elif type_name == 'DOUBLE':
+            return decoder.read_double()
+        elif type_name == 'STRING' or type_name.startswith(('CHAR', 'VARCHAR')):
+            return decoder.read_string()
+        elif type_name == 'BYTES' or type_name.startswith(('BINARY', 'VARBINARY')):
+            return decoder.read_bytes()
+        elif type_name == 'BLOB':
+            return decoder.read_bytes()
+        elif type_name.startswith('DECIMAL'):
+            precision, scale = _parse_decimal_params(type_name)
+            if precision <= 18:
+                # Compact form: the unscaled value fits in a 64-bit integer.
+                unscaled = decoder.read_long()
+            else:
+                # Wide form: length-prefixed big-endian two's-complement bytes.
+                raw = decoder.read_bytes()
+                unscaled = int.from_bytes(raw, byteorder='big', signed=True)
+            return _unscaled_to_decimal(unscaled, scale)
+        elif type_name.startswith('TIMESTAMP'):
+            precision = _parse_timestamp_precision(type_name)
+            millis = decoder.read_long()
+            if precision <= 3:
+                return millis
+            # Higher precisions carry an extra var-int of nanoseconds within
+            # the millisecond; the result is truncated to microseconds.
+            nano_of_milli = decoder.read_var_int()
+            return millis * 1000 + nano_of_milli // 1000
+        elif type_name == 'VARIANT':
+            value_bytes = decoder.read_bytes()
+            metadata_bytes = decoder.read_bytes()
+            return {'value': value_bytes, 'metadata': metadata_bytes}
+        else:
+            raise ValueError(f"Unsupported atomic type: {type_name}")
+
+    elif isinstance(data_type, ArrayType):
+        return _read_array(decoder, data_type.element)
+
+    elif isinstance(data_type, VectorType):
+        return _read_vector(decoder, data_type.element)
+
+    elif isinstance(data_type, MapType):
+        # Stored as two arrays: all keys, then all values.
+        keys = _read_array_elements(decoder, data_type.key)
+        values = _read_array_elements(decoder, data_type.value)
+        return list(zip(keys, values))
+
+    elif isinstance(data_type, MultisetType):
+        # Stored as two arrays: the elements, then their INT counts.
+        keys = _read_array_elements(decoder, data_type.element)
+        counts = _read_array_elements(decoder, AtomicType("INT"))
+        return list(zip(keys, counts))
+
+    elif isinstance(data_type, RowType):
+        return _read_nested_row(decoder, data_type)
+
+    else:
+        raise ValueError(f"Unsupported data type: {data_type}")
+
+
+def _read_array(decoder: _RowDecoder, element_type) -> list:
+    """Decode an ARRAY value; identical to decoding its element list."""
+    elements = _read_array_elements(decoder, element_type)
+    return elements
+
+
+def _read_array_elements(decoder: _RowDecoder, element_type) -> list:
+    """Decode a var-int element count, a null bitmap, then the non-null elements.
+
+    A set bit ``i`` in the bitmap means element ``i`` is ``None`` and occupies
+    no bytes in the element data.
+    """
+    count = decoder.read_var_int()
+    bitmap_len = (count + 7) // 8
+    bitmap_start = decoder.pos
+    nulls = decoder.data[bitmap_start:bitmap_start + bitmap_len]
+    decoder.pos = bitmap_start + bitmap_len
+
+    return [
+        None if nulls[i // 8] & (1 << (i % 8)) else _read_field(decoder, element_type)
+        for i in range(count)
+    ]
+
+
+def _read_vector(decoder: _RowDecoder, element_type) -> list:
+    """Decode a var-int element count followed by that many values (no null bitmap)."""
+    count = decoder.read_var_int()
+    return [_read_field(decoder, element_type) for _ in range(count)]
+
+
+def _read_nested_row(decoder: _RowDecoder, row_type: RowType) -> dict:
+    """Decode a nested ROW into a dict keyed by field name.
+
+    The row starts with a null bitmap of ``(field count + 7) // 8`` bytes;
+    fields whose bit is set map to ``None`` and have no stored value.
+    """
+    bitmap_len = (len(row_type.fields) + 7) // 8
+    bitmap_start = decoder.pos
+    nulls = decoder.data[bitmap_start:bitmap_start + bitmap_len]
+    decoder.pos = bitmap_start + bitmap_len
+
+    row = {}
+    for position, field in enumerate(row_type.fields):
+        if nulls[position // 8] & (1 << (position % 8)):
+            row[field.name] = None
+            continue
+        row[field.name] = _read_field(decoder, field.type)
+    return row
+
+
+def _parse_decimal_params(type_name: str) -> tuple:
+    """Extract ``(precision, scale)`` from an upper-case DECIMAL type name.
+
+    ``DECIMAL(p, s)`` gives ``(p, s)``, ``DECIMAL(p)`` gives ``(p, 0)`` and
+    any other spelling falls back to ``(10, 0)``.
+    """
+    import re
+    with_scale = re.fullmatch(r'DECIMAL\((\d+),\s*(\d+)\)', type_name)
+    if with_scale is not None:
+        precision, scale = with_scale.groups()
+        return int(precision), int(scale)
+    precision_only = re.fullmatch(r'DECIMAL\((\d+)\)', type_name)
+    if precision_only is None:
+        return 10, 0
+    return int(precision_only.group(1)), 0
+
+
+def _parse_timestamp_precision(type_name: str) -> int:
+    """Return the first parenthesised number in ``type_name``, or 6 when there is none."""
+    import re
+    found = re.search(r'\((\d+)\)', type_name)
+    return int(found.group(1)) if found else 6
diff --git a/paimon-python/pypaimon/read/split_read.py 
b/paimon-python/pypaimon/read/split_read.py
index e432eca6b1..351f8de83e 100644
--- a/paimon-python/pypaimon/read/split_read.py
+++ b/paimon-python/pypaimon/read/split_read.py
@@ -45,6 +45,7 @@ from pypaimon.read.reader.row_range_filter_record_reader 
import RowIdFilterRecor
 from pypaimon.read.reader.format_blob_reader import FormatBlobReader
 from pypaimon.read.reader.format_lance_reader import FormatLanceReader
 from pypaimon.read.reader.format_pyarrow_reader import FormatPyArrowReader
+from pypaimon.read.reader.format_row_reader import FormatRowReader
 from pypaimon.read.reader.format_vortex_reader import FormatVortexReader
 from pypaimon.read.reader.iface.record_batch_reader import (RecordBatchReader,
                                                             RowPositionReader, 
EmptyRecordBatchReader)
@@ -195,7 +196,10 @@ class SplitRead(ABC):
             effective_row_ranges = Range.and_(row_ranges, 
[file.row_id_range()])
             if len(effective_row_ranges) == 0:
                 return EmptyRecordBatchReader()
-            if file_format in (CoreOptions.FILE_FORMAT_VORTEX, 
CoreOptions.FILE_FORMAT_LANCE):
+            row_index_formats = (CoreOptions.FILE_FORMAT_VORTEX,
+                                 CoreOptions.FILE_FORMAT_LANCE,
+                                 CoreOptions.FILE_FORMAT_ROW)
+            if file_format in row_index_formats:
                 row_indices = []
                 for r in effective_row_ranges:
                     start = r.from_ - file.first_row_id
@@ -268,10 +272,30 @@ class SplitRead(ABC):
                 ordered_read_fields, read_arrow_predicate, 
batch_size=batch_size,
                 options=self.table.options,
                 nested_name_paths=ordered_nested_paths)
+        elif file_format == CoreOptions.FILE_FORMAT_ROW:
+            if has_nested:
+                raise NotImplementedError(
+                    "Nested-field projection is not supported on ROW files")
+            file_schema = self.table.schema_manager.get_schema(
+                file.schema_id)
+            if file.write_cols:
+                field_map = {f.name: f for f in file_schema.fields}
+                row_full_fields = [field_map[n] for n in file.write_cols
+                                   if n in field_map]
+            elif self.table.is_primary_key_table:
+                row_full_fields = self._create_key_value_fields(
+                    file_schema.fields)
+            else:
+                row_full_fields = file_schema.fields
+            format_reader = FormatRowReader(
+                self.table.file_io, file_path, read_file_fields,
+                row_full_fields,
+                read_arrow_predicate, batch_size=batch_size,
+                row_indices=row_indices)
         elif file_format in ('json', 'csv'):
             raise NotImplementedError(
                 f"Reading '{file_format}' format is not yet supported in 
Python SDK. "
-                f"Supported formats: parquet, orc, avro, lance, blob.")
+                f"Supported formats: parquet, orc, avro, lance, blob, row.")
         else:
             raise ValueError(f"Unexpected file format: {file_format}")
 
diff --git a/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py 
b/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py
index 475d2e74f7..d01adb26ba 100644
--- a/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py
+++ b/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py
@@ -277,6 +277,55 @@ class JavaPyReadWriteTest(unittest.TestCase):
         # which explicitly reads KeyValue objects and checks valueKind
         print(f"Format: {file_format}, Python read completed. ValueKind 
verification should be done in Java test.")
 
+    def test_py_write_row_append_table(self):
+        """Write a ROW-format append-only table from Python for Java to read.
+
+        The rows are also read back on the Python side as a sanity check.
+        """
+        names = ['Apple', 'Banana', 'Carrot', 'Broccoli', 'Chicken', 'Beef']
+        pa_schema = pa.schema([
+            ('id', pa.int32()),
+            ('name', pa.string()),
+            ('value', pa.float64()),
+        ])
+        table_name = 'default.mixed_test_append_tablep_row'
+        self.catalog.create_table(
+            table_name,
+            Schema.from_pyarrow_schema(
+                pa_schema,
+                options={'file.format': 'row', 'bucket': '-1'}),
+            False)
+        table = self.catalog.get_table(table_name)
+
+        rows = pa.table({
+            'id': pa.array([1, 2, 3, 4, 5, 6], type=pa.int32()),
+            'name': pa.array(names),
+            'value': pa.array([1.5, 0.8, 0.6, 1.2, 5.0, 8.0]),
+        })
+
+        builder = table.new_batch_write_builder()
+        writer = builder.new_write()
+        committer = builder.new_commit()
+        writer.write_arrow(rows)
+        committer.commit(writer.prepare_commit())
+        writer.close()
+        committer.close()
+
+        # Round-trip check: Python must be able to read its own output.
+        reader_builder = table.new_read_builder()
+        plan_splits = reader_builder.new_scan().plan().splits()
+        read_back = reader_builder.new_read().to_arrow(plan_splits)
+        self.assertEqual(read_back.num_rows, 6)
+        self.assertEqual(
+            set(read_back.column('name').to_pylist()), set(names))
+
+    def test_read_row_append_table(self):
+        """Read a ROW-format append-only table that was written by Java."""
+        java_table = self.catalog.get_table(
+            'default.mixed_test_append_tablej_row')
+        reader_builder = java_table.new_read_builder()
+        plan_splits = reader_builder.new_scan().plan().splits()
+        read_back = reader_builder.new_read().to_arrow(plan_splits)
+
+        self.assertEqual(read_back.num_rows, 6)
+        self.assertEqual(
+            set(read_back.column('name').to_pylist()),
+            {'Apple', 'Banana', 'Carrot', 'Broccoli', 'Chicken', 'Beef'})
+
     def test_pk_dv_read(self):
         pa_schema = pa.schema([
             pa.field('pt', pa.int32(), nullable=False),
diff --git a/paimon-python/pypaimon/tests/test_format_row_reader_writer.py 
b/paimon-python/pypaimon/tests/test_format_row_reader_writer.py
new file mode 100644
index 0000000000..cff237b5be
--- /dev/null
+++ b/paimon-python/pypaimon/tests/test_format_row_reader_writer.py
@@ -0,0 +1,534 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import os
+import tempfile
+from decimal import Decimal
+
+import pyarrow as pa
+import pytest
+
+from pypaimon.read.reader.format_row_reader import FormatRowReader
+from pypaimon.schema.data_types import (
+    ArrayType, AtomicType, DataField, MapType, RowType
+)
+from pypaimon.write.writer.format_row_writer import FormatRowWriter
+
+
+class SimpleFileIO:
+    """Bare-bones local FileIO stand-in exposing the two calls used in tests."""
+
+    def get_file_size(self, path):
+        """Return the size in bytes of the file at *path*."""
+        return os.stat(path).st_size
+
+    def new_input_stream(self, path):
+        """Open *path* for binary reading; the caller closes the stream."""
+        return open(path, mode='rb')
+
+
+def _write_row_file(path, fields, data_table):
+    """Write *data_table* to *path* as a ROW file using schema *fields*."""
+    with open(path, 'wb') as out_stream:
+        row_writer = FormatRowWriter(out_stream, fields)
+        row_writer.write_table(data_table)
+        # close() runs before the underlying stream is closed by the `with`.
+        row_writer.close()
+
+
+def _read_row_file(path, fields, read_field_names=None, row_indices=None):
+    """Read a ROW file at *path* into a ``pa.Table``.
+
+    Args:
+        path: location of the ROW file.
+        fields: full list of ``DataField`` describing the file's columns.
+        read_field_names: names of the columns to project; defaults to
+            every field in *fields*.
+        row_indices: optional row positions forwarded to the reader.
+
+    Returns:
+        A table built from all batches. When the reader produces no batch,
+        an empty table with one column per *projected* field is returned.
+    """
+    if read_field_names is None:
+        read_field_names = [f.name for f in fields]
+    reader = FormatRowReader(SimpleFileIO(), path, read_field_names, fields,
+                             None, row_indices=row_indices)
+    batches = []
+    try:
+        # read_arrow_batch() signals end of data by returning None.
+        while True:
+            batch = reader.read_arrow_batch()
+            if batch is None:
+                break
+            batches.append(batch)
+    finally:
+        # Release the file handle even when decoding fails, so the caller's
+        # cleanup of the temp file is not blocked by an open stream.
+        reader.close()
+    if not batches:
+        # Honour the projection for empty results as well.
+        return pa.table({name: [] for name in read_field_names})
+    return pa.Table.from_batches(batches)
+
+
+class TestFormatRowReaderWriter:
+    """Round-trip tests: write with FormatRowWriter, read with FormatRowReader.
+
+    Every file-level test receives a scratch path from the ``row_path``
+    fixture, which replaces the per-test NamedTemporaryFile / try / finally
+    boilerplate and guarantees cleanup.
+    """
+
+    @pytest.fixture
+    def row_path(self):
+        """Yield a path for a scratch ``.row`` file; its directory is removed
+        once the test finishes."""
+        with tempfile.TemporaryDirectory() as tmp_dir:
+            yield os.path.join(tmp_dir, "data.row")
+
+    @staticmethod
+    def _roundtrip(path, fields, data, block_size=None, **read_kwargs):
+        """Write *data* to *path* and read it back.
+
+        When *block_size* is given the writer is constructed directly so the
+        block size can be overridden; otherwise ``_write_row_file`` is used.
+        Extra keyword arguments are forwarded to ``_read_row_file``.
+        """
+        if block_size is None:
+            _write_row_file(path, fields, data)
+        else:
+            with open(path, 'wb') as f:
+                writer = FormatRowWriter(f, fields, block_size=block_size)
+                writer.write_table(data)
+                writer.close()
+        return _read_row_file(path, fields, **read_kwargs)
+
+    def test_basic_int_string(self, row_path):
+        fields = [
+            DataField(0, "id", AtomicType("INT")),
+            DataField(1, "name", AtomicType("STRING")),
+        ]
+        data = pa.table({
+            "id": pa.array([1, 2, 3], type=pa.int32()),
+            "name": pa.array(["alice", "bob", "charlie"], type=pa.string()),
+        })
+
+        result = self._roundtrip(row_path, fields, data)
+        assert result.column("id").to_pylist() == [1, 2, 3]
+        assert result.column("name").to_pylist() == [
+            "alice", "bob", "charlie"]
+
+    def test_all_primitive_types(self, row_path):
+        fields = [
+            DataField(0, "bool_col", AtomicType("BOOLEAN")),
+            DataField(1, "tinyint_col", AtomicType("TINYINT")),
+            DataField(2, "smallint_col", AtomicType("SMALLINT")),
+            DataField(3, "int_col", AtomicType("INT")),
+            DataField(4, "bigint_col", AtomicType("BIGINT")),
+            DataField(5, "float_col", AtomicType("FLOAT")),
+            DataField(6, "double_col", AtomicType("DOUBLE")),
+            DataField(7, "string_col", AtomicType("STRING")),
+            DataField(8, "binary_col", AtomicType("BYTES")),
+        ]
+        data = pa.table({
+            "bool_col": pa.array([True, False], type=pa.bool_()),
+            "tinyint_col": pa.array([1, -1], type=pa.int8()),
+            "smallint_col": pa.array([100, -100], type=pa.int16()),
+            "int_col": pa.array([1000, -1000], type=pa.int32()),
+            "bigint_col": pa.array([100000, -100000], type=pa.int64()),
+            "float_col": pa.array([1.5, -2.5], type=pa.float32()),
+            "double_col": pa.array([3.14, -2.71], type=pa.float64()),
+            "string_col": pa.array(["hello", "world"], type=pa.string()),
+            "binary_col": pa.array(
+                [b"\x01\x02", b"\x03\x04"], type=pa.binary()),
+        })
+
+        result = self._roundtrip(row_path, fields, data)
+        assert result.column("bool_col").to_pylist() == [True, False]
+        assert result.column("tinyint_col").to_pylist() == [1, -1]
+        assert result.column("smallint_col").to_pylist() == [100, -100]
+        assert result.column("int_col").to_pylist() == [1000, -1000]
+        assert result.column("bigint_col").to_pylist() == [100000, -100000]
+        assert result.column("float_col").to_pylist()[0] == pytest.approx(1.5)
+        assert result.column("float_col").to_pylist()[1] == pytest.approx(-2.5)
+        assert result.column("double_col").to_pylist() == [
+            pytest.approx(3.14), pytest.approx(-2.71)]
+        assert result.column("string_col").to_pylist() == ["hello", "world"]
+        assert result.column("binary_col").to_pylist() == [
+            b"\x01\x02", b"\x03\x04"]
+
+    def test_nulls(self, row_path):
+        fields = [
+            DataField(0, "id", AtomicType("INT")),
+            DataField(1, "name", AtomicType("STRING")),
+        ]
+        data = pa.table({
+            "id": pa.array([1, None, 3], type=pa.int32()),
+            "name": pa.array([None, "bob", None], type=pa.string()),
+        })
+
+        result = self._roundtrip(row_path, fields, data)
+        assert result.column("id").to_pylist() == [1, None, 3]
+        assert result.column("name").to_pylist() == [None, "bob", None]
+
+    def test_decimal(self, row_path):
+        fields = [
+            DataField(0, "d1", AtomicType("DECIMAL(10, 2)")),
+            DataField(1, "d2", AtomicType("DECIMAL(20, 5)")),
+        ]
+        data = pa.table({
+            "d1": pa.array(
+                [Decimal("123.45"), Decimal("-67.89")],
+                type=pa.decimal128(10, 2)),
+            "d2": pa.array(
+                [Decimal("12345.67890"), Decimal("-99999.12345")],
+                type=pa.decimal128(20, 5)),
+        })
+
+        result = self._roundtrip(row_path, fields, data)
+        assert result.column("d1").to_pylist() == [
+            Decimal("123.45"), Decimal("-67.89")]
+        assert result.column("d2").to_pylist() == [
+            Decimal("12345.67890"), Decimal("-99999.12345")]
+
+    def test_timestamp(self, row_path):
+        fields = [
+            DataField(0, "ts_millis", AtomicType("TIMESTAMP(3)")),
+            DataField(1, "ts_micros", AtomicType("TIMESTAMP(6)")),
+        ]
+        data = pa.table({
+            "ts_millis": pa.array([1000, 2000], type=pa.timestamp('ms')),
+            "ts_micros": pa.array(
+                [1000000, 2000000], type=pa.timestamp('us')),
+        })
+
+        result = self._roundtrip(row_path, fields, data)
+        assert result.num_rows == 2
+
+    def test_array_type(self, row_path):
+        element_type = AtomicType("INT")
+        fields = [
+            DataField(0, "arr", ArrayType(True, element_type)),
+        ]
+        data = pa.table({
+            "arr": pa.array([[1, 2, 3], [4, 5]], type=pa.list_(pa.int32())),
+        })
+
+        result = self._roundtrip(row_path, fields, data)
+        assert result.column("arr").to_pylist() == [[1, 2, 3], [4, 5]]
+
+    def test_map_type(self, row_path):
+        fields = [
+            DataField(
+                0, "m",
+                MapType(True, AtomicType("STRING"), AtomicType("INT"))),
+        ]
+        data = pa.table({
+            "m": pa.array(
+                [[("a", 1), ("b", 2)], [("c", 3)]],
+                type=pa.map_(pa.string(), pa.int32())
+            ),
+        })
+
+        result = self._roundtrip(row_path, fields, data)
+        result_maps = result.column("m").to_pylist()
+        assert len(result_maps) == 2
+        assert len(result_maps[0]) == 2
+        assert len(result_maps[1]) == 1
+
+    def test_nested_row(self, row_path):
+        inner_type = RowType(True, [
+            DataField(0, "x", AtomicType("INT")),
+            DataField(1, "y", AtomicType("STRING")),
+        ])
+        fields = [
+            DataField(0, "id", AtomicType("INT")),
+            DataField(1, "nested", inner_type),
+        ]
+        data = pa.table({
+            "id": pa.array([1, 2], type=pa.int32()),
+            "nested": pa.array(
+                [{"x": 10, "y": "a"}, {"x": 20, "y": "b"}],
+                type=pa.struct([
+                    pa.field("x", pa.int32()),
+                    pa.field("y", pa.string()),
+                ])
+            ),
+        })
+
+        result = self._roundtrip(row_path, fields, data)
+        assert result.column("id").to_pylist() == [1, 2]
+        nested = result.column("nested").to_pylist()
+        assert nested[0] == {"x": 10, "y": "a"}
+        assert nested[1] == {"x": 20, "y": "b"}
+
+    def test_multi_block(self, row_path):
+        fields = [
+            DataField(0, "id", AtomicType("INT")),
+            DataField(1, "data", AtomicType("STRING")),
+        ]
+        num_rows = 5000
+        ids = list(range(num_rows))
+        strings = [f"value_{i}" for i in range(num_rows)]
+        data = pa.table({
+            "id": pa.array(ids, type=pa.int32()),
+            "data": pa.array(strings, type=pa.string()),
+        })
+
+        result = self._roundtrip(row_path, fields, data, block_size=4096)
+        assert result.num_rows == num_rows
+        assert result.column("id").to_pylist() == ids
+        assert result.column("data").to_pylist() == strings
+
+    def test_empty_file(self, row_path):
+        fields = [
+            DataField(0, "id", AtomicType("INT")),
+        ]
+        data = pa.table({
+            "id": pa.array([], type=pa.int32()),
+        })
+
+        result = self._roundtrip(row_path, fields, data)
+        assert result.num_rows == 0
+
+    def test_column_projection(self, row_path):
+        fields = [
+            DataField(0, "id", AtomicType("INT")),
+            DataField(1, "name", AtomicType("STRING")),
+            DataField(2, "value", AtomicType("DOUBLE")),
+        ]
+        data = pa.table({
+            "id": pa.array([1, 2, 3], type=pa.int32()),
+            "name": pa.array(["a", "b", "c"], type=pa.string()),
+            "value": pa.array([1.1, 2.2, 3.3], type=pa.float64()),
+        })
+
+        result = self._roundtrip(
+            row_path, fields, data, read_field_names=["id", "value"])
+        assert result.num_columns == 2
+        assert result.column("id").to_pylist() == [1, 2, 3]
+        assert result.column("value").to_pylist() == [
+            pytest.approx(1.1), pytest.approx(2.2), pytest.approx(3.3)]
+
+    def test_date_and_time(self, row_path):
+        fields = [
+            DataField(0, "d", AtomicType("DATE")),
+            DataField(1, "t", AtomicType("TIME")),
+        ]
+        data = pa.table({
+            "d": pa.array([18000, 19000], type=pa.date32()),
+            "t": pa.array([3600000, 7200000], type=pa.time32('ms')),
+        })
+
+        result = self._roundtrip(row_path, fields, data)
+        assert result.num_rows == 2
+
+    def test_variant_type(self, row_path):
+        fields = [
+            DataField(0, "v", AtomicType("VARIANT")),
+        ]
+        data = pa.table({
+            "v": pa.array(
+                [{"value": b"\x01\x02", "metadata": b"\x03\x04"},
+                 {"value": b"\x05", "metadata": b"\x06\x07\x08"}],
+                type=pa.struct([
+                    pa.field("value", pa.binary(), nullable=False),
+                    pa.field("metadata", pa.binary(), nullable=False),
+                ])
+            ),
+        })
+
+        result = self._roundtrip(row_path, fields, data)
+        variants = result.column("v").to_pylist()
+        assert variants[0]["value"] == b"\x01\x02"
+        assert variants[0]["metadata"] == b"\x03\x04"
+        assert variants[1]["value"] == b"\x05"
+        assert variants[1]["metadata"] == b"\x06\x07\x08"
+
+    def test_row_indices_random_access(self, row_path):
+        """Test reading specific rows by index (O(1) row-number lookup)."""
+        fields = [
+            DataField(0, "id", AtomicType("INT")),
+            DataField(1, "name", AtomicType("VARCHAR")),
+        ]
+        data = pa.table({
+            "id": pa.array(list(range(100)), type=pa.int32()),
+            "name": pa.array([f"row_{i}" for i in range(100)]),
+        })
+
+        # Written once, then read three times with different index sets.
+        _write_row_file(row_path, fields, data)
+
+        # Read specific rows: 0, 5, 50, 99
+        result = _read_row_file(row_path, fields, row_indices=[0, 5, 50, 99])
+        assert result.num_rows == 4
+        assert result.column("id").to_pylist() == [0, 5, 50, 99]
+        assert result.column("name").to_pylist() == [
+            "row_0", "row_5", "row_50", "row_99"
+        ]
+
+        # Read single row
+        result = _read_row_file(row_path, fields, row_indices=[42])
+        assert result.num_rows == 1
+        assert result.column("id").to_pylist() == [42]
+
+        # Read empty indices
+        result = _read_row_file(row_path, fields, row_indices=[])
+        assert result.num_rows == 0
+
+    def test_row_indices_multi_block(self, row_path):
+        """Test row_indices across multiple blocks."""
+        fields = [
+            DataField(0, "id", AtomicType("INT")),
+            DataField(1, "value", AtomicType("VARCHAR")),
+        ]
+        # Write enough data to create multiple blocks (small block size)
+        n_rows = 500
+        data = pa.table({
+            "id": pa.array(list(range(n_rows)), type=pa.int32()),
+            "value": pa.array([f"val_{i}" * 10 for i in range(n_rows)]),
+        })
+
+        # Read rows from different blocks
+        indices = [0, 100, 200, 300, 499]
+        result = self._roundtrip(
+            row_path, fields, data, block_size=1024, row_indices=indices)
+        assert result.num_rows == 5
+        assert result.column("id").to_pylist() == indices
+
+    def test_data_evolution_row_id_read(self):
+        """Test Data Evolution scenario: partial-column write then row-id
+        based read.
+
+        Simulates the Data Evolution pattern where:
+        1. First commit writes columns (f0, f1)
+        2. Second commit writes column (f2) with first_row_id=0
+        3. Read merges by row ID to reconstruct full rows
+        """
+        import shutil
+        from pypaimon import CatalogFactory, Schema
+
+        tempdir = tempfile.mkdtemp()
+        try:
+            warehouse = os.path.join(tempdir, 'warehouse')
+            catalog = CatalogFactory.create({'warehouse': warehouse})
+            catalog.create_database('default', True)
+
+            pa_schema = pa.schema([
+                ('f0', pa.int32()),
+                ('f1', pa.string()),
+                ('f2', pa.string()),
+            ])
+
+            schema = Schema.from_pyarrow_schema(
+                pa_schema,
+                options={
+                    'file.format': 'row',
+                    'row-tracking.enabled': 'true',
+                    'data-evolution.enabled': 'true',
+                })
+            catalog.create_table('default.de_row_id_test', schema, False)
+            table = catalog.get_table('default.de_row_id_test')
+
+            # First commit: write (f0, f1)
+            write_builder = table.new_batch_write_builder()
+            table_write = write_builder.new_write().with_write_type(
+                ['f0', 'f1'])
+            table_commit = write_builder.new_commit()
+
+            data1 = pa.table({
+                'f0': pa.array([1, 2, 3, 4, 5], type=pa.int32()),
+                'f1': pa.array(['a1', 'a2', 'a3', 'a4', 'a5']),
+            })
+            table_write.write_arrow(data1)
+            table_commit.commit(table_write.prepare_commit())
+            table_write.close()
+            table_commit.close()
+
+            # Second commit: write (f2) with first_row_id = 0
+            table_write = write_builder.new_write().with_write_type(['f2'])
+            table_commit = write_builder.new_commit()
+
+            data2 = pa.table({
+                'f2': pa.array(['b1', 'b2', 'b3', 'b4', 'b5']),
+            })
+            table_write.write_arrow(data2)
+            cmts = table_write.prepare_commit()
+            # Pin the new file to row id 0 so it lines up with the first
+            # commit's rows.
+            cmts[0].new_files[0].first_row_id = 0
+            table_commit.commit(cmts)
+            table_write.close()
+            table_commit.close()
+
+            # Read full table - should merge partial columns by row ID
+            table = catalog.get_table('default.de_row_id_test')
+            read_builder = table.new_read_builder()
+            splits = read_builder.new_scan().plan().splits()
+            result = read_builder.new_read().to_arrow(splits)
+
+            assert result.num_rows == 5
+            result_sorted = result.sort_by('f0')
+            assert result_sorted.column('f0').to_pylist() == [1, 2, 3, 4, 5]
+            assert result_sorted.column('f1').to_pylist() == [
+                'a1', 'a2', 'a3', 'a4', 'a5'
+            ]
+            assert result_sorted.column('f2').to_pylist() == [
+                'b1', 'b2', 'b3', 'b4', 'b5'
+            ]
+        finally:
+            shutil.rmtree(tempdir, ignore_errors=True)
diff --git a/paimon-python/pypaimon/tests/test_format_row_table.py 
b/paimon-python/pypaimon/tests/test_format_row_table.py
new file mode 100644
index 0000000000..c483144547
--- /dev/null
+++ b/paimon-python/pypaimon/tests/test_format_row_table.py
@@ -0,0 +1,503 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Integration tests for the ROW file format across all table types."""
+
+import os
+import shutil
+import tempfile
+import unittest
+
+import pyarrow as pa
+
+from pypaimon import CatalogFactory, Schema
+
+
+class RowFormatAppendOnlyTest(unittest.TestCase):
+    """Test ROW format with append-only (non-primary-key) tables.
+
+    All tests share one temporary warehouse; each test creates its own
+    table, writes Arrow data through the batch write API and reads it
+    back.
+    """
+
+    @classmethod
+    def setUpClass(cls):
+        cls.tempdir = tempfile.mkdtemp()
+        cls.warehouse = os.path.join(cls.tempdir, 'warehouse')
+        cls.catalog = CatalogFactory.create({'warehouse': cls.warehouse})
+        cls.catalog.create_database('default', True)
+
+        cls.pa_schema = pa.schema([
+            ('user_id', pa.int32()),
+            ('item_id', pa.int64()),
+            ('behavior', pa.string()),
+            ('dt', pa.string()),
+        ])
+
+    @classmethod
+    def tearDownClass(cls):
+        shutil.rmtree(cls.tempdir, ignore_errors=True)
+
+    def _create_table(self, name, **schema_kwargs):
+        """Create ``default.<name>`` with ``file.format=row`` and load it.
+
+        Extra keyword arguments (for example ``partition_keys``) are
+        forwarded to ``Schema.from_pyarrow_schema``.
+        """
+        identifier = 'default.' + name
+        schema = Schema.from_pyarrow_schema(
+            self.pa_schema, options={'file.format': 'row'}, **schema_kwargs)
+        self.catalog.create_table(identifier, schema, False)
+        return self.catalog.get_table(identifier)
+
+    def _rows(self, columns):
+        """Build an Arrow table with the class schema from a column dict."""
+        return pa.Table.from_pydict(columns, schema=self.pa_schema)
+
+    @staticmethod
+    def _commit(write_builder, data):
+        """Write ``data`` and commit it as a single commit.
+
+        The writer and the committer are closed even when the write or
+        the commit raises, so a failing test does not leave them open.
+        """
+        table_write = write_builder.new_write()
+        table_commit = write_builder.new_commit()
+        try:
+            table_write.write_arrow(data)
+            table_commit.commit(table_write.prepare_commit())
+        finally:
+            try:
+                table_write.close()
+            finally:
+                table_commit.close()
+
+    @staticmethod
+    def _read(table, projection=None):
+        """Read the whole table, optionally restricted to ``projection``."""
+        read_builder = table.new_read_builder()
+        if projection is not None:
+            read_builder = read_builder.with_projection(projection)
+        splits = read_builder.new_scan().plan().splits()
+        return read_builder.new_read().to_arrow(splits)
+
+    def _write_and_read(self, table, data):
+        """Commit ``data`` to ``table`` and return the full table content."""
+        self._commit(table.new_batch_write_builder(), data)
+        return self._read(table)
+
+    def test_append_only_no_partition(self):
+        table = self._create_table('ao_row_no_part')
+        data = self._rows({
+            'user_id': [1, 2, 3],
+            'item_id': [1001, 1002, 1003],
+            'behavior': ['buy', 'click', 'view'],
+            'dt': ['2024-01-01', '2024-01-01', '2024-01-02'],
+        })
+
+        result = self._write_and_read(table, data)
+        self.assertEqual(result.num_rows, 3)
+        self.assertEqual(
+            sorted(result.column('user_id').to_pylist()), [1, 2, 3])
+
+    def test_append_only_with_partition(self):
+        table = self._create_table(
+            'ao_row_partitioned', partition_keys=['dt'])
+        data = self._rows({
+            'user_id': [1, 2, 3, 4],
+            'item_id': [1001, 1002, 1003, 1004],
+            'behavior': ['buy', 'click', 'view', 'buy'],
+            'dt': ['p1', 'p1', 'p2', 'p2'],
+        })
+
+        result = self._write_and_read(table, data)
+        self.assertEqual(result.num_rows, 4)
+
+    def test_append_only_multiple_commits(self):
+        table = self._create_table('ao_row_multi_commit')
+        data1 = self._rows({
+            'user_id': [1, 2],
+            'item_id': [1001, 1002],
+            'behavior': ['buy', 'click'],
+            'dt': ['2024-01-01', '2024-01-01'],
+        })
+        data2 = self._rows({
+            'user_id': [3, 4],
+            'item_id': [1003, 1004],
+            'behavior': ['view', 'buy'],
+            'dt': ['2024-01-02', '2024-01-02'],
+        })
+
+        # Both commits go through the same write builder.
+        write_builder = table.new_batch_write_builder()
+        self._commit(write_builder, data1)
+        self._commit(write_builder, data2)
+
+        result = self._read(table)
+        self.assertEqual(result.num_rows, 4)
+        self.assertEqual(
+            sorted(result.column('user_id').to_pylist()), [1, 2, 3, 4])
+
+    def test_append_only_with_nulls(self):
+        table = self._create_table('ao_row_nulls')
+        data = self._rows({
+            'user_id': [1, 2, 3],
+            'item_id': [None, 1002, None],
+            'behavior': ['buy', None, 'view'],
+            'dt': ['2024-01-01', '2024-01-01', '2024-01-02'],
+        })
+
+        result = self._write_and_read(table, data)
+        self.assertEqual(result.num_rows, 3)
+        # None cannot be ordered against ints, so sort nulls last.
+        item_ids = sorted(
+            result.column('item_id').to_pylist(),
+            key=lambda x: (x is None, x))
+        self.assertEqual(item_ids, [1002, None, None])
+
+    def test_append_only_column_projection(self):
+        """Test that reading with column projection decodes correctly."""
+        table = self._create_table('ao_row_projection')
+        data = self._rows({
+            'user_id': [1, 2, 3],
+            'item_id': [1001, 1002, 1003],
+            'behavior': ['buy', 'click', 'view'],
+            'dt': ['2024-01-01', '2024-01-01', '2024-01-02'],
+        })
+        self._commit(table.new_batch_write_builder(), data)
+
+        # Read only (user_id, behavior) - skipping item_id and dt
+        result = self._read(table, ['user_id', 'behavior'])
+        self.assertEqual(result.num_rows, 3)
+        self.assertEqual(result.schema.names, ['user_id', 'behavior'])
+        self.assertEqual(
+            sorted(result.column('user_id').to_pylist()), [1, 2, 3])
+        self.assertEqual(
+            sorted(result.column('behavior').to_pylist()),
+            ['buy', 'click', 'view'])
+
+        # Read only (item_id, dt) - skipping user_id and behavior
+        result = self._read(table, ['item_id', 'dt'])
+        self.assertEqual(result.num_rows, 3)
+        self.assertEqual(result.schema.names, ['item_id', 'dt'])
+        self.assertEqual(
+            sorted(result.column('item_id').to_pylist()), [1001, 1002, 1003])
+
+
+class RowFormatPrimaryKeyTest(unittest.TestCase):
+    """Test ROW format with primary-key tables.
+
+    Tables are partitioned by ``dt``, keyed by ``(user_id, dt)`` and use
+    a single bucket.
+    """
+
+    @classmethod
+    def setUpClass(cls):
+        cls.tempdir = tempfile.mkdtemp()
+        cls.warehouse = os.path.join(cls.tempdir, 'warehouse')
+        cls.catalog = CatalogFactory.create({'warehouse': cls.warehouse})
+        cls.catalog.create_database('default', True)
+
+        cls.pa_schema = pa.schema([
+            pa.field('user_id', pa.int32(), nullable=False),
+            ('item_id', pa.int64()),
+            ('behavior', pa.string()),
+            pa.field('dt', pa.string(), nullable=False),
+        ])
+
+    @classmethod
+    def tearDownClass(cls):
+        shutil.rmtree(cls.tempdir, ignore_errors=True)
+
+    def _create_table(self, name):
+        """Create the primary-key table ``default.<name>`` and load it."""
+        identifier = 'default.' + name
+        schema = Schema.from_pyarrow_schema(
+            self.pa_schema,
+            partition_keys=['dt'],
+            primary_keys=['user_id', 'dt'],
+            options={'bucket': '1', 'file.format': 'row'})
+        self.catalog.create_table(identifier, schema, False)
+        return self.catalog.get_table(identifier)
+
+    def _rows(self, columns):
+        """Build an Arrow table with the class schema from a column dict."""
+        return pa.Table.from_pydict(columns, schema=self.pa_schema)
+
+    @staticmethod
+    def _commit(write_builder, data):
+        """Write ``data`` and commit it as a single commit.
+
+        The writer and the committer are closed even when the write or
+        the commit raises, so a failing test does not leave them open.
+        """
+        table_write = write_builder.new_write()
+        table_commit = write_builder.new_commit()
+        try:
+            table_write.write_arrow(data)
+            table_commit.commit(table_write.prepare_commit())
+        finally:
+            try:
+                table_write.close()
+            finally:
+                table_commit.close()
+
+    @staticmethod
+    def _read(table):
+        """Plan a full scan of ``table`` and return it as an Arrow table."""
+        read_builder = table.new_read_builder()
+        splits = read_builder.new_scan().plan().splits()
+        return read_builder.new_read().to_arrow(splits)
+
+    def test_pk_table_basic(self):
+        table = self._create_table('pk_row_basic')
+        data = self._rows({
+            'user_id': [1, 2, 3],
+            'item_id': [1001, 1002, 1003],
+            'behavior': ['buy', 'click', 'view'],
+            'dt': ['p1', 'p1', 'p2'],
+        })
+        self._commit(table.new_batch_write_builder(), data)
+
+        result = self._read(table)
+        self.assertEqual(result.num_rows, 3)
+
+    def test_pk_table_upsert(self):
+        table = self._create_table('pk_row_upsert')
+        write_builder = table.new_batch_write_builder()
+
+        # First commit
+        self._commit(write_builder, self._rows({
+            'user_id': [1, 2, 3],
+            'item_id': [1001, 1002, 1003],
+            'behavior': ['buy', 'click', 'view'],
+            'dt': ['p1', 'p1', 'p1'],
+        }))
+
+        # Second commit - update user_id=2
+        self._commit(write_builder, self._rows({
+            'user_id': [2, 4],
+            'item_id': [1002, 1004],
+            'behavior': ['buy-updated', 'new'],
+            'dt': ['p1', 'p1'],
+        }))
+
+        result = self._read(table)
+        self.assertEqual(result.num_rows, 4)
+        result_dict = result.sort_by('user_id').to_pydict()
+        self.assertEqual(result_dict['user_id'], [1, 2, 3, 4])
+        self.assertEqual(
+            result_dict['behavior'],
+            ['buy', 'buy-updated', 'view', 'new'])
+
+
+class RowFormatDataEvolutionTest(unittest.TestCase):
+    """Test ROW format with data-evolution enabled tables.
+
+    Tables are created with ``row-tracking.enabled`` and
+    ``data-evolution.enabled`` both set to ``true``.
+    """
+
+    @classmethod
+    def setUpClass(cls):
+        cls.tempdir = tempfile.mkdtemp()
+        cls.warehouse = os.path.join(cls.tempdir, 'warehouse')
+        cls.catalog = CatalogFactory.create({'warehouse': cls.warehouse})
+        cls.catalog.create_database('default', True)
+
+        cls.pa_schema = pa.schema([
+            ('id', pa.int32()),
+            ('name', pa.string()),
+            ('age', pa.int32()),
+            ('city', pa.string()),
+        ])
+
+    @classmethod
+    def tearDownClass(cls):
+        shutil.rmtree(cls.tempdir, ignore_errors=True)
+
+    def _create_table(self, name):
+        """Create the data-evolution table ``default.<name>`` and load it."""
+        identifier = 'default.' + name
+        schema = Schema.from_pyarrow_schema(
+            self.pa_schema,
+            options={
+                'file.format': 'row',
+                'row-tracking.enabled': 'true',
+                'data-evolution.enabled': 'true',
+            })
+        self.catalog.create_table(identifier, schema, False)
+        return self.catalog.get_table(identifier)
+
+    def _rows(self, columns):
+        """Build an Arrow table with the class schema from a column dict."""
+        return pa.Table.from_pydict(columns, schema=self.pa_schema)
+
+    @staticmethod
+    def _commit(write_builder, data):
+        """Write ``data`` and commit it as a single commit.
+
+        The writer and the committer are closed even when the write or
+        the commit raises, so a failing test does not leave them open.
+        """
+        table_write = write_builder.new_write()
+        table_commit = write_builder.new_commit()
+        try:
+            table_write.write_arrow(data)
+            table_commit.commit(table_write.prepare_commit())
+        finally:
+            try:
+                table_write.close()
+            finally:
+                table_commit.close()
+
+    @staticmethod
+    def _read(table):
+        """Plan a full scan of ``table`` and return it as an Arrow table."""
+        read_builder = table.new_read_builder()
+        splits = read_builder.new_scan().plan().splits()
+        return read_builder.new_read().to_arrow(splits)
+
+    def test_data_evolution_write_read(self):
+        table = self._create_table('de_row_basic')
+        data = self._rows({
+            'id': [1, 2, 3],
+            'name': ['alice', 'bob', 'charlie'],
+            'age': [25, 30, 35],
+            'city': ['beijing', 'shanghai', 'guangzhou'],
+        })
+        self._commit(table.new_batch_write_builder(), data)
+
+        result = self._read(table)
+        self.assertEqual(result.num_rows, 3)
+        self.assertEqual(sorted(result.column('id').to_pylist()), [1, 2, 3])
+
+    def test_data_evolution_multiple_commits(self):
+        table = self._create_table('de_row_multi')
+        # Both commits go through the same write builder.
+        write_builder = table.new_batch_write_builder()
+
+        self._commit(write_builder, self._rows({
+            'id': [1, 2, 3],
+            'name': ['alice', 'bob', 'charlie'],
+            'age': [25, 30, 35],
+            'city': ['beijing', 'shanghai', 'guangzhou'],
+        }))
+        self._commit(write_builder, self._rows({
+            'id': [4, 5],
+            'name': ['dave', 'eve'],
+            'age': [40, 45],
+            'city': ['shenzhen', 'hangzhou'],
+        }))
+
+        result = self._read(table)
+        self.assertEqual(result.num_rows, 5)
+        self.assertEqual(
+            sorted(result.column('id').to_pylist()), [1, 2, 3, 4, 5])
+
+
+class RowFormatBlobTableTest(unittest.TestCase):
+    """Test ROW format with blob tables (blob columns stored separately)."""
+
+    @classmethod
+    def setUpClass(cls):
+        cls.tempdir = tempfile.mkdtemp()
+        cls.warehouse = os.path.join(cls.tempdir, 'warehouse')
+        cls.catalog = CatalogFactory.create({'warehouse': cls.warehouse})
+        cls.catalog.create_database('default', True)
+
+        cls.pa_schema = pa.schema([
+            ('id', pa.int32()),
+            ('name', pa.string()),
+            ('blob_data', pa.large_binary()),
+        ])
+
+    @classmethod
+    def tearDownClass(cls):
+        shutil.rmtree(cls.tempdir, ignore_errors=True)
+
+    @staticmethod
+    def _commit(write_builder, data):
+        """Write ``data`` and commit it as a single commit.
+
+        The writer and the committer are closed even when the write or
+        the commit raises, so a failing test does not leave them open.
+        """
+        table_write = write_builder.new_write()
+        table_commit = write_builder.new_commit()
+        try:
+            table_write.write_arrow(data)
+            table_commit.commit(table_write.prepare_commit())
+        finally:
+            try:
+                table_write.close()
+            finally:
+                table_commit.close()
+
+    def test_blob_table_with_row_format(self):
+        schema = Schema.from_pyarrow_schema(
+            self.pa_schema,
+            options={
+                'file.format': 'row',
+                'row-tracking.enabled': 'true',
+                'data-evolution.enabled': 'true',
+            })
+        self.catalog.create_table('default.blob_row_basic', schema, False)
+        table = self.catalog.get_table('default.blob_row_basic')
+
+        data = pa.Table.from_pydict({
+            'id': [1, 2, 3],
+            'name': ['a', 'b', 'c'],
+            'blob_data': [b'\x01\x02\x03', b'\x04\x05', b'\x06\x07\x08\x09'],
+        }, schema=self.pa_schema)
+        self._commit(table.new_batch_write_builder(), data)
+
+        read_builder = table.new_read_builder()
+        splits = read_builder.new_scan().plan().splits()
+        result = read_builder.new_read().to_arrow(splits)
+        self.assertEqual(result.num_rows, 3)
+        self.assertEqual(sorted(result.column('id').to_pylist()), [1, 2, 3])
+
+
+class RowFormatVectorTableTest(unittest.TestCase):
+    """Test ROW format with vector tables (vector columns inline)."""
+
+    @classmethod
+    def setUpClass(cls):
+        cls.tempdir = tempfile.mkdtemp()
+        cls.warehouse = os.path.join(cls.tempdir, 'warehouse')
+        cls.catalog = CatalogFactory.create({'warehouse': cls.warehouse})
+        cls.catalog.create_database('default', True)
+
+        cls.pa_schema = pa.schema([
+            ('id', pa.int64()),
+            ('embed', pa.list_(pa.float32(), 3)),
+        ])
+
+    @classmethod
+    def tearDownClass(cls):
+        shutil.rmtree(cls.tempdir, ignore_errors=True)
+
+    @staticmethod
+    def _commit(write_builder, data):
+        """Write ``data`` and commit it as a single commit.
+
+        The writer and the committer are closed even when the write or
+        the commit raises, so a failing test does not leave them open.
+        """
+        table_write = write_builder.new_write()
+        table_commit = write_builder.new_commit()
+        try:
+            table_write.write_arrow(data)
+            table_commit.commit(table_write.prepare_commit())
+        finally:
+            try:
+                table_write.close()
+            finally:
+                table_commit.close()
+
+    def test_vector_inline_with_row_format(self):
+        """Vector stored inline in the same ROW file."""
+        schema = Schema.from_pyarrow_schema(
+            self.pa_schema,
+            options={'file.format': 'row'})
+        self.catalog.create_table('default.vec_row_inline', schema, False)
+        table = self.catalog.get_table('default.vec_row_inline')
+
+        # Nine floats grouped three at a time give three vectors.
+        flat_values = pa.array(
+            [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0],
+            type=pa.float32())
+        data = pa.table({
+            'id': pa.array([1, 2, 3], type=pa.int64()),
+            'embed': pa.FixedSizeListArray.from_arrays(flat_values, 3),
+        })
+        self._commit(table.new_batch_write_builder(), data)
+
+        read_builder = table.new_read_builder()
+        splits = read_builder.new_scan().plan().splits()
+        result = read_builder.new_read().to_arrow(splits)
+        self.assertEqual(result.num_rows, 3)
+        self.assertEqual(sorted(result.column('id').to_pylist()), [1, 2, 3])
+
+
+if __name__ == '__main__':
+    unittest.main()
diff --git a/paimon-python/pypaimon/write/writer/data_blob_writer.py b/paimon-python/pypaimon/write/writer/data_blob_writer.py
index 4c3289f5aa..cb131f9a54 100644
--- a/paimon-python/pypaimon/write/writer/data_blob_writer.py
+++ b/paimon-python/pypaimon/write/writer/data_blob_writer.py
@@ -359,6 +359,8 @@ class DataBlobWriter(DataWriter):
             self.file_io.write_lance(file_path, data)
         elif self.file_format == CoreOptions.FILE_FORMAT_VORTEX:
             self.file_io.write_vortex(file_path, data)
+        elif self.file_format == CoreOptions.FILE_FORMAT_ROW:
+            self.file_io.write_row(file_path, data, zstd_level=self.zstd_level)
         else:
             raise ValueError(f"Unsupported file format: {self.file_format}")
 
diff --git a/paimon-python/pypaimon/write/writer/data_vector_writer.py b/paimon-python/pypaimon/write/writer/data_vector_writer.py
index d2b0adb568..9de5e2a27a 100644
--- a/paimon-python/pypaimon/write/writer/data_vector_writer.py
+++ b/paimon-python/pypaimon/write/writer/data_vector_writer.py
@@ -212,6 +212,8 @@ class DataVectorWriter(DataWriter):
             self.file_io.write_lance(file_path, data)
         elif self.file_format == CoreOptions.FILE_FORMAT_VORTEX:
             self.file_io.write_vortex(file_path, data)
+        elif self.file_format == CoreOptions.FILE_FORMAT_ROW:
+            self.file_io.write_row(file_path, data, zstd_level=self.zstd_level)
         else:
             raise ValueError(f"Unsupported file format: {self.file_format}")
 
diff --git a/paimon-python/pypaimon/write/writer/data_writer.py b/paimon-python/pypaimon/write/writer/data_writer.py
index f420a813c0..70c7594b08 100644
--- a/paimon-python/pypaimon/write/writer/data_writer.py
+++ b/paimon-python/pypaimon/write/writer/data_writer.py
@@ -198,6 +198,8 @@ class DataWriter(ABC):
             self.file_io.write_lance(file_path, data)
         elif self.file_format == CoreOptions.FILE_FORMAT_VORTEX:
             self.file_io.write_vortex(file_path, data)
+        elif self.file_format == CoreOptions.FILE_FORMAT_ROW:
+            self.file_io.write_row(file_path, data, zstd_level=self.zstd_level)
         else:
             raise ValueError(f"Unsupported file format: {self.file_format}")
 
diff --git a/paimon-python/pypaimon/write/writer/format_row_writer.py b/paimon-python/pypaimon/write/writer/format_row_writer.py
new file mode 100644
index 0000000000..f31c1eb58c
--- /dev/null
+++ b/paimon-python/pypaimon/write/writer/format_row_writer.py
@@ -0,0 +1,408 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import datetime
+import re
+import struct
+from decimal import Decimal
+from typing import Any, List
+
+import pyarrow as pa
+
+from pypaimon.common.delta_varint_compressor import DeltaVarintCompressor
+from pypaimon.schema.data_types import (
+    ArrayType, AtomicType, DataField, MapType, MultisetType, RowType, 
VectorType
+)
+
+FOOTER_SIZE = 32
+MAGIC = 0x524F5753  # "ROWS"
+VERSION = 1
+DEFAULT_BLOCK_SIZE = 65536
+
+
+class FormatRowWriter:
+    """Serialize Arrow tables into the ROW binary file format.
+
+    Bytes are emitted to the output stream in this order:
+
+    * zero or more ZSTD-compressed blocks; an uncompressed block holds
+      the encoded rows, then one little-endian int32 offset per row,
+      then the int32 number of rows in the block;
+    * the block index: three varint-length-prefixed delta-varint arrays
+      (compressed sizes, uncompressed sizes, first row number of each
+      block);
+    * a ``FOOTER_SIZE``-byte footer (see ``_write_footer``).
+
+    Call ``close`` exactly when all tables have been written; it flushes
+    the last block and appends the index and the footer.
+    """
+
+    def __init__(self, output_stream, fields: List[DataField],
+                 block_size: int = DEFAULT_BLOCK_SIZE, zstd_level: int = 1):
+        self._out = output_stream
+        self._fields = fields
+        self._block_size = block_size
+        self._zstd_level = zstd_level
+
+        self._block_buf = _BlockBuffer()
+        # Start offset of every row in the block currently being built.
+        self._row_offsets: List[int] = []
+
+        # One entry per flushed block; these become the block index.
+        self._block_compressed_sizes: List[int] = []
+        self._block_uncompressed_sizes: List[int] = []
+        self._block_row_starts: List[int] = []
+        self._total_row_count = 0
+        self._bytes_written = 0
+
+        # Created on the first flush so zstandard is imported lazily.
+        self._compressor = None
+        self._closed = False
+
+    def write_table(self, data: pa.Table):
+        """Encode every row of ``data``, flushing blocks as they fill up.
+
+        Values are looked up by field name, in the order of the fields
+        given to the constructor.
+
+        Raises:
+            ValueError: if the writer has already been closed.
+            KeyError: if ``data`` has rows but lacks a column for one of
+                the writer's fields.
+        """
+        if self._closed:
+            raise ValueError("Cannot write to a closed FormatRowWriter")
+        if data.num_rows == 0:
+            return
+
+        available = set(data.column_names)
+        missing = [field.name for field in self._fields
+                   if field.name not in available]
+        if missing:
+            # Fail before any row is buffered instead of on the first row.
+            raise KeyError(f"Columns missing from input table: {missing}")
+
+        # Convert each column once; rows are then assembled by index.
+        columns = [data.column(field.name).to_pylist()
+                   for field in self._fields]
+        for row_idx in range(data.num_rows):
+            self._write_row([column[row_idx] for column in columns])
+            self._total_row_count += 1
+            if self._block_is_full():
+                self._flush_block()
+
+    def close(self):
+        """Flush the pending block and write the block index and footer.
+
+        Calling ``close`` again is a no-op, so the index and the footer
+        are never written twice.
+        """
+        if self._closed:
+            return
+        self._closed = True
+
+        self._flush_block()
+
+        index_offset = self._bytes_written
+        self._write_block_index()
+        index_length = self._bytes_written - index_offset
+
+        self._write_footer(index_offset, index_length)
+        self._out.flush()
+
+    def _block_is_full(self) -> bool:
+        """Tell whether rows plus the block trailer reach the block size.
+
+        The trailer is four bytes per row offset plus four bytes for the
+        row count.
+        """
+        trailer_size = len(self._row_offsets) * 4 + 4
+        return self._block_buf.position + trailer_size >= self._block_size
+
+    def _write_row(self, values: List[Any]):
+        """Append one row to the current block.
+
+        A row starts with a null bitmap of ``ceil(arity / 8)`` bytes in
+        which bit ``i % 8`` of byte ``i // 8`` is set when field ``i`` is
+        null; the non-null values follow in field order.
+        """
+        header_start = self._block_buf.position
+        self._row_offsets.append(header_start)
+
+        header_size = (len(self._fields) + 7) // 8
+        self._block_buf.write_zeros(header_size)
+
+        for i, field in enumerate(self._fields):
+            value = values[i]
+            if value is None:
+                byte_idx = header_start + i // 8
+                self._block_buf.buffer[byte_idx] |= (1 << (i % 8))
+            else:
+                _write_field(self._block_buf, value, field.type)
+
+    def _flush_block(self):
+        """Compress the buffered rows and write them out as one block.
+
+        Does nothing when no row is buffered.
+        """
+        if not self._row_offsets:
+            return
+
+        if self._compressor is None:
+            import zstandard as zstd
+            self._compressor = zstd.ZstdCompressor(level=self._zstd_level)
+
+        # Number of the first row of this block within the file.
+        rows_in_block = len(self._row_offsets)
+        self._block_row_starts.append(self._total_row_count - rows_in_block)
+
+        # Block trailer: per-row offsets followed by the row count.
+        for offset in self._row_offsets:
+            self._block_buf.write_int_le(offset)
+        self._block_buf.write_int_le(rows_in_block)
+
+        uncompressed = bytes(
+            self._block_buf.buffer[:self._block_buf.position])
+        self._block_uncompressed_sizes.append(len(uncompressed))
+
+        compressed = self._compressor.compress(uncompressed)
+        self._block_compressed_sizes.append(len(compressed))
+
+        self._out.write(compressed)
+        self._bytes_written += len(compressed)
+
+        self._block_buf.reset()
+        self._row_offsets.clear()
+
+    def _write_block_index(self):
+        """Write the three per-block arrays, each delta-varint encoded."""
+        for values in (self._block_compressed_sizes,
+                       self._block_uncompressed_sizes,
+                       self._block_row_starts):
+            self._write_varint_prefixed(
+                DeltaVarintCompressor.compress(values))
+
+    def _write_varint_prefixed(self, data: bytes):
+        """Write ``data`` preceded by its length encoded as a varint."""
+        varint_bytes = _encode_var_int(len(data))
+        self._out.write(varint_bytes)
+        self._out.write(data)
+        self._bytes_written += len(varint_bytes) + len(data)
+
+    def _write_footer(self, index_offset: int, index_length: int):
+        """Write the fixed-size footer; every number is little-endian.
+
+        Offsets: 0 int64 total row count, 8 int32 block count, 12 int64
+        index offset, 20 int32 index length, 24 version byte, 25-27
+        reserved zeros, 28 uint32 magic.
+        """
+        buf = bytearray(FOOTER_SIZE)
+        struct.pack_into('<q', buf, 0, self._total_row_count)
+        struct.pack_into('<i', buf, 8, len(self._block_compressed_sizes))
+        struct.pack_into('<q', buf, 12, index_offset)
+        struct.pack_into('<i', buf, 20, index_length)
+        buf[24] = VERSION
+        # bytes 25-27 reserved (zeros)
+        struct.pack_into('<I', buf, 28, MAGIC)
+        self._out.write(bytes(buf))
+        self._bytes_written += FOOTER_SIZE
+
+
+class _BlockBuffer:
+    """Growable byte buffer with an explicit write cursor.
+
+    ``buffer`` may be longer than the data written so far; only
+    ``buffer[:position]`` holds written data.  ``reset`` rewinds the
+    cursor without clearing or shrinking the storage, which is why
+    ``write_zeros`` overwrites its range explicitly.
+    """
+
+    def __init__(self, initial_capacity: int = DEFAULT_BLOCK_SIZE):
+        self.buffer = bytearray(initial_capacity)
+        self.position = 0
+
+    def reset(self):
+        """Rewind the cursor so the storage can be reused."""
+        self.position = 0
+
+    def _ensure_capacity(self, additional: int):
+        """Grow the storage so ``additional`` more bytes fit.
+
+        The capacity at least doubles on each growth.
+        """
+        required = self.position + additional
+        if required > len(self.buffer):
+            new_size = max(len(self.buffer) * 2, required)
+            new_buf = bytearray(new_size)
+            new_buf[:self.position] = self.buffer[:self.position]
+            self.buffer = new_buf
+
+    def write_zeros(self, count: int):
+        """Write ``count`` zero bytes."""
+        self._ensure_capacity(count)
+        end = self.position + count
+        # Same-length slice assignment: zeroes in place, no resize.
+        self.buffer[self.position:end] = bytes(count)
+        self.position = end
+
+    def write_boolean(self, value: bool):
+        """Write one byte: 1 for a truthy value, 0 otherwise."""
+        self._ensure_capacity(1)
+        self.buffer[self.position] = 1 if value else 0
+        self.position += 1
+
+    def write_byte(self, value: int):
+        """Write the low eight bits of ``value`` as one byte."""
+        self._ensure_capacity(1)
+        self.buffer[self.position] = value & 0xFF
+        self.position += 1
+
+    def write_short_le(self, value: int):
+        """Write a signed 16-bit little-endian integer."""
+        self._ensure_capacity(2)
+        struct.pack_into('<h', self.buffer, self.position, value)
+        self.position += 2
+
+    def write_int_le(self, value: int):
+        """Write a signed 32-bit little-endian integer."""
+        self._ensure_capacity(4)
+        struct.pack_into('<i', self.buffer, self.position, value)
+        self.position += 4
+
+    def write_long_le(self, value: int):
+        """Write a signed 64-bit little-endian integer."""
+        self._ensure_capacity(8)
+        struct.pack_into('<q', self.buffer, self.position, value)
+        self.position += 8
+
+    def write_float_le(self, value: float):
+        """Write a 32-bit little-endian float."""
+        self._ensure_capacity(4)
+        struct.pack_into('<f', self.buffer, self.position, value)
+        self.position += 4
+
+    def write_double_le(self, value: float):
+        """Write a 64-bit little-endian float."""
+        self._ensure_capacity(8)
+        struct.pack_into('<d', self.buffer, self.position, value)
+        self.position += 8
+
+    def write_var_int(self, value: int):
+        """Write a non-negative integer as a base-128 varint.
+
+        Each byte carries seven payload bits, least significant group
+        first; the high bit marks that another byte follows.
+
+        Raises:
+            ValueError: if ``value`` is negative.  Python's ``>>`` keeps
+                the sign, so the encoding loop would never reach zero
+                and would overwrite the buffer until it ran out.
+        """
+        if value < 0:
+            raise ValueError(f"Cannot encode negative varint: {value}")
+        # Reserve exactly what this value needs; zero still takes a byte.
+        self._ensure_capacity(max(1, (value.bit_length() + 6) // 7))
+        while value > 0x7F:
+            self.buffer[self.position] = (value & 0x7F) | 0x80
+            self.position += 1
+            value >>= 7
+        self.buffer[self.position] = value
+        self.position += 1
+
+    def write_bytes_with_length(self, data: bytes):
+        """Write ``len(data)`` as a varint followed by ``data`` itself."""
+        length = len(data)
+        self.write_var_int(length)
+        self._ensure_capacity(length)
+        self.buffer[self.position:self.position + length] = data
+        self.position += length
+
+    def write_string(self, value: str):
+        """Write ``value`` as length-prefixed UTF-8 bytes."""
+        self.write_bytes_with_length(value.encode('utf-8'))
+
+
+def _encode_var_int(value: int) -> bytes:
+    """Encode a non-negative integer as a base-128 varint.
+
+    Each output byte carries seven payload bits, least significant group
+    first; the high bit marks that another byte follows.
+
+    Raises:
+        ValueError: if ``value`` is negative.  Python's ``>>`` keeps the
+            sign, so the encoding loop would otherwise never terminate.
+    """
+    if value < 0:
+        raise ValueError(f"Cannot encode negative varint: {value}")
+    result = bytearray()
+    while value > 0x7F:
+        result.append((value & 0x7F) | 0x80)
+        value >>= 7
+    result.append(value)
+    return bytes(result)
+
+
+def _decimal_to_unscaled(value: Any, scale: int) -> int:
+    """Return ``value * 10**scale`` as an int, truncated toward zero.
+
+    Non-``Decimal`` inputs are converted through ``Decimal(str(value))``.
+    The multiplication runs in a local decimal context whose precision is
+    raised to cover every digit of the operand: under the default 28-digit
+    context a wide value (e.g. one for DECIMAL(38, s)) would be rounded
+    before it is converted to an integer.
+    """
+    from decimal import localcontext
+
+    dec = value if isinstance(value, Decimal) else Decimal(str(value))
+    with localcontext() as ctx:
+        # Operand digits plus the shift is enough for an exact product;
+        # never lower a precision that the caller has already raised.
+        ctx.prec = max(ctx.prec, len(dec.as_tuple().digits) + scale + 1)
+        return int(dec * (10 ** scale))
+
+
+def _write_field(buf: _BlockBuffer, value: Any, data_type) -> None:
+    """Serialize a single field value into ``buf`` according to ``data_type``.
+
+    Args:
+        buf: Block buffer that receives the encoded bytes.
+        value: Python value to encode. The array and nested-row writers in
+            this module record ``None`` in their own bitmaps and do not
+            pass it here.
+        data_type: An ``AtomicType``, ``ArrayType``, ``VectorType``,
+            ``MapType``, ``MultisetType`` or ``RowType`` instance.
+
+    Raises:
+        ValueError: If the atomic type name, or the class of ``data_type``,
+            is not one handled below.
+    """
+    if isinstance(data_type, AtomicType):
+        type_name = data_type.type.upper()
+        if type_name == 'BOOLEAN':
+            buf.write_boolean(value)
+        elif type_name == 'TINYINT':
+            buf.write_byte(value)
+        elif type_name == 'SMALLINT':
+            buf.write_short_le(value)
+        elif type_name in ('INT', 'INTEGER'):
+            buf.write_int_le(value)
+        elif type_name == 'DATE':
+            # datetime.datetime is a subclass of datetime.date, but it
+            # cannot be subtracted from a plain date; reduce it first.
+            if isinstance(value, datetime.datetime):
+                value = value.date()
+            if isinstance(value, datetime.date):
+                value = (value - datetime.date(1970, 1, 1)).days
+            buf.write_int_le(value)
+        elif type_name.startswith('TIME') and not type_name.startswith('TIMESTAMP'):
+            if isinstance(value, datetime.time):
+                seconds = value.hour * 3600 + value.minute * 60 + value.second
+                value = seconds * 1000 + value.microsecond // 1000
+            buf.write_int_le(value)
+        elif type_name == 'BIGINT':
+            buf.write_long_le(value)
+        elif type_name == 'FLOAT':
+            buf.write_float_le(value)
+        elif type_name == 'DOUBLE':
+            buf.write_double_le(value)
+        elif type_name == 'STRING' or type_name.startswith(('CHAR', 'VARCHAR')):
+            buf.write_string(value)
+        elif type_name in ('BYTES', 'BLOB') or type_name.startswith(('BINARY', 'VARBINARY')):
+            buf.write_bytes_with_length(value)
+        elif type_name.startswith('DECIMAL'):
+            precision, scale = _parse_decimal_params(type_name)
+            unscaled = _decimal_to_unscaled(value, scale)
+            if precision <= 18:
+                buf.write_long_le(unscaled)
+            else:
+                # Big-endian two's complement, length-prefixed.
+                raw = unscaled.to_bytes(
+                    (unscaled.bit_length() + 8) // 8, byteorder='big', signed=True)
+                buf.write_bytes_with_length(raw)
+        elif type_name.startswith('TIMESTAMP'):
+            precision = _parse_timestamp_precision(type_name)
+            if isinstance(value, datetime.datetime):
+                epoch = datetime.datetime(1970, 1, 1, tzinfo=value.tzinfo)
+                # Exact integer microseconds; going through the float from
+                # total_seconds() can come out one microsecond short.
+                total_micros = (value - epoch) // datetime.timedelta(microseconds=1)
+                millis, micro_of_milli = divmod(total_micros, 1000)
+            elif precision <= 3:
+                # Non-datetime input is written as-is as the millisecond value.
+                millis, micro_of_milli = value, 0
+            else:
+                # Non-datetime input is interpreted as microseconds.
+                millis, micro_of_milli = divmod(int(value), 1000)
+            buf.write_long_le(millis)
+            if precision > 3:
+                buf.write_var_int(micro_of_milli * 1000)
+        elif type_name == 'VARIANT':
+            if isinstance(value, dict):
+                buf.write_bytes_with_length(value['value'])
+                buf.write_bytes_with_length(value['metadata'])
+            else:
+                buf.write_bytes_with_length(getattr(value, 'value', b''))
+                buf.write_bytes_with_length(getattr(value, 'metadata', b''))
+        else:
+            raise ValueError(f"Unsupported atomic type for writing: {type_name}")
+
+    elif isinstance(data_type, ArrayType):
+        _write_array_elements(buf, value, data_type.element)
+
+    elif isinstance(data_type, VectorType):
+        _write_vector(buf, value, data_type.element)
+
+    elif isinstance(data_type, MapType):
+        # Written as two parallel arrays: all keys, then all values.
+        if isinstance(value, dict):
+            keys = list(value.keys())
+            values = list(value.values())
+        else:
+            keys = [pair[0] for pair in value]
+            values = [pair[1] for pair in value]
+        _write_array_elements(buf, keys, data_type.key)
+        _write_array_elements(buf, values, data_type.value)
+
+    elif isinstance(data_type, MultisetType):
+        # Written as two parallel arrays: elements, then their INT counts.
+        if isinstance(value, dict):
+            keys = list(value.keys())
+            counts = list(value.values())
+        else:
+            keys = [pair[0] for pair in value]
+            counts = [pair[1] for pair in value]
+        _write_array_elements(buf, keys, data_type.element)
+        _write_array_elements(buf, counts, AtomicType("INT"))
+
+    elif isinstance(data_type, RowType):
+        _write_nested_row(buf, value, data_type)
+
+    else:
+        raise ValueError(f"Unsupported data type for writing: {data_type}")
+
+
+def _write_array_elements(buf: _BlockBuffer, elements: list, element_type) -> None:
+    """Write ``elements`` as a var-int count, a null bitmap, then the values.
+
+    The bitmap occupies ``ceil(count / 8)`` bytes; bit ``i % 8`` of byte
+    ``i // 8`` is set when element ``i`` is ``None``. Only non-null elements
+    are encoded after the bitmap, each through ``_write_field``.
+    """
+    count = len(elements)
+    buf.write_var_int(count)
+
+    # Reserve the zeroed bitmap first; bits are flipped as nulls are met.
+    bitmap_offset = buf.position
+    buf.write_zeros((count + 7) // 8)
+
+    for index, item in enumerate(elements):
+        if item is not None:
+            _write_field(buf, item, element_type)
+            continue
+        byte_index, bit_index = divmod(index, 8)
+        buf.buffer[bitmap_offset + byte_index] |= 1 << bit_index
+
+
+def _write_vector(buf: _BlockBuffer, elements: list, element_type) -> None:
+    """Write ``elements`` as a var-int count followed by every value.
+
+    No null bitmap is written here: each element is handed directly to
+    ``_write_field``.
+    """
+    buf.write_var_int(len(elements))
+    for item in elements:
+        _write_field(buf, item, element_type)
+
+
+def _write_nested_row(buf: _BlockBuffer, value, row_type: RowType) -> None:
+    """Write a nested row as a null-bit header followed by non-null fields.
+
+    ``value`` may be a dict keyed by field name or a positional sequence;
+    a missing dict key or a sequence shorter than the field list is treated
+    as ``None``. Bit ``i % 8`` of header byte ``i // 8`` is set for every
+    field that is ``None``; the remaining fields are encoded in declaration
+    order through ``_write_field``.
+    """
+    fields = row_type.fields
+
+    # Reserve the zeroed header first; bits are flipped as nulls are met.
+    header_offset = buf.position
+    buf.write_zeros((len(fields) + 7) // 8)
+
+    by_name = isinstance(value, dict)
+    for index, field in enumerate(fields):
+        if by_name:
+            item = value.get(field.name)
+        elif index < len(value):
+            item = value[index]
+        else:
+            item = None
+
+        if item is None:
+            buf.buffer[header_offset + index // 8] |= 1 << (index % 8)
+            continue
+        _write_field(buf, item, field.type)
+
+
+def _parse_decimal_params(type_name: str) -> tuple:
+    """Extract ``(precision, scale)`` from a ``DECIMAL(p, s)`` type name.
+
+    ``DECIMAL(p)`` yields scale 0. Any name that matches neither form in
+    full falls back to ``(10, 0)``.
+    """
+    with_scale = re.fullmatch(r'DECIMAL\((\d+),\s*(\d+)\)', type_name)
+    if with_scale is not None:
+        precision, scale = with_scale.groups()
+        return int(precision), int(scale)
+
+    precision_only = re.fullmatch(r'DECIMAL\((\d+)\)', type_name)
+    if precision_only is not None:
+        return int(precision_only.group(1)), 0
+
+    return 10, 0
+
+
+def _parse_timestamp_precision(type_name: str) -> int:
+    """Return the first parenthesised integer in ``type_name``, or 6 if none."""
+    found = re.search(r'\((\d+)\)', type_name)
+    return 6 if found is None else int(found.group(1))

Reply via email to