This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new b413290ef4 [python] Add ROW file format read and write support (#8014)
b413290ef4 is described below
commit b413290ef40f2c41667b652112fd6cc283f44767
Author: Jingsong Lee <[email protected]>
AuthorDate: Thu May 28 21:19:41 2026 +0800
[python] Add ROW file format read and write support (#8014)
Implement the ROW binary file format in the Python SDK, matching the
Java implementation for cross-language interoperability. The format uses
block-level ZSTD compression with delta-varint block indexing and a
32-byte footer.
---
.../test/java/org/apache/paimon/JavaPyE2ETest.java | 62 +++
.../apache/paimon/format/row/RowFormatWriter.java | 1 -
paimon-python/dev/run_mixed_tests.sh | 58 ++-
.../pypaimon/catalog/rest/rest_token_file_io.py | 3 +
paimon-python/pypaimon/common/file_io.py | 3 +
.../pypaimon/common/options/core_options.py | 1 +
.../pypaimon/filesystem/caching_file_io.py | 3 +
paimon-python/pypaimon/filesystem/local_file_io.py | 21 +
.../pypaimon/filesystem/pyarrow_file_io.py | 15 +
.../pypaimon/read/reader/format_row_reader.py | 469 ++++++++++++++++++
paimon-python/pypaimon/read/split_read.py | 28 +-
.../pypaimon/tests/e2e/java_py_read_write_test.py | 49 ++
.../tests/test_format_row_reader_writer.py | 534 +++++++++++++++++++++
.../pypaimon/tests/test_format_row_table.py | 503 +++++++++++++++++++
.../pypaimon/write/writer/data_blob_writer.py | 2 +
.../pypaimon/write/writer/data_vector_writer.py | 2 +
paimon-python/pypaimon/write/writer/data_writer.py | 2 +
.../pypaimon/write/writer/format_row_writer.py | 408 ++++++++++++++++
18 files changed, 2160 insertions(+), 4 deletions(-)
diff --git a/paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java b/paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java
index 86cea365c7..379331a1c2 100644
--- a/paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java
@@ -997,6 +997,68 @@ public class JavaPyE2ETest {
return GenericRow.ofKind(rowKind, values[0], values[1], values[2]);
}
+ /** Java writes a ROW-format append-only table for Python to read (Java→Python E2E). */
+ @Test
+ @EnabledIfSystemProperty(named = "run.e2e.tests", matches = "true")
+ public void testJavaWriteRowAppendTable() throws Exception {
+ Identifier identifier = identifier("mixed_test_append_tablej_row");
+ catalog.dropTable(identifier, true);
+ Schema schema =
+ Schema.newBuilder()
+ .column("id", DataTypes.INT())
+ .column("name", DataTypes.STRING())
+ .column("value", DataTypes.DOUBLE())
+ .option("file.format", "row")
+ .option("bucket", "-1")
+ .build();
+
+ catalog.createTable(identifier, schema, false);
+ FileStoreTable table = (FileStoreTable) catalog.getTable(identifier);
+
+ BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder();
+ try (BatchTableWrite write = writeBuilder.newWrite();
+ BatchTableCommit commit = writeBuilder.newCommit()) {
+ write.write(GenericRow.of(1, BinaryString.fromString("Apple"),
1.5));
+ write.write(GenericRow.of(2, BinaryString.fromString("Banana"),
0.8));
+ write.write(GenericRow.of(3, BinaryString.fromString("Carrot"),
0.6));
+ write.write(GenericRow.of(4, BinaryString.fromString("Broccoli"),
1.2));
+ write.write(GenericRow.of(5, BinaryString.fromString("Chicken"),
5.0));
+ write.write(GenericRow.of(6, BinaryString.fromString("Beef"),
8.0));
+ commit.commit(write.prepareCommit());
+ }
+
+ List<Split> splits = new
ArrayList<>(table.newSnapshotReader().read().dataSplits());
+ TableRead read = table.newRead();
+ List<String> res =
+ getResult(
+ read,
+ splits,
+ row -> DataFormatTestUtil.toStringNoRowKind(row,
table.rowType()));
+ assertThat(res).hasSize(6);
+ LOG.info("testJavaWriteRowAppendTable: wrote and read back {}
ROW-format rows", res.size());
+ }
+
+ /** Java reads a ROW-format append-only table written by Python (Python→Java E2E). */
+ @Test
+ @EnabledIfSystemProperty(named = "run.e2e.tests", matches = "true")
+ public void testReadRowAppendTable() throws Exception {
+ Identifier identifier = identifier("mixed_test_append_tablep_row");
+ Table table = catalog.getTable(identifier);
+ FileStoreTable fileStoreTable = (FileStoreTable) table;
+ List<Split> splits =
+ new
ArrayList<>(fileStoreTable.newSnapshotReader().read().dataSplits());
+ TableRead read = fileStoreTable.newRead();
+ List<String> res =
+ getResult(
+ read,
+ splits,
+ row -> DataFormatTestUtil.toStringNoRowKind(row,
table.rowType()));
+ assertThat(res).hasSize(6);
+ LOG.info(
+ "testReadRowAppendTable: Java read {} ROW-format rows written
by Python",
+ res.size());
+ }
+
/** Java writes a VARIANT-column table for Python to read (Java→Python E2E). */
@Test
@EnabledIfSystemProperty(named = "run.e2e.tests", matches = "true")
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/row/RowFormatWriter.java b/paimon-format/src/main/java/org/apache/paimon/format/row/RowFormatWriter.java
index ff9ee1112b..7fcd750744 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/row/RowFormatWriter.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/row/RowFormatWriter.java
@@ -89,7 +89,6 @@ public class RowFormatWriter implements FormatWriter {
footer.writeTo(out);
out.flush();
- out.close();
}
private void flushBlock() throws IOException {
diff --git a/paimon-python/dev/run_mixed_tests.sh b/paimon-python/dev/run_mixed_tests.sh
index 7cc32eedee..6e7a7e00b5 100755
--- a/paimon-python/dev/run_mixed_tests.sh
+++ b/paimon-python/dev/run_mixed_tests.sh
@@ -612,6 +612,48 @@ run_py_variant_write_java_read_test() {
fi
}
+# Function to run ROW format test (Java write, Python read, Python write, Java read)
+run_row_format_test() {
+ echo -e "${YELLOW}=== Running ROW Format Test (Java Write → Python Read,
Python Write → Java Read) ===${NC}"
+
+ cd "$PROJECT_ROOT"
+
+ echo "Running Maven test for JavaPyE2ETest.testJavaWriteRowAppendTable..."
+ if ! mvn test
-Dtest=org.apache.paimon.JavaPyE2ETest#testJavaWriteRowAppendTable -pl
paimon-core -q -Drun.e2e.tests=true; then
+ echo -e "${RED}✗ Java ROW write test failed${NC}"
+ return 1
+ fi
+ echo -e "${GREEN}✓ Java ROW write test completed successfully${NC}"
+
+ cd "$PAIMON_PYTHON_DIR"
+ echo "Running Python test for
JavaPyReadWriteTest.test_read_row_append_table..."
+ if ! python -m pytest
java_py_read_write_test.py::JavaPyReadWriteTest::test_read_row_append_table -v;
then
+ echo -e "${RED}✗ Python ROW read test failed${NC}"
+ return 1
+ fi
+ echo -e "${GREEN}✓ Python ROW read test completed successfully${NC}"
+
+ echo ""
+
+ echo "Running Python test for
JavaPyReadWriteTest.test_py_write_row_append_table..."
+ if ! python -m pytest
java_py_read_write_test.py::JavaPyReadWriteTest::test_py_write_row_append_table
-v; then
+ echo -e "${RED}✗ Python ROW write test failed${NC}"
+ return 1
+ fi
+ echo -e "${GREEN}✓ Python ROW write test completed successfully${NC}"
+
+ echo ""
+
+ cd "$PROJECT_ROOT"
+ echo "Running Maven test for JavaPyE2ETest.testReadRowAppendTable..."
+ if ! mvn test
-Dtest=org.apache.paimon.JavaPyE2ETest#testReadRowAppendTable -pl paimon-core
-q -Drun.e2e.tests=true; then
+ echo -e "${RED}✗ Java ROW read test failed${NC}"
+ return 1
+ fi
+ echo -e "${GREEN}✓ Java ROW read test completed successfully${NC}"
+ return 0
+}
+
# Main execution
main() {
local java_write_result=0
@@ -635,6 +677,7 @@ main() {
local multi_vector_dedicated_py_write_result=0
local java_variant_write_py_read_result=0
local py_variant_write_java_read_result=0
+ local row_format_result=0
# Detect Python version
PYTHON_VERSION=$(python -c "import sys;
print(f'{sys.version_info.major}.{sys.version_info.minor}')" 2>/dev/null ||
echo "unknown")
@@ -815,6 +858,13 @@ main() {
echo ""
+ # Run ROW format test (Java write + Python read + Python write + Java read)
+ if ! run_row_format_test; then
+ row_format_result=1
+ fi
+
+ echo ""
+
echo -e "${YELLOW}=== Test Results Summary ===${NC}"
if [[ $java_write_result -eq 0 ]]; then
@@ -943,12 +993,18 @@ main() {
echo -e "${RED}✗ VARIANT Type Test (Python Write, Java Read):
FAILED${NC}"
fi
+ if [[ $row_format_result -eq 0 ]]; then
+ echo -e "${GREEN}✓ ROW Format Test (Java Write ↔ Python Read/Write):
PASSED${NC}"
+ else
+ echo -e "${RED}✗ ROW Format Test (Java Write ↔ Python Read/Write):
FAILED${NC}"
+ fi
+
echo ""
# Clean up warehouse directory after all tests
cleanup_warehouse
- if [[ $java_write_result -eq 0 && $python_read_result -eq 0 &&
$python_write_result -eq 0 && $java_read_result -eq 0 && $pk_dv_result -eq 0 &&
$btree_index_result -eq 0 && $compressed_text_result -eq 0 &&
$tantivy_fulltext_result -eq 0 && $lumina_vector_result -eq 0 &&
$lumina_vector_btree_result -eq 0 && $compact_conflict_result -eq 0 &&
$blob_alter_compact_result -eq 0 && $data_evolution_result -eq 0 &&
$data_evolution_py_write_result -eq 0 && $java_variant_write_py_read_result -eq
[...]
+ if [[ $java_write_result -eq 0 && $python_read_result -eq 0 &&
$python_write_result -eq 0 && $java_read_result -eq 0 && $pk_dv_result -eq 0 &&
$btree_index_result -eq 0 && $compressed_text_result -eq 0 &&
$tantivy_fulltext_result -eq 0 && $lumina_vector_result -eq 0 &&
$lumina_vector_btree_result -eq 0 && $compact_conflict_result -eq 0 &&
$blob_alter_compact_result -eq 0 && $data_evolution_result -eq 0 &&
$data_evolution_py_write_result -eq 0 && $java_variant_write_py_read_result -eq
[...]
echo -e "${GREEN}🎉 All tests passed! Java-Python interoperability
verified.${NC}"
return 0
else
diff --git a/paimon-python/pypaimon/catalog/rest/rest_token_file_io.py b/paimon-python/pypaimon/catalog/rest/rest_token_file_io.py
index 777dd6fef1..42dabb268c 100644
--- a/paimon-python/pypaimon/catalog/rest/rest_token_file_io.py
+++ b/paimon-python/pypaimon/catalog/rest/rest_token_file_io.py
@@ -191,6 +191,9 @@ class RESTTokenFileIO(FileIO):
def write_blob(self, path: str, data, **kwargs):
return self.file_io().write_blob(path, data, **kwargs)
+ def write_row(self, path: str, data, fields=None, zstd_level: int = 1,
**kwargs):
+ return self.file_io().write_row(path, data, fields, zstd_level,
**kwargs)
+
@property
def uri_reader_factory(self):
if self._uri_reader_factory_cache is None:
diff --git a/paimon-python/pypaimon/common/file_io.py b/paimon-python/pypaimon/common/file_io.py
index 6f9758965b..172ee2a66e 100644
--- a/paimon-python/pypaimon/common/file_io.py
+++ b/paimon-python/pypaimon/common/file_io.py
@@ -257,6 +257,9 @@ class FileIO(ABC):
def write_vortex(self, path: str, data, **kwargs):
raise NotImplementedError("write_vortex must be implemented by FileIO
subclasses")
+ def write_row(self, path: str, data, fields=None, zstd_level: int = 1,
**kwargs):
+ raise NotImplementedError("write_row must be implemented by FileIO
subclasses")
+
def close(self):
pass
diff --git a/paimon-python/pypaimon/common/options/core_options.py b/paimon-python/pypaimon/common/options/core_options.py
index b2f0e01916..1cc33f6df0 100644
--- a/paimon-python/pypaimon/common/options/core_options.py
+++ b/paimon-python/pypaimon/common/options/core_options.py
@@ -64,6 +64,7 @@ class CoreOptions:
FILE_FORMAT_BLOB: str = "blob"
FILE_FORMAT_LANCE: str = "lance"
FILE_FORMAT_VORTEX: str = "vortex"
+ FILE_FORMAT_ROW: str = "row"
# Basic options
AUTO_CREATE: ConfigOption[bool] = (
diff --git a/paimon-python/pypaimon/filesystem/caching_file_io.py b/paimon-python/pypaimon/filesystem/caching_file_io.py
index 778614c56c..10605be5f9 100644
--- a/paimon-python/pypaimon/filesystem/caching_file_io.py
+++ b/paimon-python/pypaimon/filesystem/caching_file_io.py
@@ -414,6 +414,9 @@ class CachingFileIO(FileIO):
def write_vortex(self, *args, **kwargs):
return self._delegate.write_vortex(*args, **kwargs)
+ def write_row(self, *args, **kwargs):
+ return self._delegate.write_row(*args, **kwargs)
+
def __getattr__(self, name):
return getattr(self._delegate, name)
diff --git a/paimon-python/pypaimon/filesystem/local_file_io.py b/paimon-python/pypaimon/filesystem/local_file_io.py
index 120f2c6b3a..8385790885 100644
--- a/paimon-python/pypaimon/filesystem/local_file_io.py
+++ b/paimon-python/pypaimon/filesystem/local_file_io.py
@@ -412,6 +412,27 @@ class LocalFileIO(FileIO):
self.delete_quietly(path)
raise RuntimeError(f"Failed to write Vortex file {path}: {e}")
from e
+ def write_row(self, path: str, data: pyarrow.Table, fields=None,
zstd_level: int = 1, **kwargs):
+ try:
+ from pypaimon.write.writer.format_row_writer import FormatRowWriter
+ from pypaimon.schema.data_types import PyarrowFieldParser
+
+ if fields is None:
+ fields = PyarrowFieldParser.to_paimon_schema(data.schema)
+
+ file_path = self._to_file(path)
+ parent = file_path.parent
+ if parent and not parent.exists():
+ parent.mkdir(parents=True, exist_ok=True)
+
+ with open(file_path, 'wb') as output_stream:
+ writer = FormatRowWriter(output_stream, fields,
zstd_level=zstd_level)
+ writer.write_table(data)
+ writer.close()
+ except Exception as e:
+ self.delete_quietly(path)
+ raise RuntimeError(f"Failed to write row file {path}: {e}") from e
+
def write_blob(self, path: str, data: pyarrow.Table, **kwargs):
try:
if data.num_columns != 1:
diff --git a/paimon-python/pypaimon/filesystem/pyarrow_file_io.py b/paimon-python/pypaimon/filesystem/pyarrow_file_io.py
index e7315f3eff..74d43579a2 100644
--- a/paimon-python/pypaimon/filesystem/pyarrow_file_io.py
+++ b/paimon-python/pypaimon/filesystem/pyarrow_file_io.py
@@ -673,6 +673,21 @@ class PyArrowFileIO(FileIO):
self.delete_quietly(path)
raise RuntimeError(f"Failed to write Vortex file {path}: {e}")
from e
+ def write_row(self, path: str, data: pyarrow.Table, fields=None,
zstd_level: int = 1, **kwargs):
+ try:
+ from pypaimon.write.writer.format_row_writer import FormatRowWriter
+
+ if fields is None:
+ fields = PyarrowFieldParser.to_paimon_schema(data.schema)
+
+ with self.new_output_stream(path) as output_stream:
+ writer = FormatRowWriter(output_stream, fields,
zstd_level=zstd_level)
+ writer.write_table(data)
+ writer.close()
+ except Exception as e:
+ self.delete_quietly(path)
+ raise RuntimeError(f"Failed to write row file {path}: {e}") from e
+
def write_blob(self, path: str, data: pyarrow.Table, **kwargs):
try:
if data.num_columns != 1:
diff --git a/paimon-python/pypaimon/read/reader/format_row_reader.py b/paimon-python/pypaimon/read/reader/format_row_reader.py
new file mode 100644
index 0000000000..34a9c663a9
--- /dev/null
+++ b/paimon-python/pypaimon/read/reader/format_row_reader.py
@@ -0,0 +1,469 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import struct
+from decimal import Decimal
+from typing import Any, List, Optional
+
+import pyarrow as pa
+import pyarrow.dataset as ds
+from pyarrow import RecordBatch
+
+from pypaimon.common.delta_varint_compressor import DeltaVarintCompressor
+from pypaimon.common.file_io import FileIO
+from pypaimon.read.reader.iface.record_batch_reader import RecordBatchReader
+from pypaimon.schema.data_types import (
+ ArrayType, DataField, MapType, MultisetType, PyarrowFieldParser, RowType,
VectorType, AtomicType
+)
+
+FOOTER_SIZE = 32
+MAGIC = 0x524F5753 # "ROWS"
+VERSION = 1
+
+
+class FormatRowReader(RecordBatchReader):
+
+ def __init__(self, file_io: FileIO, file_path: str, read_fields: List[str],
+ full_fields: List[DataField], push_down_predicate: Any,
+ batch_size: int = 1024, row_indices: Optional[List[int]] =
None):
+ self._file_io = file_io
+ self._file_path = file_path
+ self._push_down_predicate = push_down_predicate
+ self._batch_size = batch_size
+
+ self._file_size = file_io.get_file_size(file_path)
+
+ full_fields_map = {field.name: field for field in full_fields}
+ self._projected_fields = [full_fields_map[name] for name in
read_fields]
+ self._all_fields = full_fields
+ self._schema =
PyarrowFieldParser.from_paimon_schema(self._projected_fields)
+
+ self._block_compressed_sizes: List[int] = []
+ self._block_uncompressed_sizes: List[int] = []
+ self._block_row_starts: List[int] = []
+ self._block_offsets: List[int] = []
+ self._total_row_count = 0
+ self._block_count = 0
+ self._current_block_idx = 0
+
+ self._row_indices = row_indices
+ self._row_indices_pos = 0
+
+ self._read_metadata()
+
+ if self._row_indices is not None:
+ self._blocks_to_read = self._compute_blocks_for_indices()
+ self._blocks_to_read_pos = 0
+ else:
+ self._blocks_to_read = None
+
+ def _compute_blocks_for_indices(self) -> List[tuple]:
+ """Group row_indices by block. Returns list of (block_idx, [local_row_offsets])."""
+ import bisect
+ result = []
+ row_starts = self._block_row_starts
+ for idx in self._row_indices:
+ block_idx = bisect.bisect_right(row_starts, idx) - 1
+ local_row = idx - row_starts[block_idx]
+ if result and result[-1][0] == block_idx:
+ result[-1][1].append(local_row)
+ else:
+ result.append((block_idx, [local_row]))
+ return result
+
+ def read_arrow_batch(self) -> Optional[RecordBatch]:
+ if self._row_indices is not None:
+ return self._read_batch_indexed()
+ return self._read_batch_sequential()
+
+ def _read_batch_sequential(self) -> Optional[RecordBatch]:
+ if self._current_block_idx >= self._block_count:
+ return None
+
+ block_data = self._read_and_decompress_block(self._current_block_idx)
+ self._current_block_idx += 1
+
+ columns = self._decode_block(block_data)
+
+ if not columns or all(len(col) == 0 for col in columns):
+ return None
+
+ pydict = {field.name: columns[i] for i, field in
enumerate(self._projected_fields)}
+ table = pa.Table.from_pydict(pydict, self._schema)
+
+ if self._push_down_predicate is not None:
+ dataset = ds.InMemoryDataset(table)
+ scanner = dataset.scanner(filter=self._push_down_predicate)
+ table = scanner.to_table().combine_chunks()
+
+ if table.num_rows == 0:
+ return self._read_batch_sequential()
+
+ return table.to_batches()[0]
+
+ def _read_batch_indexed(self) -> Optional[RecordBatch]:
+ if self._blocks_to_read_pos >= len(self._blocks_to_read):
+ return None
+
+ block_idx, local_rows = self._blocks_to_read[self._blocks_to_read_pos]
+ self._blocks_to_read_pos += 1
+
+ block_data = self._read_and_decompress_block(block_idx)
+ columns = self._decode_block(block_data, row_filter=local_rows)
+
+ if not columns or all(len(col) == 0 for col in columns):
+ return self._read_batch_indexed()
+
+ pydict = {field.name: columns[i] for i, field in
enumerate(self._projected_fields)}
+ table = pa.Table.from_pydict(pydict, self._schema)
+
+ if self._push_down_predicate is not None:
+ dataset = ds.InMemoryDataset(table)
+ scanner = dataset.scanner(filter=self._push_down_predicate)
+ table = scanner.to_table().combine_chunks()
+
+ if table.num_rows == 0:
+ return self._read_batch_indexed()
+
+ return table.to_batches()[0]
+
+ def close(self):
+ pass
+
+ def _read_metadata(self):
+ with self._file_io.new_input_stream(self._file_path) as f:
+ f.seek(self._file_size - FOOTER_SIZE)
+ footer_bytes = f.read(FOOTER_SIZE)
+
+ if len(footer_bytes) != FOOTER_SIZE:
+ raise IOError("Invalid row file: cannot read footer")
+
+ magic = struct.unpack_from('<I', footer_bytes, 28)[0]
+ if magic != MAGIC:
+ raise IOError(f"Invalid row file magic: expected 0x{MAGIC:08X},
got 0x{magic:08X}")
+
+ version = footer_bytes[24]
+ if version != VERSION:
+ raise IOError(f"Unsupported row file version: {version}")
+
+ self._total_row_count = struct.unpack_from('<q', footer_bytes, 0)[0]
+ self._block_count = struct.unpack_from('<i', footer_bytes, 8)[0]
+ index_offset = struct.unpack_from('<q', footer_bytes, 12)[0]
+ index_length = struct.unpack_from('<i', footer_bytes, 20)[0]
+
+ with self._file_io.new_input_stream(self._file_path) as f:
+ f.seek(index_offset)
+ index_bytes = f.read(index_length)
+
+ if len(index_bytes) != index_length:
+ raise IOError("Invalid row file: cannot read block index")
+
+ self._parse_block_index(index_bytes)
+
+ def _parse_block_index(self, index_data: bytes):
+ pos = 0
+
+ len1, consumed = _decode_var_int(index_data, pos)
+ pos += consumed
+ self._block_compressed_sizes =
DeltaVarintCompressor.decompress(index_data[pos:pos + len1])
+ pos += len1
+
+ len2, consumed = _decode_var_int(index_data, pos)
+ pos += consumed
+ self._block_uncompressed_sizes =
DeltaVarintCompressor.decompress(index_data[pos:pos + len2])
+ pos += len2
+
+ len3, consumed = _decode_var_int(index_data, pos)
+ pos += consumed
+ self._block_row_starts =
DeltaVarintCompressor.decompress(index_data[pos:pos + len3])
+
+ offset = 0
+ self._block_offsets = []
+ for size in self._block_compressed_sizes:
+ self._block_offsets.append(offset)
+ offset += size
+
+ def _read_and_decompress_block(self, block_idx: int) -> bytes:
+ import zstandard as zstd
+
+ offset = self._block_offsets[block_idx]
+ compressed_size = self._block_compressed_sizes[block_idx]
+
+ with self._file_io.new_input_stream(self._file_path) as f:
+ f.seek(offset)
+ compressed_data = f.read(compressed_size)
+
+ decompressor = zstd.ZstdDecompressor()
+ uncompressed_size = self._block_uncompressed_sizes[block_idx]
+ return decompressor.decompress(compressed_data,
max_output_size=uncompressed_size)
+
+ def _decode_block(self, block_data: bytes,
+ row_filter: Optional[List[int]] = None) -> List[List]:
+ data_len = len(block_data)
+ row_count = struct.unpack_from('<i', block_data, data_len - 4)[0]
+ offset_array_start = data_len - 4 - row_count * 4
+
+ projected_indices = []
+ for pf in self._projected_fields:
+ for i, af in enumerate(self._all_fields):
+ if af.name == pf.name:
+ projected_indices.append(i)
+ break
+
+ columns = [[] for _ in self._projected_fields]
+ arity = len(self._all_fields)
+ header_size = (arity + 7) // 8
+
+ rows_to_read = row_filter if row_filter is not None else
range(row_count)
+ proj_set = set(projected_indices)
+ proj_col_map = {field_idx: col_idx for col_idx, field_idx in
enumerate(projected_indices)}
+
+ for row_idx in rows_to_read:
+ row_offset = struct.unpack_from('<i', block_data,
offset_array_start + row_idx * 4)[0]
+ decoder = _RowDecoder(block_data, row_offset)
+
+ null_bitmap = block_data[decoder.pos:decoder.pos + header_size]
+ decoder.pos += header_size
+
+ for field_idx in range(arity):
+ is_null = (null_bitmap[field_idx // 8] & (1 << (field_idx %
8))) != 0
+ if is_null:
+ if field_idx in proj_set:
+ columns[proj_col_map[field_idx]].append(None)
+ else:
+ value = _read_field(decoder,
self._all_fields[field_idx].type)
+ if field_idx in proj_set:
+ columns[proj_col_map[field_idx]].append(value)
+
+ return columns
+
+
+class _RowDecoder:
+
+ __slots__ = ('data', 'pos')
+
+ def __init__(self, data: bytes, pos: int = 0):
+ self.data = data
+ self.pos = pos
+
+ def read_boolean(self) -> bool:
+ v = self.data[self.pos] != 0
+ self.pos += 1
+ return v
+
+ def read_byte(self) -> int:
+ v = struct.unpack_from('<b', self.data, self.pos)[0]
+ self.pos += 1
+ return v
+
+ def read_short(self) -> int:
+ v = struct.unpack_from('<h', self.data, self.pos)[0]
+ self.pos += 2
+ return v
+
+ def read_int(self) -> int:
+ v = struct.unpack_from('<i', self.data, self.pos)[0]
+ self.pos += 4
+ return v
+
+ def read_long(self) -> int:
+ v = struct.unpack_from('<q', self.data, self.pos)[0]
+ self.pos += 8
+ return v
+
+ def read_float(self) -> float:
+ v = struct.unpack_from('<f', self.data, self.pos)[0]
+ self.pos += 4
+ return v
+
+ def read_double(self) -> float:
+ v = struct.unpack_from('<d', self.data, self.pos)[0]
+ self.pos += 8
+ return v
+
+ def read_var_int(self) -> int:
+ result = 0
+ shift = 0
+ while True:
+ b = self.data[self.pos]
+ self.pos += 1
+ result |= (b & 0x7F) << shift
+ if (b & 0x80) == 0:
+ return result
+ shift += 7
+
+ def read_string(self) -> str:
+ length = self.read_var_int()
+ s = self.data[self.pos:self.pos + length].decode('utf-8')
+ self.pos += length
+ return s
+
+ def read_bytes(self) -> bytes:
+ length = self.read_var_int()
+ b = self.data[self.pos:self.pos + length]
+ self.pos += length
+ return bytes(b)
+
+
+def _decode_var_int(data: bytes, offset: int) -> tuple:
+ result = 0
+ shift = 0
+ pos = offset
+ while True:
+ b = data[pos]
+ pos += 1
+ result |= (b & 0x7F) << shift
+ if (b & 0x80) == 0:
+ return result, pos - offset
+ shift += 7
+
+
+def _read_field(decoder: _RowDecoder, data_type) -> Any:
+ if isinstance(data_type, AtomicType):
+ type_name = data_type.type.upper()
+ if type_name == 'BOOLEAN':
+ return decoder.read_boolean()
+ elif type_name == 'TINYINT':
+ return decoder.read_byte()
+ elif type_name == 'SMALLINT':
+ return decoder.read_short()
+ elif type_name in ('INT', 'INTEGER', 'DATE', 'TIME'):
+ return decoder.read_int()
+ elif type_name.startswith('TIME') and not
type_name.startswith('TIMESTAMP'):
+ return decoder.read_int()
+ elif type_name == 'BIGINT':
+ return decoder.read_long()
+ elif type_name == 'FLOAT':
+ return decoder.read_float()
+ elif type_name == 'DOUBLE':
+ return decoder.read_double()
+ elif type_name == 'STRING' or type_name.startswith('CHAR') or
type_name.startswith('VARCHAR'):
+ return decoder.read_string()
+ elif type_name == 'BYTES' or type_name.startswith('BINARY') or
type_name.startswith('VARBINARY'):
+ return decoder.read_bytes()
+ elif type_name == 'BLOB':
+ return decoder.read_bytes()
+ elif type_name.startswith('DECIMAL'):
+ precision, scale = _parse_decimal_params(type_name)
+ if precision <= 18:
+ unscaled = decoder.read_long()
+ return Decimal(unscaled) / Decimal(10 ** scale)
+ else:
+ raw = decoder.read_bytes()
+ unscaled = int.from_bytes(raw, byteorder='big', signed=True)
+ return Decimal(unscaled) / Decimal(10 ** scale)
+ elif type_name.startswith('TIMESTAMP'):
+ precision = _parse_timestamp_precision(type_name)
+ millis = decoder.read_long()
+ if precision <= 3:
+ return millis
+ else:
+ nano_of_milli = decoder.read_var_int()
+ micros = millis * 1000 + nano_of_milli // 1000
+ return micros
+ elif type_name == 'VARIANT':
+ value_bytes = decoder.read_bytes()
+ metadata_bytes = decoder.read_bytes()
+ return {'value': value_bytes, 'metadata': metadata_bytes}
+ else:
+ raise ValueError(f"Unsupported atomic type: {type_name}")
+
+ elif isinstance(data_type, ArrayType):
+ return _read_array(decoder, data_type.element)
+
+ elif isinstance(data_type, VectorType):
+ return _read_vector(decoder, data_type.element)
+
+ elif isinstance(data_type, MapType):
+ keys = _read_array_elements(decoder, data_type.key)
+ values = _read_array_elements(decoder, data_type.value)
+ return list(zip(keys, values))
+
+ elif isinstance(data_type, MultisetType):
+ keys = _read_array_elements(decoder, data_type.element)
+ counts = _read_array_elements(decoder, AtomicType("INT"))
+ return list(zip(keys, counts))
+
+ elif isinstance(data_type, RowType):
+ return _read_nested_row(decoder, data_type)
+
+ else:
+ raise ValueError(f"Unsupported data type: {data_type}")
+
+
+def _read_array(decoder: _RowDecoder, element_type) -> list:
+ return _read_array_elements(decoder, element_type)
+
+
+def _read_array_elements(decoder: _RowDecoder, element_type) -> list:
+ size = decoder.read_var_int()
+ null_bitmap_bytes = (size + 7) // 8
+ null_bitmap = decoder.data[decoder.pos:decoder.pos + null_bitmap_bytes]
+ decoder.pos += null_bitmap_bytes
+
+ elements = []
+ for i in range(size):
+ is_null = (null_bitmap[i // 8] & (1 << (i % 8))) != 0
+ if is_null:
+ elements.append(None)
+ else:
+ elements.append(_read_field(decoder, element_type))
+ return elements
+
+
+def _read_vector(decoder: _RowDecoder, element_type) -> list:
+ size = decoder.read_var_int()
+ elements = []
+ for _ in range(size):
+ elements.append(_read_field(decoder, element_type))
+ return elements
+
+
+def _read_nested_row(decoder: _RowDecoder, row_type: RowType) -> dict:
+ fields = row_type.fields
+ arity = len(fields)
+ header_size = (arity + 7) // 8
+ null_bitmap = decoder.data[decoder.pos:decoder.pos + header_size]
+ decoder.pos += header_size
+
+ result = {}
+ for i, field in enumerate(fields):
+ is_null = (null_bitmap[i // 8] & (1 << (i % 8))) != 0
+ if is_null:
+ result[field.name] = None
+ else:
+ result[field.name] = _read_field(decoder, field.type)
+ return result
+
+
+def _parse_decimal_params(type_name: str) -> tuple:
+ import re
+ match = re.fullmatch(r'DECIMAL\((\d+),\s*(\d+)\)', type_name)
+ if match:
+ return int(match.group(1)), int(match.group(2))
+ match = re.fullmatch(r'DECIMAL\((\d+)\)', type_name)
+ if match:
+ return int(match.group(1)), 0
+ return 10, 0
+
+
+def _parse_timestamp_precision(type_name: str) -> int:
+ import re
+ match = re.search(r'\((\d+)\)', type_name)
+ if match:
+ return int(match.group(1))
+ return 6
diff --git a/paimon-python/pypaimon/read/split_read.py b/paimon-python/pypaimon/read/split_read.py
index e432eca6b1..351f8de83e 100644
--- a/paimon-python/pypaimon/read/split_read.py
+++ b/paimon-python/pypaimon/read/split_read.py
@@ -45,6 +45,7 @@ from pypaimon.read.reader.row_range_filter_record_reader import RowIdFilterRecor
from pypaimon.read.reader.format_blob_reader import FormatBlobReader
from pypaimon.read.reader.format_lance_reader import FormatLanceReader
from pypaimon.read.reader.format_pyarrow_reader import FormatPyArrowReader
+from pypaimon.read.reader.format_row_reader import FormatRowReader
from pypaimon.read.reader.format_vortex_reader import FormatVortexReader
from pypaimon.read.reader.iface.record_batch_reader import (RecordBatchReader,
RowPositionReader,
EmptyRecordBatchReader)
@@ -195,7 +196,10 @@ class SplitRead(ABC):
effective_row_ranges = Range.and_(row_ranges,
[file.row_id_range()])
if len(effective_row_ranges) == 0:
return EmptyRecordBatchReader()
- if file_format in (CoreOptions.FILE_FORMAT_VORTEX,
CoreOptions.FILE_FORMAT_LANCE):
+ row_index_formats = (CoreOptions.FILE_FORMAT_VORTEX,
+ CoreOptions.FILE_FORMAT_LANCE,
+ CoreOptions.FILE_FORMAT_ROW)
+ if file_format in row_index_formats:
row_indices = []
for r in effective_row_ranges:
start = r.from_ - file.first_row_id
@@ -268,10 +272,30 @@ class SplitRead(ABC):
ordered_read_fields, read_arrow_predicate,
batch_size=batch_size,
options=self.table.options,
nested_name_paths=ordered_nested_paths)
+ elif file_format == CoreOptions.FILE_FORMAT_ROW:
+ if has_nested:
+ raise NotImplementedError(
+ "Nested-field projection is not supported on ROW files")
+ file_schema = self.table.schema_manager.get_schema(
+ file.schema_id)
+ if file.write_cols:
+ field_map = {f.name: f for f in file_schema.fields}
+ row_full_fields = [field_map[n] for n in file.write_cols
+ if n in field_map]
+ elif self.table.is_primary_key_table:
+ row_full_fields = self._create_key_value_fields(
+ file_schema.fields)
+ else:
+ row_full_fields = file_schema.fields
+ format_reader = FormatRowReader(
+ self.table.file_io, file_path, read_file_fields,
+ row_full_fields,
+ read_arrow_predicate, batch_size=batch_size,
+ row_indices=row_indices)
elif file_format in ('json', 'csv'):
raise NotImplementedError(
f"Reading '{file_format}' format is not yet supported in
Python SDK. "
- f"Supported formats: parquet, orc, avro, lance, blob.")
+ f"Supported formats: parquet, orc, avro, lance, blob, row.")
else:
raise ValueError(f"Unexpected file format: {file_format}")
diff --git a/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py b/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py
index 475d2e74f7..d01adb26ba 100644
--- a/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py
+++ b/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py
@@ -277,6 +277,55 @@ class JavaPyReadWriteTest(unittest.TestCase):
# which explicitly reads KeyValue objects and checks valueKind
print(f"Format: {file_format}, Python read completed. ValueKind
verification should be done in Java test.")
+ def test_py_write_row_append_table(self):
+ """Python writes a ROW-format append-only table for Java to read."""
+ pa_schema = pa.schema([
+ ('id', pa.int32()),
+ ('name', pa.string()),
+ ('value', pa.float64()),
+ ])
+
+ schema = Schema.from_pyarrow_schema(
+ pa_schema,
+ options={'file.format': 'row', 'bucket': '-1'}
+ )
+
+ table_name = 'default.mixed_test_append_tablep_row'
+ self.catalog.create_table(table_name, schema, False)
+ table = self.catalog.get_table(table_name)
+
+ data = pa.table({
+ 'id': pa.array([1, 2, 3, 4, 5, 6], type=pa.int32()),
+ 'name': pa.array(['Apple', 'Banana', 'Carrot', 'Broccoli',
'Chicken', 'Beef']),
+ 'value': pa.array([1.5, 0.8, 0.6, 1.2, 5.0, 8.0]),
+ })
+
+ write_builder = table.new_batch_write_builder()
+ table_write = write_builder.new_write()
+ table_commit = write_builder.new_commit()
+ table_write.write_arrow(data)
+ table_commit.commit(table_write.prepare_commit())
+ table_write.close()
+ table_commit.close()
+
+ # Verify Python can read it back
+ read_builder = table.new_read_builder()
+ splits = read_builder.new_scan().plan().splits()
+ result = read_builder.new_read().to_arrow(splits)
+ self.assertEqual(result.num_rows, 6)
+ expected_names = {'Apple', 'Banana', 'Carrot', 'Broccoli', 'Chicken',
'Beef'}
+ self.assertEqual(set(result.column('name').to_pylist()),
expected_names)
+
+ def test_read_row_append_table(self):
+ """Python reads a ROW-format append-only table written by Java."""
+ table = self.catalog.get_table('default.mixed_test_append_tablej_row')
+ read_builder = table.new_read_builder()
+ splits = read_builder.new_scan().plan().splits()
+ result = read_builder.new_read().to_arrow(splits)
+ self.assertEqual(result.num_rows, 6)
+ expected_names = {'Apple', 'Banana', 'Carrot', 'Broccoli', 'Chicken',
'Beef'}
+ self.assertEqual(set(result.column('name').to_pylist()),
expected_names)
+
def test_pk_dv_read(self):
pa_schema = pa.schema([
pa.field('pt', pa.int32(), nullable=False),
diff --git a/paimon-python/pypaimon/tests/test_format_row_reader_writer.py
b/paimon-python/pypaimon/tests/test_format_row_reader_writer.py
new file mode 100644
index 0000000000..cff237b5be
--- /dev/null
+++ b/paimon-python/pypaimon/tests/test_format_row_reader_writer.py
@@ -0,0 +1,534 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import os
+import tempfile
+from decimal import Decimal
+
+import pyarrow as pa
+import pytest
+
+from pypaimon.read.reader.format_row_reader import FormatRowReader
+from pypaimon.schema.data_types import (
+ ArrayType, AtomicType, DataField, MapType, RowType
+)
+from pypaimon.write.writer.format_row_writer import FormatRowWriter
+
+
+class SimpleFileIO:
+ """Minimal FileIO for testing."""
+
+ def get_file_size(self, path):
+ return os.path.getsize(path)
+
+ def new_input_stream(self, path):
+ return open(path, 'rb')
+
+
+def _write_row_file(path, fields, data_table):
+ with open(path, 'wb') as f:
+ writer = FormatRowWriter(f, fields)
+ writer.write_table(data_table)
+ writer.close()
+
+
+def _read_row_file(path, fields, read_field_names=None, row_indices=None):
+ file_io = SimpleFileIO()
+ if read_field_names is None:
+ read_field_names = [f.name for f in fields]
+ reader = FormatRowReader(file_io, path, read_field_names, fields, None,
+ row_indices=row_indices)
+ batches = []
+ while True:
+ batch = reader.read_arrow_batch()
+ if batch is None:
+ break
+ batches.append(batch)
+ reader.close()
+ if not batches:
+ return pa.table({f.name: [] for f in fields})
+ return pa.Table.from_batches(batches)
+
+
+class TestFormatRowReaderWriter:
+
+ def test_basic_int_string(self):
+ fields = [
+ DataField(0, "id", AtomicType("INT")),
+ DataField(1, "name", AtomicType("STRING")),
+ ]
+ data = pa.table({
+ "id": pa.array([1, 2, 3], type=pa.int32()),
+ "name": pa.array(["alice", "bob", "charlie"], type=pa.string()),
+ })
+
+ with tempfile.NamedTemporaryFile(suffix=".row", delete=False) as tmp:
+ path = tmp.name
+
+ try:
+ _write_row_file(path, fields, data)
+ result = _read_row_file(path, fields)
+ assert result.column("id").to_pylist() == [1, 2, 3]
+ assert result.column("name").to_pylist() == ["alice", "bob",
"charlie"]
+ finally:
+ os.unlink(path)
+
+ def test_all_primitive_types(self):
+ fields = [
+ DataField(0, "bool_col", AtomicType("BOOLEAN")),
+ DataField(1, "tinyint_col", AtomicType("TINYINT")),
+ DataField(2, "smallint_col", AtomicType("SMALLINT")),
+ DataField(3, "int_col", AtomicType("INT")),
+ DataField(4, "bigint_col", AtomicType("BIGINT")),
+ DataField(5, "float_col", AtomicType("FLOAT")),
+ DataField(6, "double_col", AtomicType("DOUBLE")),
+ DataField(7, "string_col", AtomicType("STRING")),
+ DataField(8, "binary_col", AtomicType("BYTES")),
+ ]
+ data = pa.table({
+ "bool_col": pa.array([True, False], type=pa.bool_()),
+ "tinyint_col": pa.array([1, -1], type=pa.int8()),
+ "smallint_col": pa.array([100, -100], type=pa.int16()),
+ "int_col": pa.array([1000, -1000], type=pa.int32()),
+ "bigint_col": pa.array([100000, -100000], type=pa.int64()),
+ "float_col": pa.array([1.5, -2.5], type=pa.float32()),
+ "double_col": pa.array([3.14, -2.71], type=pa.float64()),
+ "string_col": pa.array(["hello", "world"], type=pa.string()),
+ "binary_col": pa.array([b"\x01\x02", b"\x03\x04"],
type=pa.binary()),
+ })
+
+ with tempfile.NamedTemporaryFile(suffix=".row", delete=False) as tmp:
+ path = tmp.name
+
+ try:
+ _write_row_file(path, fields, data)
+ result = _read_row_file(path, fields)
+ assert result.column("bool_col").to_pylist() == [True, False]
+ assert result.column("tinyint_col").to_pylist() == [1, -1]
+ assert result.column("smallint_col").to_pylist() == [100, -100]
+ assert result.column("int_col").to_pylist() == [1000, -1000]
+ assert result.column("bigint_col").to_pylist() == [100000, -100000]
+ assert result.column("float_col").to_pylist()[0] ==
pytest.approx(1.5)
+ assert result.column("float_col").to_pylist()[1] ==
pytest.approx(-2.5)
+ assert result.column("double_col").to_pylist() ==
[pytest.approx(3.14), pytest.approx(-2.71)]
+ assert result.column("string_col").to_pylist() == ["hello",
"world"]
+ assert result.column("binary_col").to_pylist() == [b"\x01\x02",
b"\x03\x04"]
+ finally:
+ os.unlink(path)
+
+ def test_nulls(self):
+ fields = [
+ DataField(0, "id", AtomicType("INT")),
+ DataField(1, "name", AtomicType("STRING")),
+ ]
+ data = pa.table({
+ "id": pa.array([1, None, 3], type=pa.int32()),
+ "name": pa.array([None, "bob", None], type=pa.string()),
+ })
+
+ with tempfile.NamedTemporaryFile(suffix=".row", delete=False) as tmp:
+ path = tmp.name
+
+ try:
+ _write_row_file(path, fields, data)
+ result = _read_row_file(path, fields)
+ assert result.column("id").to_pylist() == [1, None, 3]
+ assert result.column("name").to_pylist() == [None, "bob", None]
+ finally:
+ os.unlink(path)
+
+ def test_decimal(self):
+ fields = [
+ DataField(0, "d1", AtomicType("DECIMAL(10, 2)")),
+ DataField(1, "d2", AtomicType("DECIMAL(20, 5)")),
+ ]
+ data = pa.table({
+ "d1": pa.array([Decimal("123.45"), Decimal("-67.89")],
type=pa.decimal128(10, 2)),
+ "d2": pa.array([Decimal("12345.67890"), Decimal("-99999.12345")],
type=pa.decimal128(20, 5)),
+ })
+
+ with tempfile.NamedTemporaryFile(suffix=".row", delete=False) as tmp:
+ path = tmp.name
+
+ try:
+ _write_row_file(path, fields, data)
+ result = _read_row_file(path, fields)
+ assert result.column("d1").to_pylist() == [Decimal("123.45"),
Decimal("-67.89")]
+ assert result.column("d2").to_pylist() == [Decimal("12345.67890"),
Decimal("-99999.12345")]
+ finally:
+ os.unlink(path)
+
+ def test_timestamp(self):
+ fields = [
+ DataField(0, "ts_millis", AtomicType("TIMESTAMP(3)")),
+ DataField(1, "ts_micros", AtomicType("TIMESTAMP(6)")),
+ ]
+ data = pa.table({
+ "ts_millis": pa.array([1000, 2000], type=pa.timestamp('ms')),
+ "ts_micros": pa.array([1000000, 2000000], type=pa.timestamp('us')),
+ })
+
+ with tempfile.NamedTemporaryFile(suffix=".row", delete=False) as tmp:
+ path = tmp.name
+
+ try:
+ _write_row_file(path, fields, data)
+ result = _read_row_file(path, fields)
+ assert result.num_rows == 2
+ finally:
+ os.unlink(path)
+
+ def test_array_type(self):
+ element_type = AtomicType("INT")
+ fields = [
+ DataField(0, "arr", ArrayType(True, element_type)),
+ ]
+ data = pa.table({
+ "arr": pa.array([[1, 2, 3], [4, 5]], type=pa.list_(pa.int32())),
+ })
+
+ with tempfile.NamedTemporaryFile(suffix=".row", delete=False) as tmp:
+ path = tmp.name
+
+ try:
+ _write_row_file(path, fields, data)
+ result = _read_row_file(path, fields)
+ assert result.column("arr").to_pylist() == [[1, 2, 3], [4, 5]]
+ finally:
+ os.unlink(path)
+
+ def test_map_type(self):
+ fields = [
+ DataField(0, "m", MapType(True, AtomicType("STRING"),
AtomicType("INT"))),
+ ]
+ data = pa.table({
+ "m": pa.array(
+ [[("a", 1), ("b", 2)], [("c", 3)]],
+ type=pa.map_(pa.string(), pa.int32())
+ ),
+ })
+
+ with tempfile.NamedTemporaryFile(suffix=".row", delete=False) as tmp:
+ path = tmp.name
+
+ try:
+ _write_row_file(path, fields, data)
+ result = _read_row_file(path, fields)
+ result_maps = result.column("m").to_pylist()
+ assert len(result_maps) == 2
+ assert len(result_maps[0]) == 2
+ assert len(result_maps[1]) == 1
+ finally:
+ os.unlink(path)
+
+ def test_nested_row(self):
+ inner_type = RowType(True, [
+ DataField(0, "x", AtomicType("INT")),
+ DataField(1, "y", AtomicType("STRING")),
+ ])
+ fields = [
+ DataField(0, "id", AtomicType("INT")),
+ DataField(1, "nested", inner_type),
+ ]
+ data = pa.table({
+ "id": pa.array([1, 2], type=pa.int32()),
+ "nested": pa.array(
+ [{"x": 10, "y": "a"}, {"x": 20, "y": "b"}],
+ type=pa.struct([
+ pa.field("x", pa.int32()),
+ pa.field("y", pa.string()),
+ ])
+ ),
+ })
+
+ with tempfile.NamedTemporaryFile(suffix=".row", delete=False) as tmp:
+ path = tmp.name
+
+ try:
+ _write_row_file(path, fields, data)
+ result = _read_row_file(path, fields)
+ assert result.column("id").to_pylist() == [1, 2]
+ nested = result.column("nested").to_pylist()
+ assert nested[0] == {"x": 10, "y": "a"}
+ assert nested[1] == {"x": 20, "y": "b"}
+ finally:
+ os.unlink(path)
+
+ def test_multi_block(self):
+ fields = [
+ DataField(0, "id", AtomicType("INT")),
+ DataField(1, "data", AtomicType("STRING")),
+ ]
+ num_rows = 5000
+ ids = list(range(num_rows))
+ strings = [f"value_{i}" for i in range(num_rows)]
+ data = pa.table({
+ "id": pa.array(ids, type=pa.int32()),
+ "data": pa.array(strings, type=pa.string()),
+ })
+
+ with tempfile.NamedTemporaryFile(suffix=".row", delete=False) as tmp:
+ path = tmp.name
+
+ try:
+ with open(path, 'wb') as f:
+ writer = FormatRowWriter(f, fields, block_size=4096)
+ writer.write_table(data)
+ writer.close()
+
+ result = _read_row_file(path, fields)
+ assert result.num_rows == num_rows
+ assert result.column("id").to_pylist() == ids
+ assert result.column("data").to_pylist() == strings
+ finally:
+ os.unlink(path)
+
+ def test_empty_file(self):
+ fields = [
+ DataField(0, "id", AtomicType("INT")),
+ ]
+ data = pa.table({
+ "id": pa.array([], type=pa.int32()),
+ })
+
+ with tempfile.NamedTemporaryFile(suffix=".row", delete=False) as tmp:
+ path = tmp.name
+
+ try:
+ _write_row_file(path, fields, data)
+ result = _read_row_file(path, fields)
+ assert result.num_rows == 0
+ finally:
+ os.unlink(path)
+
+ def test_column_projection(self):
+ fields = [
+ DataField(0, "id", AtomicType("INT")),
+ DataField(1, "name", AtomicType("STRING")),
+ DataField(2, "value", AtomicType("DOUBLE")),
+ ]
+ data = pa.table({
+ "id": pa.array([1, 2, 3], type=pa.int32()),
+ "name": pa.array(["a", "b", "c"], type=pa.string()),
+ "value": pa.array([1.1, 2.2, 3.3], type=pa.float64()),
+ })
+
+ with tempfile.NamedTemporaryFile(suffix=".row", delete=False) as tmp:
+ path = tmp.name
+
+ try:
+ _write_row_file(path, fields, data)
+ result = _read_row_file(path, fields, read_field_names=["id",
"value"])
+ assert result.num_columns == 2
+ assert result.column("id").to_pylist() == [1, 2, 3]
+ assert result.column("value").to_pylist() == [pytest.approx(1.1),
pytest.approx(2.2), pytest.approx(3.3)]
+ finally:
+ os.unlink(path)
+
+ def test_date_and_time(self):
+ fields = [
+ DataField(0, "d", AtomicType("DATE")),
+ DataField(1, "t", AtomicType("TIME")),
+ ]
+ data = pa.table({
+ "d": pa.array([18000, 19000], type=pa.date32()),
+ "t": pa.array([3600000, 7200000], type=pa.time32('ms')),
+ })
+
+ with tempfile.NamedTemporaryFile(suffix=".row", delete=False) as tmp:
+ path = tmp.name
+
+ try:
+ _write_row_file(path, fields, data)
+ result = _read_row_file(path, fields)
+ assert result.num_rows == 2
+ finally:
+ os.unlink(path)
+
+ def test_variant_type(self):
+ fields = [
+ DataField(0, "v", AtomicType("VARIANT")),
+ ]
+ data = pa.table({
+ "v": pa.array(
+ [{"value": b"\x01\x02", "metadata": b"\x03\x04"},
+ {"value": b"\x05", "metadata": b"\x06\x07\x08"}],
+ type=pa.struct([
+ pa.field("value", pa.binary(), nullable=False),
+ pa.field("metadata", pa.binary(), nullable=False),
+ ])
+ ),
+ })
+
+ with tempfile.NamedTemporaryFile(suffix=".row", delete=False) as tmp:
+ path = tmp.name
+
+ try:
+ _write_row_file(path, fields, data)
+ result = _read_row_file(path, fields)
+ variants = result.column("v").to_pylist()
+ assert variants[0]["value"] == b"\x01\x02"
+ assert variants[0]["metadata"] == b"\x03\x04"
+ assert variants[1]["value"] == b"\x05"
+ assert variants[1]["metadata"] == b"\x06\x07\x08"
+ finally:
+ os.unlink(path)
+
+ def test_row_indices_random_access(self):
+ """Test reading specific rows by index (O(1) row-number lookup)."""
+ fields = [
+ DataField(0, "id", AtomicType("INT")),
+ DataField(1, "name", AtomicType("VARCHAR")),
+ ]
+ data = pa.table({
+ "id": pa.array(list(range(100)), type=pa.int32()),
+ "name": pa.array([f"row_{i}" for i in range(100)]),
+ })
+
+ with tempfile.NamedTemporaryFile(suffix=".row", delete=False) as tmp:
+ path = tmp.name
+
+ try:
+ _write_row_file(path, fields, data)
+
+ # Read specific rows: 0, 5, 50, 99
+ result = _read_row_file(path, fields, row_indices=[0, 5, 50, 99])
+ assert result.num_rows == 4
+ assert result.column("id").to_pylist() == [0, 5, 50, 99]
+ assert result.column("name").to_pylist() == [
+ "row_0", "row_5", "row_50", "row_99"
+ ]
+
+ # Read single row
+ result = _read_row_file(path, fields, row_indices=[42])
+ assert result.num_rows == 1
+ assert result.column("id").to_pylist() == [42]
+
+ # Read empty indices
+ result = _read_row_file(path, fields, row_indices=[])
+ assert result.num_rows == 0
+ finally:
+ os.unlink(path)
+
+ def test_row_indices_multi_block(self):
+ """Test row_indices across multiple blocks."""
+ fields = [
+ DataField(0, "id", AtomicType("INT")),
+ DataField(1, "value", AtomicType("VARCHAR")),
+ ]
+ # Write enough data to create multiple blocks (small block size)
+ n_rows = 500
+ data = pa.table({
+ "id": pa.array(list(range(n_rows)), type=pa.int32()),
+ "value": pa.array([f"val_{i}" * 10 for i in range(n_rows)]),
+ })
+
+ with tempfile.NamedTemporaryFile(suffix=".row", delete=False) as tmp:
+ path = tmp.name
+
+ try:
+ with open(path, 'wb') as f:
+ writer = FormatRowWriter(f, fields, block_size=1024)
+ writer.write_table(data)
+ writer.close()
+
+ # Read rows from different blocks
+ indices = [0, 100, 200, 300, 499]
+ result = _read_row_file(path, fields, row_indices=indices)
+ assert result.num_rows == 5
+ assert result.column("id").to_pylist() == indices
+ finally:
+ os.unlink(path)
+
+ def test_data_evolution_row_id_read(self):
+ """Test Data Evolution scenario: partial-column write, then row-id-based read.
+
+ Simulates the Data Evolution pattern where:
+ 1. First commit writes columns (f0, f1)
+ 2. Second commit writes only column (f2); the test sets the new file's first_row_id to 0
+ 3. Read merges by row ID to reconstruct full rows
+ """
+ import shutil
+ from pypaimon import CatalogFactory, Schema
+
+ tempdir = tempfile.mkdtemp()
+ try:
+ warehouse = os.path.join(tempdir, 'warehouse')
+ catalog = CatalogFactory.create({'warehouse': warehouse})
+ catalog.create_database('default', True)
+
+ pa_schema = pa.schema([
+ ('f0', pa.int32()),
+ ('f1', pa.string()),
+ ('f2', pa.string()),
+ ])
+
+ schema = Schema.from_pyarrow_schema(
+ pa_schema,
+ options={
+ 'file.format': 'row',
+ 'row-tracking.enabled': 'true',
+ 'data-evolution.enabled': 'true',
+ })
+ catalog.create_table('default.de_row_id_test', schema, False)
+ table = catalog.get_table('default.de_row_id_test')
+
+ # First commit: write (f0, f1)
+ write_builder = table.new_batch_write_builder()
+ table_write = write_builder.new_write().with_write_type(['f0',
'f1'])
+ table_commit = write_builder.new_commit()
+
+ data1 = pa.table({
+ 'f0': pa.array([1, 2, 3, 4, 5], type=pa.int32()),
+ 'f1': pa.array(['a1', 'a2', 'a3', 'a4', 'a5']),
+ })
+ table_write.write_arrow(data1)
+ table_commit.commit(table_write.prepare_commit())
+ table_write.close()
+ table_commit.close()
+
+ # Second commit: write (f2) with first_row_id = 0
+ table_write = write_builder.new_write().with_write_type(['f2'])
+ table_commit = write_builder.new_commit()
+
+ data2 = pa.table({
+ 'f2': pa.array(['b1', 'b2', 'b3', 'b4', 'b5']),
+ })
+ table_write.write_arrow(data2)
+ cmts = table_write.prepare_commit()
+ cmts[0].new_files[0].first_row_id = 0
+ table_commit.commit(cmts)
+ table_write.close()
+ table_commit.close()
+
+ # Read full table - should merge partial columns by row ID
+ table = catalog.get_table('default.de_row_id_test')
+ read_builder = table.new_read_builder()
+ splits = read_builder.new_scan().plan().splits()
+ result = read_builder.new_read().to_arrow(splits)
+
+ assert result.num_rows == 5
+ result_sorted = result.sort_by('f0')
+ assert result_sorted.column('f0').to_pylist() == [1, 2, 3, 4, 5]
+ assert result_sorted.column('f1').to_pylist() == [
+ 'a1', 'a2', 'a3', 'a4', 'a5'
+ ]
+ assert result_sorted.column('f2').to_pylist() == [
+ 'b1', 'b2', 'b3', 'b4', 'b5'
+ ]
+ finally:
+ shutil.rmtree(tempdir, ignore_errors=True)
diff --git a/paimon-python/pypaimon/tests/test_format_row_table.py
b/paimon-python/pypaimon/tests/test_format_row_table.py
new file mode 100644
index 0000000000..c483144547
--- /dev/null
+++ b/paimon-python/pypaimon/tests/test_format_row_table.py
@@ -0,0 +1,503 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Integration tests for the ROW file format across all table types."""
+
+import os
+import shutil
+import tempfile
+import unittest
+
+import pyarrow as pa
+
+from pypaimon import CatalogFactory, Schema
+
+
+class RowFormatAppendOnlyTest(unittest.TestCase):
+ """Test ROW format with append-only (non-primary-key) tables."""
+
+ @classmethod
+ def setUpClass(cls):
+ cls.tempdir = tempfile.mkdtemp()
+ cls.warehouse = os.path.join(cls.tempdir, 'warehouse')
+ cls.catalog = CatalogFactory.create({'warehouse': cls.warehouse})
+ cls.catalog.create_database('default', True)
+
+ cls.pa_schema = pa.schema([
+ ('user_id', pa.int32()),
+ ('item_id', pa.int64()),
+ ('behavior', pa.string()),
+ ('dt', pa.string()),
+ ])
+
+ @classmethod
+ def tearDownClass(cls):
+ shutil.rmtree(cls.tempdir, ignore_errors=True)
+
+ def _write_and_read(self, table, data):
+ write_builder = table.new_batch_write_builder()
+ table_write = write_builder.new_write()
+ table_commit = write_builder.new_commit()
+ table_write.write_arrow(data)
+ table_commit.commit(table_write.prepare_commit())
+ table_write.close()
+ table_commit.close()
+
+ read_builder = table.new_read_builder()
+ splits = read_builder.new_scan().plan().splits()
+ return read_builder.new_read().to_arrow(splits)
+
+ def test_append_only_no_partition(self):
+ schema = Schema.from_pyarrow_schema(
+ self.pa_schema, options={'file.format': 'row'})
+ self.catalog.create_table('default.ao_row_no_part', schema, False)
+ table = self.catalog.get_table('default.ao_row_no_part')
+
+ data = pa.Table.from_pydict({
+ 'user_id': [1, 2, 3],
+ 'item_id': [1001, 1002, 1003],
+ 'behavior': ['buy', 'click', 'view'],
+ 'dt': ['2024-01-01', '2024-01-01', '2024-01-02'],
+ }, schema=self.pa_schema)
+
+ result = self._write_and_read(table, data)
+ self.assertEqual(result.num_rows, 3)
+ self.assertEqual(sorted(result.column('user_id').to_pylist()), [1, 2,
3])
+
+ def test_append_only_with_partition(self):
+ schema = Schema.from_pyarrow_schema(
+ self.pa_schema, partition_keys=['dt'],
+ options={'file.format': 'row'})
+ self.catalog.create_table('default.ao_row_partitioned', schema, False)
+ table = self.catalog.get_table('default.ao_row_partitioned')
+
+ data = pa.Table.from_pydict({
+ 'user_id': [1, 2, 3, 4],
+ 'item_id': [1001, 1002, 1003, 1004],
+ 'behavior': ['buy', 'click', 'view', 'buy'],
+ 'dt': ['p1', 'p1', 'p2', 'p2'],
+ }, schema=self.pa_schema)
+
+ result = self._write_and_read(table, data)
+ self.assertEqual(result.num_rows, 4)
+
+ def test_append_only_multiple_commits(self):
+ schema = Schema.from_pyarrow_schema(
+ self.pa_schema, options={'file.format': 'row'})
+ self.catalog.create_table('default.ao_row_multi_commit', schema, False)
+ table = self.catalog.get_table('default.ao_row_multi_commit')
+
+ data1 = pa.Table.from_pydict({
+ 'user_id': [1, 2],
+ 'item_id': [1001, 1002],
+ 'behavior': ['buy', 'click'],
+ 'dt': ['2024-01-01', '2024-01-01'],
+ }, schema=self.pa_schema)
+
+ data2 = pa.Table.from_pydict({
+ 'user_id': [3, 4],
+ 'item_id': [1003, 1004],
+ 'behavior': ['view', 'buy'],
+ 'dt': ['2024-01-02', '2024-01-02'],
+ }, schema=self.pa_schema)
+
+ write_builder = table.new_batch_write_builder()
+
+ table_write = write_builder.new_write()
+ table_commit = write_builder.new_commit()
+ table_write.write_arrow(data1)
+ table_commit.commit(table_write.prepare_commit())
+ table_write.close()
+ table_commit.close()
+
+ table_write = write_builder.new_write()
+ table_commit = write_builder.new_commit()
+ table_write.write_arrow(data2)
+ table_commit.commit(table_write.prepare_commit())
+ table_write.close()
+ table_commit.close()
+
+ read_builder = table.new_read_builder()
+ splits = read_builder.new_scan().plan().splits()
+ result = read_builder.new_read().to_arrow(splits)
+ self.assertEqual(result.num_rows, 4)
+ self.assertEqual(sorted(result.column('user_id').to_pylist()), [1, 2,
3, 4])
+
+ def test_append_only_with_nulls(self):
+ schema = Schema.from_pyarrow_schema(
+ self.pa_schema, options={'file.format': 'row'})
+ self.catalog.create_table('default.ao_row_nulls', schema, False)
+ table = self.catalog.get_table('default.ao_row_nulls')
+
+ data = pa.Table.from_pydict({
+ 'user_id': [1, 2, 3],
+ 'item_id': [None, 1002, None],
+ 'behavior': ['buy', None, 'view'],
+ 'dt': ['2024-01-01', '2024-01-01', '2024-01-02'],
+ }, schema=self.pa_schema)
+
+ result = self._write_and_read(table, data)
+ self.assertEqual(result.num_rows, 3)
+ item_ids = sorted(result.column('item_id').to_pylist(), key=lambda x:
(x is None, x))
+ self.assertEqual(item_ids, [1002, None, None])
+
+ def test_append_only_column_projection(self):
+ """Test that reading with column projection decodes correctly."""
+ schema = Schema.from_pyarrow_schema(
+ self.pa_schema, options={'file.format': 'row'})
+ self.catalog.create_table('default.ao_row_projection', schema, False)
+ table = self.catalog.get_table('default.ao_row_projection')
+
+ data = pa.Table.from_pydict({
+ 'user_id': [1, 2, 3],
+ 'item_id': [1001, 1002, 1003],
+ 'behavior': ['buy', 'click', 'view'],
+ 'dt': ['2024-01-01', '2024-01-01', '2024-01-02'],
+ }, schema=self.pa_schema)
+
+ write_builder = table.new_batch_write_builder()
+ table_write = write_builder.new_write()
+ table_commit = write_builder.new_commit()
+ table_write.write_arrow(data)
+ table_commit.commit(table_write.prepare_commit())
+ table_write.close()
+ table_commit.close()
+
+ # Read only (user_id, behavior) - skipping item_id and dt
+ read_builder = table.new_read_builder()
+ read_builder = read_builder.with_projection(['user_id', 'behavior'])
+ splits = read_builder.new_scan().plan().splits()
+ result = read_builder.new_read().to_arrow(splits)
+ self.assertEqual(result.num_rows, 3)
+ self.assertEqual(result.schema.names, ['user_id', 'behavior'])
+ self.assertEqual(sorted(result.column('user_id').to_pylist()), [1, 2,
3])
+ self.assertEqual(
+ sorted(result.column('behavior').to_pylist()),
+ ['buy', 'click', 'view'])
+
+ # Read only (item_id, dt) - skipping user_id and behavior
+ read_builder = table.new_read_builder()
+ read_builder = read_builder.with_projection(['item_id', 'dt'])
+ splits = read_builder.new_scan().plan().splits()
+ result = read_builder.new_read().to_arrow(splits)
+ self.assertEqual(result.num_rows, 3)
+ self.assertEqual(result.schema.names, ['item_id', 'dt'])
+ self.assertEqual(
+ sorted(result.column('item_id').to_pylist()), [1001, 1002, 1003])
+
+
+class RowFormatPrimaryKeyTest(unittest.TestCase):
+ """Test ROW format with primary-key tables."""
+
+ @classmethod
+ def setUpClass(cls):
+ cls.tempdir = tempfile.mkdtemp()
+ cls.warehouse = os.path.join(cls.tempdir, 'warehouse')
+ cls.catalog = CatalogFactory.create({'warehouse': cls.warehouse})
+ cls.catalog.create_database('default', True)
+
+ cls.pa_schema = pa.schema([
+ pa.field('user_id', pa.int32(), nullable=False),
+ ('item_id', pa.int64()),
+ ('behavior', pa.string()),
+ pa.field('dt', pa.string(), nullable=False),
+ ])
+
+ @classmethod
+ def tearDownClass(cls):
+ shutil.rmtree(cls.tempdir, ignore_errors=True)
+
+ def test_pk_table_basic(self):
+ schema = Schema.from_pyarrow_schema(
+ self.pa_schema,
+ partition_keys=['dt'],
+ primary_keys=['user_id', 'dt'],
+ options={'bucket': '1', 'file.format': 'row'})
+ self.catalog.create_table('default.pk_row_basic', schema, False)
+ table = self.catalog.get_table('default.pk_row_basic')
+
+ data = pa.Table.from_pydict({
+ 'user_id': [1, 2, 3],
+ 'item_id': [1001, 1002, 1003],
+ 'behavior': ['buy', 'click', 'view'],
+ 'dt': ['p1', 'p1', 'p2'],
+ }, schema=self.pa_schema)
+
+ write_builder = table.new_batch_write_builder()
+ table_write = write_builder.new_write()
+ table_commit = write_builder.new_commit()
+ table_write.write_arrow(data)
+ table_commit.commit(table_write.prepare_commit())
+ table_write.close()
+ table_commit.close()
+
+ read_builder = table.new_read_builder()
+ splits = read_builder.new_scan().plan().splits()
+ result = read_builder.new_read().to_arrow(splits)
+ self.assertEqual(result.num_rows, 3)
+
+ def test_pk_table_upsert(self):
+ schema = Schema.from_pyarrow_schema(
+ self.pa_schema,
+ partition_keys=['dt'],
+ primary_keys=['user_id', 'dt'],
+ options={'bucket': '1', 'file.format': 'row'})
+ self.catalog.create_table('default.pk_row_upsert', schema, False)
+ table = self.catalog.get_table('default.pk_row_upsert')
+
+ write_builder = table.new_batch_write_builder()
+
+ # First commit
+ data1 = pa.Table.from_pydict({
+ 'user_id': [1, 2, 3],
+ 'item_id': [1001, 1002, 1003],
+ 'behavior': ['buy', 'click', 'view'],
+ 'dt': ['p1', 'p1', 'p1'],
+ }, schema=self.pa_schema)
+ table_write = write_builder.new_write()
+ table_commit = write_builder.new_commit()
+ table_write.write_arrow(data1)
+ table_commit.commit(table_write.prepare_commit())
+ table_write.close()
+ table_commit.close()
+
+ # Second commit - update user_id=2 and insert new user_id=4
+ data2 = pa.Table.from_pydict({
+ 'user_id': [2, 4],
+ 'item_id': [1002, 1004],
+ 'behavior': ['buy-updated', 'new'],
+ 'dt': ['p1', 'p1'],
+ }, schema=self.pa_schema)
+ table_write = write_builder.new_write()
+ table_commit = write_builder.new_commit()
+ table_write.write_arrow(data2)
+ table_commit.commit(table_write.prepare_commit())
+ table_write.close()
+ table_commit.close()
+
+ read_builder = table.new_read_builder()
+ splits = read_builder.new_scan().plan().splits()
+ result = read_builder.new_read().to_arrow(splits)
+ self.assertEqual(result.num_rows, 4)
+ result_dict = result.sort_by('user_id').to_pydict()
+ self.assertEqual(result_dict['user_id'], [1, 2, 3, 4])
+ self.assertEqual(result_dict['behavior'], ['buy', 'buy-updated',
'view', 'new'])
+
+
+class RowFormatDataEvolutionTest(unittest.TestCase):
+ """Test ROW format with data-evolution enabled tables."""
+
+ @classmethod
+ def setUpClass(cls):
+ cls.tempdir = tempfile.mkdtemp()
+ cls.warehouse = os.path.join(cls.tempdir, 'warehouse')
+ cls.catalog = CatalogFactory.create({'warehouse': cls.warehouse})
+ cls.catalog.create_database('default', True)
+
+ cls.pa_schema = pa.schema([
+ ('id', pa.int32()),
+ ('name', pa.string()),
+ ('age', pa.int32()),
+ ('city', pa.string()),
+ ])
+
+ @classmethod
+ def tearDownClass(cls):
+ shutil.rmtree(cls.tempdir, ignore_errors=True)
+
+ def test_data_evolution_write_read(self):
+ schema = Schema.from_pyarrow_schema(
+ self.pa_schema,
+ options={
+ 'file.format': 'row',
+ 'row-tracking.enabled': 'true',
+ 'data-evolution.enabled': 'true',
+ })
+ self.catalog.create_table('default.de_row_basic', schema, False)
+ table = self.catalog.get_table('default.de_row_basic')
+
+ data = pa.Table.from_pydict({
+ 'id': [1, 2, 3],
+ 'name': ['alice', 'bob', 'charlie'],
+ 'age': [25, 30, 35],
+ 'city': ['beijing', 'shanghai', 'guangzhou'],
+ }, schema=self.pa_schema)
+
+ write_builder = table.new_batch_write_builder()
+ table_write = write_builder.new_write()
+ table_commit = write_builder.new_commit()
+ table_write.write_arrow(data)
+ table_commit.commit(table_write.prepare_commit())
+ table_write.close()
+ table_commit.close()
+
+ read_builder = table.new_read_builder()
+ splits = read_builder.new_scan().plan().splits()
+ result = read_builder.new_read().to_arrow(splits)
+ self.assertEqual(result.num_rows, 3)
+ self.assertEqual(sorted(result.column('id').to_pylist()), [1, 2, 3])
+
+ def test_data_evolution_multiple_commits(self):
+ schema = Schema.from_pyarrow_schema(
+ self.pa_schema,
+ options={
+ 'file.format': 'row',
+ 'row-tracking.enabled': 'true',
+ 'data-evolution.enabled': 'true',
+ })
+ self.catalog.create_table('default.de_row_multi', schema, False)
+ table = self.catalog.get_table('default.de_row_multi')
+
+ write_builder = table.new_batch_write_builder()
+
+ data1 = pa.Table.from_pydict({
+ 'id': [1, 2, 3],
+ 'name': ['alice', 'bob', 'charlie'],
+ 'age': [25, 30, 35],
+ 'city': ['beijing', 'shanghai', 'guangzhou'],
+ }, schema=self.pa_schema)
+ table_write = write_builder.new_write()
+ table_commit = write_builder.new_commit()
+ table_write.write_arrow(data1)
+ table_commit.commit(table_write.prepare_commit())
+ table_write.close()
+ table_commit.close()
+
+ data2 = pa.Table.from_pydict({
+ 'id': [4, 5],
+ 'name': ['dave', 'eve'],
+ 'age': [40, 45],
+ 'city': ['shenzhen', 'hangzhou'],
+ }, schema=self.pa_schema)
+ table_write = write_builder.new_write()
+ table_commit = write_builder.new_commit()
+ table_write.write_arrow(data2)
+ table_commit.commit(table_write.prepare_commit())
+ table_write.close()
+ table_commit.close()
+
+ read_builder = table.new_read_builder()
+ splits = read_builder.new_scan().plan().splits()
+ result = read_builder.new_read().to_arrow(splits)
+ self.assertEqual(result.num_rows, 5)
+ self.assertEqual(sorted(result.column('id').to_pylist()), [1, 2, 3, 4,
5])
+
+
class RowFormatBlobTableTest(unittest.TestCase):
    """ROW file format on a table with a blob column (blob columns stored separately)."""

    @classmethod
    def setUpClass(cls):
        """Create a throwaway warehouse, the ``default`` database and the shared schema."""
        cls.tempdir = tempfile.mkdtemp()
        cls.warehouse = os.path.join(cls.tempdir, 'warehouse')
        cls.catalog = CatalogFactory.create({'warehouse': cls.warehouse})
        cls.catalog.create_database('default', True)

        schema_columns = [
            ('id', pa.int32()),
            ('name', pa.string()),
            ('blob_data', pa.large_binary()),
        ]
        cls.pa_schema = pa.schema(schema_columns)

    @classmethod
    def tearDownClass(cls):
        """Remove the temporary warehouse; cleanup errors are ignored."""
        shutil.rmtree(cls.tempdir, ignore_errors=True)

    def test_blob_table_with_row_format(self):
        """Write three rows including binary payloads and read them back."""
        table_options = {
            'file.format': 'row',
            'row-tracking.enabled': 'true',
            'data-evolution.enabled': 'true',
        }
        table_schema = Schema.from_pyarrow_schema(self.pa_schema, options=table_options)
        self.catalog.create_table('default.blob_row_basic', table_schema, False)
        table = self.catalog.get_table('default.blob_row_basic')

        rows = pa.Table.from_pydict(
            {
                'id': [1, 2, 3],
                'name': ['a', 'b', 'c'],
                'blob_data': [b'\x01\x02\x03', b'\x04\x05', b'\x06\x07\x08\x09'],
            },
            schema=self.pa_schema,
        )

        builder = table.new_batch_write_builder()
        writer = builder.new_write()
        committer = builder.new_commit()
        writer.write_arrow(rows)
        committer.commit(writer.prepare_commit())
        writer.close()
        committer.close()

        reader = table.new_read_builder()
        planned_splits = reader.new_scan().plan().splits()
        actual = reader.new_read().to_arrow(planned_splits)

        self.assertEqual(actual.num_rows, 3)
        self.assertEqual(sorted(actual.column('id').to_pylist()), [1, 2, 3])
+
+
class RowFormatVectorTableTest(unittest.TestCase):
    """ROW file format on a table with a fixed-size vector column (vector columns inline)."""

    @classmethod
    def setUpClass(cls):
        """Create a throwaway warehouse, the ``default`` database and the shared schema."""
        cls.tempdir = tempfile.mkdtemp()
        cls.warehouse = os.path.join(cls.tempdir, 'warehouse')
        cls.catalog = CatalogFactory.create({'warehouse': cls.warehouse})
        cls.catalog.create_database('default', True)

        schema_columns = [
            ('id', pa.int64()),
            # Fixed-size list of three float32 values per row.
            ('embed', pa.list_(pa.float32(), 3)),
        ]
        cls.pa_schema = pa.schema(schema_columns)

    @classmethod
    def tearDownClass(cls):
        """Remove the temporary warehouse; cleanup errors are ignored."""
        shutil.rmtree(cls.tempdir, ignore_errors=True)

    def test_vector_inline_with_row_format(self):
        """Vector stored inline in the same ROW file."""
        table_schema = Schema.from_pyarrow_schema(
            self.pa_schema, options={'file.format': 'row'})
        self.catalog.create_table('default.vec_row_inline', table_schema, False)
        table = self.catalog.get_table('default.vec_row_inline')

        # Nine flat floats are grouped into three vectors of length three.
        flat_values = pa.array(
            [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0], type=pa.float32())
        rows = pa.table({
            'id': pa.array([1, 2, 3], type=pa.int64()),
            'embed': pa.FixedSizeListArray.from_arrays(flat_values, 3),
        })

        builder = table.new_batch_write_builder()
        writer = builder.new_write()
        committer = builder.new_commit()
        writer.write_arrow(rows)
        committer.commit(writer.prepare_commit())
        writer.close()
        committer.close()

        reader = table.new_read_builder()
        planned_splits = reader.new_scan().plan().splits()
        actual = reader.new_read().to_arrow(planned_splits)

        self.assertEqual(actual.num_rows, 3)
        self.assertEqual(sorted(actual.column('id').to_pylist()), [1, 2, 3])
+
+
+# Run the unittest command-line runner when this module is executed directly.
+if __name__ == '__main__':
+ unittest.main()
diff --git a/paimon-python/pypaimon/write/writer/data_blob_writer.py
b/paimon-python/pypaimon/write/writer/data_blob_writer.py
index 4c3289f5aa..cb131f9a54 100644
--- a/paimon-python/pypaimon/write/writer/data_blob_writer.py
+++ b/paimon-python/pypaimon/write/writer/data_blob_writer.py
@@ -359,6 +359,8 @@ class DataBlobWriter(DataWriter):
self.file_io.write_lance(file_path, data)
elif self.file_format == CoreOptions.FILE_FORMAT_VORTEX:
self.file_io.write_vortex(file_path, data)
+ elif self.file_format == CoreOptions.FILE_FORMAT_ROW:
+ self.file_io.write_row(file_path, data, zstd_level=self.zstd_level)
else:
raise ValueError(f"Unsupported file format: {self.file_format}")
diff --git a/paimon-python/pypaimon/write/writer/data_vector_writer.py
b/paimon-python/pypaimon/write/writer/data_vector_writer.py
index d2b0adb568..9de5e2a27a 100644
--- a/paimon-python/pypaimon/write/writer/data_vector_writer.py
+++ b/paimon-python/pypaimon/write/writer/data_vector_writer.py
@@ -212,6 +212,8 @@ class DataVectorWriter(DataWriter):
self.file_io.write_lance(file_path, data)
elif self.file_format == CoreOptions.FILE_FORMAT_VORTEX:
self.file_io.write_vortex(file_path, data)
+ elif self.file_format == CoreOptions.FILE_FORMAT_ROW:
+ self.file_io.write_row(file_path, data, zstd_level=self.zstd_level)
else:
raise ValueError(f"Unsupported file format: {self.file_format}")
diff --git a/paimon-python/pypaimon/write/writer/data_writer.py
b/paimon-python/pypaimon/write/writer/data_writer.py
index f420a813c0..70c7594b08 100644
--- a/paimon-python/pypaimon/write/writer/data_writer.py
+++ b/paimon-python/pypaimon/write/writer/data_writer.py
@@ -198,6 +198,8 @@ class DataWriter(ABC):
self.file_io.write_lance(file_path, data)
elif self.file_format == CoreOptions.FILE_FORMAT_VORTEX:
self.file_io.write_vortex(file_path, data)
+ elif self.file_format == CoreOptions.FILE_FORMAT_ROW:
+ self.file_io.write_row(file_path, data, zstd_level=self.zstd_level)
else:
raise ValueError(f"Unsupported file format: {self.file_format}")
diff --git a/paimon-python/pypaimon/write/writer/format_row_writer.py
b/paimon-python/pypaimon/write/writer/format_row_writer.py
new file mode 100644
index 0000000000..f31c1eb58c
--- /dev/null
+++ b/paimon-python/pypaimon/write/writer/format_row_writer.py
@@ -0,0 +1,408 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import datetime
+import re
+import struct
+from decimal import Decimal
+from typing import Any, List
+
+import pyarrow as pa
+
+from pypaimon.common.delta_varint_compressor import DeltaVarintCompressor
+from pypaimon.schema.data_types import (
+ ArrayType, AtomicType, DataField, MapType, MultisetType, RowType,
VectorType
+)
+
+# Length in bytes of the footer buffer that FormatRowWriter._write_footer fills and writes.
+FOOTER_SIZE = 32
+# Magic number packed as an unsigned little-endian int32 at footer offset 28.
+MAGIC = 0x524F5753 # "ROWS"
+# Format version stored in footer byte 24.
+VERSION = 1
+# Default flush threshold for FormatRowWriter blocks and default _BlockBuffer capacity, in bytes.
+DEFAULT_BLOCK_SIZE = 65536
+
+
class FormatRowWriter:
    """Writes Arrow tables to an output stream in the ROW binary file format.

    Rows are serialized into an in-memory block. Once the block's row bytes plus
    its trailing offset table reach ``block_size``, the block is ZSTD-compressed
    and appended to the stream. ``close()`` then appends the block index and a
    fixed ``FOOTER_SIZE``-byte footer, giving the layout::

        [compressed block]* [block index] [footer]

    Args:
        output_stream: Binary stream providing ``write(bytes)`` and ``flush()``.
        fields: Schema fields; every row is encoded in this field order.
        block_size: Uncompressed size threshold in bytes that triggers a flush.
        zstd_level: ZSTD compression level applied to every block.
    """

    def __init__(self, output_stream, fields: List[DataField],
                 block_size: int = DEFAULT_BLOCK_SIZE, zstd_level: int = 1):
        self._out = output_stream
        self._fields = fields
        self._block_size = block_size
        self._zstd_level = zstd_level

        # Rows of the block currently being built and their start offsets in it.
        self._block_buf = _BlockBuffer()
        self._row_offsets: List[int] = []

        # Per-block metadata collected for the block index written by close().
        self._block_compressed_sizes: List[int] = []
        self._block_uncompressed_sizes: List[int] = []
        self._block_row_starts: List[int] = []
        self._total_row_count = 0
        self._bytes_written = 0

        # Created on first flush so zstandard is only imported when data is written.
        self._compressor = None
        self._closed = False

    def write_table(self, data: pa.Table):
        """Append every row of ``data`` to the file.

        Raises:
            ValueError: If the writer has already been closed.
            KeyError: If ``data`` lacks a column for one of the schema fields.
        """
        if self._closed:
            raise ValueError("Cannot write to a closed FormatRowWriter")

        # Fail up front with the offending names instead of a bare KeyError
        # raised from the middle of the row loop.
        available = set(data.column_names)
        missing = [field.name for field in self._fields if field.name not in available]
        if missing:
            raise KeyError(f"Columns missing from input table: {missing}")

        columns = [data.column(field.name).to_pylist() for field in self._fields]

        for row_idx in range(data.num_rows):
            self._write_row([column[row_idx] for column in columns])
            self._total_row_count += 1

            # Projected block size = row bytes + int32 offset per row + int32 row count.
            projected = self._block_buf.position + len(self._row_offsets) * 4 + 4
            if projected >= self._block_size:
                self._flush_block()

    def close(self):
        """Flush the pending block, then write the block index and footer.

        Calling ``close()`` more than once is a no-op, so the index and footer
        can never be appended twice. The underlying stream is flushed but not
        closed.
        """
        if self._closed:
            return
        self._closed = True

        self._flush_block()

        index_offset = self._bytes_written
        self._write_block_index()
        index_length = self._bytes_written - index_offset

        self._write_footer(index_offset, index_length)
        self._out.flush()

    def _write_row(self, values: List[Any]):
        """Serialize one row: a null bitmap followed by the non-null field values."""
        self._row_offsets.append(self._block_buf.position)
        header_size = (len(self._fields) + 7) // 8

        header_start = self._block_buf.position
        self._block_buf.write_zeros(header_size)

        for i, field in enumerate(self._fields):
            if values[i] is None:
                # Bit i of the bitmap marks field i as null; no value bytes follow.
                self._block_buf.buffer[header_start + i // 8] |= (1 << (i % 8))
            else:
                _write_field(self._block_buf, values[i], field.type)

    def _flush_block(self):
        """Compress the current block and append it to the stream.

        Before compression the block is completed with one little-endian int32
        start offset per row followed by the int32 row count. Does nothing when
        no rows are pending.
        """
        if not self._row_offsets:
            return

        if self._compressor is None:
            import zstandard as zstd
            self._compressor = zstd.ZstdCompressor(level=self._zstd_level)

        row_count = len(self._row_offsets)
        self._block_row_starts.append(self._total_row_count - row_count)

        for offset in self._row_offsets:
            self._block_buf.write_int_le(offset)
        self._block_buf.write_int_le(row_count)

        uncompressed = bytes(self._block_buf.buffer[:self._block_buf.position])
        self._block_uncompressed_sizes.append(len(uncompressed))

        compressed = self._compressor.compress(uncompressed)
        self._block_compressed_sizes.append(len(compressed))

        self._out.write(compressed)
        self._bytes_written += len(compressed)

        self._block_buf.reset()
        self._row_offsets.clear()

    def _write_block_index(self):
        """Write compressed sizes, uncompressed sizes and row starts of all blocks.

        Each list is encoded with ``DeltaVarintCompressor`` and prefixed with its
        encoded byte length as a varint.
        """
        for values in (self._block_compressed_sizes,
                       self._block_uncompressed_sizes,
                       self._block_row_starts):
            self._write_varint_prefixed(DeltaVarintCompressor.compress(values))

    def _write_varint_prefixed(self, data: bytes):
        """Write ``data`` preceded by its length encoded as a varint."""
        varint_bytes = _encode_var_int(len(data))
        self._out.write(varint_bytes)
        self._out.write(data)
        self._bytes_written += len(varint_bytes) + len(data)

    def _write_footer(self, index_offset: int, index_length: int):
        """Write the fixed-size, little-endian footer.

        Layout by byte offset: 0 total row count (int64), 8 block count (int32),
        12 index offset (int64), 20 index length (int32), 24 version (1 byte),
        25-27 reserved zeros, 28 magic (uint32).
        """
        buf = bytearray(FOOTER_SIZE)
        struct.pack_into('<q', buf, 0, self._total_row_count)
        struct.pack_into('<i', buf, 8, len(self._block_compressed_sizes))
        struct.pack_into('<q', buf, 12, index_offset)
        struct.pack_into('<i', buf, 20, index_length)
        buf[24] = VERSION
        # bytes 25-27 reserved (zeros)
        struct.pack_into('<I', buf, 28, MAGIC)
        self._out.write(bytes(buf))
        self._bytes_written += FOOTER_SIZE
+
+
+class _BlockBuffer:
+
+ def __init__(self, initial_capacity: int = DEFAULT_BLOCK_SIZE):
+ self.buffer = bytearray(initial_capacity)
+ self.position = 0
+
+ def reset(self):
+ self.position = 0
+
+ def _ensure_capacity(self, additional: int):
+ required = self.position + additional
+ if required > len(self.buffer):
+ new_size = max(len(self.buffer) * 2, required)
+ new_buf = bytearray(new_size)
+ new_buf[:self.position] = self.buffer[:self.position]
+ self.buffer = new_buf
+
+ def write_zeros(self, count: int):
+ self._ensure_capacity(count)
+ for i in range(count):
+ self.buffer[self.position + i] = 0
+ self.position += count
+
+ def write_boolean(self, value: bool):
+ self._ensure_capacity(1)
+ self.buffer[self.position] = 1 if value else 0
+ self.position += 1
+
+ def write_byte(self, value: int):
+ self._ensure_capacity(1)
+ self.buffer[self.position] = value & 0xFF
+ self.position += 1
+
+ def write_short_le(self, value: int):
+ self._ensure_capacity(2)
+ struct.pack_into('<h', self.buffer, self.position, value)
+ self.position += 2
+
+ def write_int_le(self, value: int):
+ self._ensure_capacity(4)
+ struct.pack_into('<i', self.buffer, self.position, value)
+ self.position += 4
+
+ def write_long_le(self, value: int):
+ self._ensure_capacity(8)
+ struct.pack_into('<q', self.buffer, self.position, value)
+ self.position += 8
+
+ def write_float_le(self, value: float):
+ self._ensure_capacity(4)
+ struct.pack_into('<f', self.buffer, self.position, value)
+ self.position += 4
+
+ def write_double_le(self, value: float):
+ self._ensure_capacity(8)
+ struct.pack_into('<d', self.buffer, self.position, value)
+ self.position += 8
+
+ def write_var_int(self, value: int):
+ self._ensure_capacity(5)
+ while (value & ~0x7F) != 0:
+ self.buffer[self.position] = (value & 0x7F) | 0x80
+ self.position += 1
+ value >>= 7
+ self.buffer[self.position] = value & 0x7F
+ self.position += 1
+
+ def write_bytes_with_length(self, data: bytes):
+ self.write_var_int(len(data))
+ self._ensure_capacity(len(data))
+ self.buffer[self.position:self.position + len(data)] = data
+ self.position += len(data)
+
+ def write_string(self, value: str):
+ encoded = value.encode('utf-8')
+ self.write_bytes_with_length(encoded)
+
+
+def _encode_var_int(value: int) -> bytes:
+ result = bytearray()
+ while (value & ~0x7F) != 0:
+ result.append((value & 0x7F) | 0x80)
+ value >>= 7
+ result.append(value & 0x7F)
+ return bytes(result)
+
+
def _decimal_to_unscaled(value: Any, scale: int) -> int:
    """Return ``value * 10**scale`` as an exact integer, truncated toward zero.

    The result is built from the decimal's digit tuple rather than with
    ``Decimal`` arithmetic, because arithmetic results are rounded to the
    active context precision (28 digits by default).
    """
    dec = value if isinstance(value, Decimal) else Decimal(str(value))
    sign, digits, exponent = dec.as_tuple()
    if not isinstance(exponent, int):
        # NaN and Infinity report a letter instead of a numeric exponent.
        raise ValueError(f"Cannot encode non-finite decimal: {value}")
    coefficient = int(''.join(map(str, digits))) if digits else 0
    shift = exponent + scale
    if shift >= 0:
        magnitude = coefficient * 10 ** shift
    else:
        # More fractional digits than the scale allows: drop the excess.
        magnitude = coefficient // 10 ** (-shift)
    return -magnitude if sign else magnitude


def _datetime_to_epoch_micros(value: datetime.datetime) -> int:
    """Return whole microseconds between 1970-01-01 00:00 and ``value``.

    The epoch is built with ``value``'s own ``tzinfo``. The difference is
    divided by a one-microsecond ``timedelta`` so it stays an exact integer
    instead of passing through the float returned by ``total_seconds()``.
    """
    epoch = datetime.datetime(1970, 1, 1, tzinfo=value.tzinfo)
    return (value - epoch) // datetime.timedelta(microseconds=1)


def _write_field(buf: _BlockBuffer, value: Any, data_type) -> None:
    """Serialize one non-null ``value`` of ``data_type`` into ``buf``.

    Callers record nulls in their own bitmap and must not pass ``None`` here.

    Raises:
        ValueError: If ``data_type`` is not a supported type, or a decimal
            value is NaN or infinite.
    """
    if isinstance(data_type, AtomicType):
        type_name = data_type.type.upper()
        if type_name == 'BOOLEAN':
            buf.write_boolean(value)
        elif type_name == 'TINYINT':
            buf.write_byte(value)
        elif type_name == 'SMALLINT':
            buf.write_short_le(value)
        elif type_name in ('INT', 'INTEGER'):
            buf.write_int_le(value)
        elif type_name == 'DATE':
            # Stored as days since 1970-01-01; raw ints are written unchanged.
            if isinstance(value, datetime.date):
                buf.write_int_le((value - datetime.date(1970, 1, 1)).days)
            else:
                buf.write_int_le(value)
        elif type_name.startswith('TIME') and not type_name.startswith('TIMESTAMP'):
            # Stored as milliseconds of the day; raw ints are written unchanged.
            if isinstance(value, datetime.time):
                seconds = value.hour * 3600 + value.minute * 60 + value.second
                buf.write_int_le(seconds * 1000 + value.microsecond // 1000)
            else:
                buf.write_int_le(value)
        elif type_name == 'BIGINT':
            buf.write_long_le(value)
        elif type_name == 'FLOAT':
            buf.write_float_le(value)
        elif type_name == 'DOUBLE':
            buf.write_double_le(value)
        elif type_name == 'STRING' or type_name.startswith('CHAR') or type_name.startswith('VARCHAR'):
            buf.write_string(value)
        elif type_name == 'BYTES' or type_name.startswith('BINARY') or type_name.startswith('VARBINARY'):
            buf.write_bytes_with_length(value)
        elif type_name == 'BLOB':
            buf.write_bytes_with_length(value)
        elif type_name.startswith('DECIMAL'):
            precision, scale = _parse_decimal_params(type_name)
            unscaled = _decimal_to_unscaled(value, scale)
            if precision <= 18:
                # Compact form: the unscaled value fits a signed 64-bit integer.
                buf.write_long_le(unscaled)
            else:
                # Wide form: length-prefixed big-endian two's-complement bytes.
                raw = unscaled.to_bytes(
                    (unscaled.bit_length() + 8) // 8, byteorder='big', signed=True)
                buf.write_bytes_with_length(raw)
        elif type_name.startswith('TIMESTAMP'):
            precision = _parse_timestamp_precision(type_name)
            if isinstance(value, datetime.datetime):
                millis, micro_of_milli = divmod(_datetime_to_epoch_micros(value), 1000)
            elif precision <= 3:
                # Raw values at millisecond precision are written as given.
                millis, micro_of_milli = value, 0
            else:
                # Raw values at higher precision are treated as epoch microseconds.
                millis, micro_of_milli = divmod(int(value), 1000)
            buf.write_long_le(millis)
            if precision > 3:
                # Sub-millisecond remainder, expressed in nanoseconds.
                buf.write_var_int(micro_of_milli * 1000)
        elif type_name == 'VARIANT':
            if isinstance(value, dict):
                buf.write_bytes_with_length(value['value'])
                buf.write_bytes_with_length(value['metadata'])
            else:
                # Objects lacking an attribute contribute an empty byte string.
                buf.write_bytes_with_length(value.value if hasattr(value, 'value') else b'')
                buf.write_bytes_with_length(value.metadata if hasattr(value, 'metadata') else b'')
        else:
            raise ValueError(f"Unsupported atomic type for writing: {type_name}")

    elif isinstance(data_type, ArrayType):
        _write_array_elements(buf, value, data_type.element)

    elif isinstance(data_type, VectorType):
        _write_vector(buf, value, data_type.element)

    elif isinstance(data_type, MapType):
        # A map is written as two parallel arrays: keys, then values.
        if isinstance(value, dict):
            keys = list(value.keys())
            values = list(value.values())
        else:
            keys = [pair[0] for pair in value]
            values = [pair[1] for pair in value]
        _write_array_elements(buf, keys, data_type.key)
        _write_array_elements(buf, values, data_type.value)

    elif isinstance(data_type, MultisetType):
        # A multiset is written as an element array plus an INT count array.
        if isinstance(value, dict):
            keys = list(value.keys())
            counts = list(value.values())
        else:
            keys = [pair[0] for pair in value]
            counts = [pair[1] for pair in value]
        _write_array_elements(buf, keys, data_type.element)
        _write_array_elements(buf, counts, AtomicType("INT"))

    elif isinstance(data_type, RowType):
        _write_nested_row(buf, value, data_type)

    else:
        raise ValueError(f"Unsupported data type for writing: {data_type}")
+
+
def _write_array_elements(buf: _BlockBuffer, elements: list, element_type) -> None:
    """Write an array: varint length, null bitmap, then the non-null elements in order.

    Bit ``i`` of the bitmap is set when element ``i`` is ``None``; such elements
    contribute no value bytes.
    """
    count = len(elements)
    buf.write_var_int(count)

    # Reserve a zeroed bitmap first; bits are set while the elements are written.
    bitmap_offset = buf.position
    buf.write_zeros((count + 7) // 8)

    for index, item in enumerate(elements):
        if item is not None:
            _write_field(buf, item, element_type)
            continue
        byte_index, bit_index = divmod(index, 8)
        buf.buffer[bitmap_offset + byte_index] |= (1 << bit_index)
+
+
def _write_vector(buf: _BlockBuffer, elements: list, element_type) -> None:
    """Write a vector: varint length followed by every element, with no null bitmap."""
    buf.write_var_int(len(elements))
    for item in elements:
        _write_field(buf, item, element_type)
+
+
def _write_nested_row(buf: _BlockBuffer, value, row_type: RowType) -> None:
    """Write a nested row: null bitmap followed by the non-null field values.

    ``value`` is either a dict keyed by field name or a positional sequence.
    Absent dict keys and positions beyond the sequence length are written as null.
    """
    row_fields = row_type.fields
    keyed = isinstance(value, dict)

    bitmap_offset = buf.position
    buf.write_zeros((len(row_fields) + 7) // 8)

    for index, row_field in enumerate(row_fields):
        if keyed:
            item = value.get(row_field.name)
        elif index < len(value):
            item = value[index]
        else:
            item = None

        if item is not None:
            _write_field(buf, item, row_field.type)
            continue
        byte_index, bit_index = divmod(index, 8)
        buf.buffer[bitmap_offset + byte_index] |= (1 << bit_index)
+
+
+def _parse_decimal_params(type_name: str) -> tuple:
+ match = re.fullmatch(r'DECIMAL\((\d+),\s*(\d+)\)', type_name)
+ if match:
+ return int(match.group(1)), int(match.group(2))
+ match = re.fullmatch(r'DECIMAL\((\d+)\)', type_name)
+ if match:
+ return int(match.group(1)), 0
+ return 10, 0
+
+
+def _parse_timestamp_precision(type_name: str) -> int:
+ match = re.search(r'\((\d+)\)', type_name)
+ if match:
+ return int(match.group(1))
+ return 6