This is an automated email from the ASF dual-hosted git repository.

junhao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new aaa601939e [Python] Enable field merge read in row-tracking table (#6399)
aaa601939e is described below

commit aaa601939e9da177d53bda11838672190964085e
Author: umi <[email protected]>
AuthorDate: Wed Oct 15 10:54:23 2025 +0800

    [Python] Enable field merge read in row-tracking table (#6399)
---
 .../pypaimon/manifest/manifest_file_manager.py     |  16 +-
 .../pypaimon/manifest/schema/data_file_meta.py     |  61 ++-
 .../pypaimon/manifest/schema/manifest_entry.py     |  20 +
 .../read/reader/data_evolution_merge_reader.py     |  85 ++++
 ..._record_reader.py => data_file_batch_reader.py} |   0
 paimon-python/pypaimon/read/reader/field_bunch.py  | 120 +++++
 paimon-python/pypaimon/read/split_read.py          | 196 ++++++++-
 paimon-python/pypaimon/read/table_read.py          |  10 +-
 paimon-python/pypaimon/read/table_scan.py          | 106 +++++
 paimon-python/pypaimon/snapshot/snapshot.py        |   1 +
 .../pypaimon/tests/data_evolution_test.py          | 483 +++++++++++++++++++++
 .../pypaimon/tests/file_store_commit_test.py       |  24 +-
 .../pypaimon/tests/py36/data_evolution_test.py     | 483 +++++++++++++++++++++
 .../pypaimon/tests/py36/rest_ao_read_write_test.py |   6 +-
 paimon-python/pypaimon/tests/reader_base_test.py   |   6 +-
 .../pypaimon/tests/rest/rest_read_write_test.py    |   2 +-
 paimon-python/pypaimon/write/batch_table_write.py  |  19 +-
 paimon-python/pypaimon/write/file_store_commit.py  |  75 ++++
 paimon-python/pypaimon/write/file_store_write.py   |   2 +
 paimon-python/pypaimon/write/writer/data_writer.py |  22 +-
 20 files changed, 1708 insertions(+), 29 deletions(-)

diff --git a/paimon-python/pypaimon/manifest/manifest_file_manager.py b/paimon-python/pypaimon/manifest/manifest_file_manager.py
index f4b0ab0be3..e3c9601cf4 100644
--- a/paimon-python/pypaimon/manifest/manifest_file_manager.py
+++ b/paimon-python/pypaimon/manifest/manifest_file_manager.py
@@ -60,9 +60,13 @@ class ManifestFileManager:
                 null_counts=key_dict['_NULL_COUNTS'],
             )
             value_dict = dict(file_dict['_VALUE_STATS'])
-            if file_dict.get('_VALUE_STATS_COLS') is None:
-                fields = self.table.table_schema.fields
-            elif not file_dict.get('_VALUE_STATS_COLS'):
+            if file_dict['_VALUE_STATS_COLS'] is None:
+                if file_dict['_WRITE_COLS'] is None:
+                    fields = self.table.table_schema.fields
+                else:
+                    read_fields = file_dict['_WRITE_COLS']
+                    fields = [self.table.field_dict[col] for col in read_fields]
+            elif not file_dict['_VALUE_STATS_COLS']:
                 fields = []
             else:
                 fields = [self.table.field_dict[col] for col in file_dict['_VALUE_STATS_COLS']]
@@ -89,6 +93,9 @@ class ManifestFileManager:
                 embedded_index=file_dict['_EMBEDDED_FILE_INDEX'],
                 file_source=file_dict['_FILE_SOURCE'],
                 value_stats_cols=file_dict.get('_VALUE_STATS_COLS'),
+                external_path=file_dict.get('_EXTERNAL_PATH'),
+                first_row_id=file_dict['_FIRST_ROW_ID'],
+                write_cols=file_dict['_WRITE_COLS'],
             )
             entry = ManifestEntry(
                 kind=record['_KIND'],
@@ -137,6 +144,9 @@ class ManifestFileManager:
                     "_EMBEDDED_FILE_INDEX": entry.file.embedded_index,
                     "_FILE_SOURCE": entry.file.file_source,
                     "_VALUE_STATS_COLS": entry.file.value_stats_cols,
+                    "_EXTERNAL_PATH": entry.file.external_path,
+                    "_FIRST_ROW_ID": entry.file.first_row_id,
+                    "_WRITE_COLS": entry.file.write_cols,
                 }
             }
             avro_records.append(avro_record)
diff --git a/paimon-python/pypaimon/manifest/schema/data_file_meta.py b/paimon-python/pypaimon/manifest/schema/data_file_meta.py
index 82dfb66918..1d1bcb56fb 100644
--- a/paimon-python/pypaimon/manifest/schema/data_file_meta.py
+++ b/paimon-python/pypaimon/manifest/schema/data_file_meta.py
@@ -47,6 +47,8 @@ class DataFileMeta:
     file_source: Optional[str] = None
     value_stats_cols: Optional[List[str]] = None
     external_path: Optional[str] = None
+    first_row_id: Optional[int] = None
+    write_cols: Optional[List[str]] = None
 
     # not a schema field, just for internal usage
     file_path: str = None
@@ -59,6 +61,58 @@ class DataFileMeta:
         path_builder = path_builder / ("bucket-" + str(bucket)) / self.file_name
         self.file_path = str(path_builder)
 
+    def assign_first_row_id(self, first_row_id: int) -> 'DataFileMeta':
+        """Create a new DataFileMeta with the assigned first_row_id."""
+        return DataFileMeta(
+            file_name=self.file_name,
+            file_size=self.file_size,
+            row_count=self.row_count,
+            min_key=self.min_key,
+            max_key=self.max_key,
+            key_stats=self.key_stats,
+            value_stats=self.value_stats,
+            min_sequence_number=self.min_sequence_number,
+            max_sequence_number=self.max_sequence_number,
+            schema_id=self.schema_id,
+            level=self.level,
+            extra_files=self.extra_files,
+            creation_time=self.creation_time,
+            delete_row_count=self.delete_row_count,
+            embedded_index=self.embedded_index,
+            file_source=self.file_source,
+            value_stats_cols=self.value_stats_cols,
+            external_path=self.external_path,
+            first_row_id=first_row_id,
+            write_cols=self.write_cols,
+            file_path=self.file_path
+        )
+
+    def assign_sequence_number(self, min_sequence_number: int, max_sequence_number: int) -> 'DataFileMeta':
+        """Create a new DataFileMeta with the assigned sequence numbers."""
+        return DataFileMeta(
+            file_name=self.file_name,
+            file_size=self.file_size,
+            row_count=self.row_count,
+            min_key=self.min_key,
+            max_key=self.max_key,
+            key_stats=self.key_stats,
+            value_stats=self.value_stats,
+            min_sequence_number=min_sequence_number,
+            max_sequence_number=max_sequence_number,
+            schema_id=self.schema_id,
+            level=self.level,
+            extra_files=self.extra_files,
+            creation_time=self.creation_time,
+            delete_row_count=self.delete_row_count,
+            embedded_index=self.embedded_index,
+            file_source=self.file_source,
+            value_stats_cols=self.value_stats_cols,
+            external_path=self.external_path,
+            first_row_id=self.first_row_id,
+            write_cols=self.write_cols,
+            file_path=self.file_path
+        )
+
 
 DATA_FILE_META_SCHEMA = {
     "type": "record",
@@ -83,9 +137,14 @@ DATA_FILE_META_SCHEMA = {
          "default": None},
         {"name": "_DELETE_ROW_COUNT", "type": ["null", "long"], "default": None},
         {"name": "_EMBEDDED_FILE_INDEX", "type": ["null", "bytes"], "default": None},
-        {"name": "_FILE_SOURCE", "type": ["null", "int"], "default": None},
+        {"name": "_FILE_SOURCE", "type": ["null", "string"], "default": None},
         {"name": "_VALUE_STATS_COLS",
          "type": ["null", {"type": "array", "items": "string"}],
          "default": None},
+        {"name": "_EXTERNAL_PATH", "type": ["null", "string"], "default": None},
+        {"name": "_FIRST_ROW_ID", "type": ["null", "long"], "default": None},
+        {"name": "_WRITE_COLS",
+         "type": ["null", {"type": "array", "items": "string"}],
+         "default": None},
     ]
 }
diff --git a/paimon-python/pypaimon/manifest/schema/manifest_entry.py b/paimon-python/pypaimon/manifest/schema/manifest_entry.py
index 9a02341175..9608fbbd37 100644
--- a/paimon-python/pypaimon/manifest/schema/manifest_entry.py
+++ b/paimon-python/pypaimon/manifest/schema/manifest_entry.py
@@ -31,6 +31,26 @@ class ManifestEntry:
     total_buckets: int
     file: DataFileMeta
 
+    def assign_first_row_id(self, first_row_id: int) -> 'ManifestEntry':
+        """Create a new ManifestEntry with the assigned first_row_id."""
+        return ManifestEntry(
+            kind=self.kind,
+            partition=self.partition,
+            bucket=self.bucket,
+            total_buckets=self.total_buckets,
+            file=self.file.assign_first_row_id(first_row_id)
+        )
+
+    def assign_sequence_number(self, min_sequence_number: int, max_sequence_number: int) -> 'ManifestEntry':
+        """Create a new ManifestEntry with the assigned sequence numbers."""
+        return ManifestEntry(
+            kind=self.kind,
+            partition=self.partition,
+            bucket=self.bucket,
+            total_buckets=self.total_buckets,
+            file=self.file.assign_sequence_number(min_sequence_number, max_sequence_number)
+        )
+
 
 MANIFEST_ENTRY_SCHEMA = {
     "type": "record",
diff --git a/paimon-python/pypaimon/read/reader/data_evolution_merge_reader.py b/paimon-python/pypaimon/read/reader/data_evolution_merge_reader.py
new file mode 100644
index 0000000000..43bf926862
--- /dev/null
+++ b/paimon-python/pypaimon/read/reader/data_evolution_merge_reader.py
@@ -0,0 +1,85 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from typing import List, Optional
+
+import pyarrow as pa
+from pyarrow import RecordBatch
+
+from pypaimon.read.reader.iface.record_batch_reader import RecordBatchReader
+
+
+class DataEvolutionMergeReader(RecordBatchReader):
+    """
+    This is a union reader which contains multiple inner readers, Each reader is responsible for reading one file.
+
+    This reader, assembling multiple reader into one big and great reader, will merge the batches from all readers.
+
+    For example, if rowOffsets is {0, 2, 0, 1, 2, 1} and fieldOffsets is {0, 0, 1, 1, 1, 0}, it means:
+     - The first field comes from batch0, and it is at offset 0 in batch0.
+     - The second field comes from batch2, and it is at offset 0 in batch2.
+     - The third field comes from batch0, and it is at offset 1 in batch0.
+     - The fourth field comes from batch1, and it is at offset 1 in batch1.
+     - The fifth field comes from batch2, and it is at offset 1 in batch2.
+     - The sixth field comes from batch1, and it is at offset 0 in batch1.
+    """
+
+    def __init__(self, row_offsets: List[int], field_offsets: List[int], readers: List[Optional[RecordBatchReader]]):
+        if row_offsets is None:
+            raise ValueError("Row offsets must not be null")
+        if field_offsets is None:
+            raise ValueError("Field offsets must not be null")
+        if len(row_offsets) != len(field_offsets):
+            raise ValueError("Row offsets and field offsets must have the same length")
+        if not row_offsets:
+            raise ValueError("Row offsets must not be empty")
+        if not readers or len(readers) < 1:
+            raise ValueError("Readers should be more than 0")
+        self.row_offsets = row_offsets
+        self.field_offsets = field_offsets
+        self.readers = readers
+
+    def read_arrow_batch(self) -> Optional[RecordBatch]:
+        batches: List[Optional[RecordBatch]] = [None] * len(self.readers)
+        for i, reader in enumerate(self.readers):
+            if reader is not None:
+                batch = reader.read_arrow_batch()
+                if batch is None:
+                    # all readers are aligned, as long as one returns null, the others will also have no data
+                    return None
+                batches[i] = batch
+        # Assemble record batches from batches based on row_offsets and field_offsets
+        columns = []
+        names = []
+        for i in range(len(self.row_offsets)):
+            batch_index = self.row_offsets[i]
+            field_index = self.field_offsets[i]
+            if batches[batch_index] is not None:
+                column = batches[batch_index].column(field_index)
+                columns.append(column)
+                names.append(batches[batch_index].schema.names[field_index])
+        if columns:
+            return pa.RecordBatch.from_arrays(columns, names)
+        return None
+
+    def close(self) -> None:
+        try:
+            for reader in self.readers:
+                if reader is not None:
+                    reader.close()
+        except Exception as e:
+            raise IOError("Failed to close inner readers") from e
diff --git a/paimon-python/pypaimon/read/reader/data_file_record_reader.py b/paimon-python/pypaimon/read/reader/data_file_batch_reader.py
similarity index 100%
rename from paimon-python/pypaimon/read/reader/data_file_record_reader.py
rename to paimon-python/pypaimon/read/reader/data_file_batch_reader.py
diff --git a/paimon-python/pypaimon/read/reader/field_bunch.py b/paimon-python/pypaimon/read/reader/field_bunch.py
new file mode 100644
index 0000000000..4ba82bd80e
--- /dev/null
+++ b/paimon-python/pypaimon/read/reader/field_bunch.py
@@ -0,0 +1,120 @@
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+"""
+FieldBunch classes for organizing files by field in data evolution.
+
+These classes help organize DataFileMeta objects into groups based on their field content,
+supporting both regular data files and blob files.
+"""
+from abc import ABC
+from typing import List
+from pypaimon.manifest.schema.data_file_meta import DataFileMeta
+
+
+class FieldBunch(ABC):
+    """Interface for files organized by field."""
+
+    def row_count(self) -> int:
+        """Return the total row count for this bunch."""
+        ...
+
+    def files(self) -> List[DataFileMeta]:
+        """Return the list of files in this bunch."""
+        ...
+
+
+class DataBunch(FieldBunch):
+    """Files for a single data file."""
+
+    def __init__(self, data_file: DataFileMeta):
+        self.data_file = data_file
+
+    def row_count(self) -> int:
+        return self.data_file.row_count
+
+    def files(self) -> List[DataFileMeta]:
+        return [self.data_file]
+
+
+class BlobBunch(FieldBunch):
+    """Files for partial field (blob files)."""
+
+    def __init__(self, expected_row_count: int):
+        self._files: List[DataFileMeta] = []
+        self.expected_row_count = expected_row_count
+        self.latest_first_row_id = -1
+        self.expected_next_first_row_id = -1
+        self.latest_max_sequence_number = -1
+        self._row_count = 0
+
+    def add(self, file: DataFileMeta) -> None:
+        """Add a blob file to this bunch."""
+        if not self._is_blob_file(file.file_name):
+            raise ValueError("Only blob file can be added to a blob bunch.")
+
+        if file.first_row_id == self.latest_first_row_id:
+            if file.max_sequence_number >= self.latest_max_sequence_number:
+                raise ValueError(
+                    "Blob file with same first row id should have decreasing sequence number."
+                )
+            return
+
+        if self._files:
+            first_row_id = file.first_row_id
+            if first_row_id < self.expected_next_first_row_id:
+                if file.max_sequence_number >= self.latest_max_sequence_number:
+                    raise ValueError(
+                        "Blob file with overlapping row id should have decreasing sequence number."
+                    )
+                return
+            elif first_row_id > self.expected_next_first_row_id:
+                raise ValueError(
+                    f"Blob file first row id should be continuous, expect "
+                    f"{self.expected_next_first_row_id} but got {first_row_id}"
+                )
+
+            if file.schema_id != self._files[0].schema_id:
+                raise ValueError(
+                    "All files in a blob bunch should have the same schema id."
+                )
+            if file.write_cols != self._files[0].write_cols:
+                raise ValueError(
+                    "All files in a blob bunch should have the same write columns."
+                )
+
+        self._files.append(file)
+        self._row_count += file.row_count
+        if self._row_count > self.expected_row_count:
+            raise ValueError(
+                f"Blob files row count exceed the expect {self.expected_row_count}"
+            )
+
+        self.latest_max_sequence_number = file.max_sequence_number
+        self.latest_first_row_id = file.first_row_id
+        self.expected_next_first_row_id = self.latest_first_row_id + file.row_count
+
+    def row_count(self) -> int:
+        return self._row_count
+
+    def files(self) -> List[DataFileMeta]:
+        return self._files
+
+    @staticmethod
+    def _is_blob_file(file_name: str) -> bool:
+        """Check if a file is a blob file based on its extension."""
+        return file_name.endswith('.blob')
diff --git a/paimon-python/pypaimon/read/split_read.py b/paimon-python/pypaimon/read/split_read.py
index 7674db45f0..81ebdd86f8 100644
--- a/paimon-python/pypaimon/read/split_read.py
+++ b/paimon-python/pypaimon/read/split_read.py
@@ -23,11 +23,14 @@ from typing import List, Optional, Tuple
 
 from pypaimon.common.core_options import CoreOptions
 from pypaimon.common.predicate import Predicate
+from pypaimon.manifest.schema.data_file_meta import DataFileMeta
 from pypaimon.read.interval_partition import IntervalPartition, SortedRun
 from pypaimon.read.partition_info import PartitionInfo
 from pypaimon.read.reader.concat_batch_reader import ConcatBatchReader, ShardBatchReader
 from pypaimon.read.reader.concat_record_reader import ConcatRecordReader
-from pypaimon.read.reader.data_file_record_reader import DataFileBatchReader
+from pypaimon.read.reader.data_file_batch_reader import DataFileBatchReader
+from pypaimon.read.reader.data_evolution_merge_reader import DataEvolutionMergeReader
+from pypaimon.read.reader.field_bunch import FieldBunch, DataBunch, BlobBunch
 from pypaimon.read.reader.drop_delete_reader import DropDeleteRecordReader
 from pypaimon.read.reader.empty_record_reader import EmptyFileRecordReader
 from pypaimon.read.reader.filter_record_reader import FilterRecordReader
@@ -298,3 +301,194 @@ class MergeFileSplitRead(SplitRead):
 
     def _get_all_data_fields(self):
         return self._create_key_value_fields(self.table.fields)
+
+
+class DataEvolutionSplitRead(SplitRead):
+
+    def create_reader(self) -> RecordReader:
+        files = self.split.files
+        suppliers = []
+
+        # Split files by row ID using the same logic as Java DataEvolutionSplitGenerator.split
+        split_by_row_id = self._split_by_row_id(files)
+
+        for need_merge_files in split_by_row_id:
+            if len(need_merge_files) == 1 or not self.read_fields:
+                # No need to merge fields, just create a single file reader
+                suppliers.append(
+                    lambda f=need_merge_files[0]: self._create_file_reader(f)
+                )
+            else:
+                suppliers.append(
+                    lambda files=need_merge_files: self._create_union_reader(files)
+                )
+
+        return ConcatBatchReader(suppliers)
+
+    def _split_by_row_id(self, files: List[DataFileMeta]) -> List[List[DataFileMeta]]:
+        """Split files by firstRowId for data evolution."""
+
+        # Sort files by firstRowId and then by maxSequenceNumber
+        def sort_key(file: DataFileMeta) -> tuple:
+            first_row_id = file.first_row_id if file.first_row_id is not None else float('-inf')
+            is_blob = 1 if self._is_blob_file(file.file_name) else 0
+            max_seq = file.max_sequence_number
+            return (first_row_id, is_blob, -max_seq)
+
+        sorted_files = sorted(files, key=sort_key)
+
+        # Split files by firstRowId
+        split_by_row_id = []
+        last_row_id = -1
+        check_row_id_start = 0
+        current_split = []
+
+        for file in sorted_files:
+            first_row_id = file.first_row_id
+            if first_row_id is None:
+                split_by_row_id.append([file])
+                continue
+
+            if not self._is_blob_file(file.file_name) and first_row_id != last_row_id:
+                if current_split:
+                    split_by_row_id.append(current_split)
+                if first_row_id < check_row_id_start:
+                    raise ValueError(
+                        f"There are overlapping files in the split: {files}, "
+                        f"the wrong file is: {file}"
+                    )
+                current_split = []
+                last_row_id = first_row_id
+                check_row_id_start = first_row_id + file.row_count
+            current_split.append(file)
+
+        if current_split:
+            split_by_row_id.append(current_split)
+
+        return split_by_row_id
+
+    def _create_union_reader(self, need_merge_files: List[DataFileMeta]) -> RecordReader:
+        """Create a DataEvolutionFileReader for merging multiple files."""
+        # Split field bunches
+        fields_files = self._split_field_bunches(need_merge_files)
+
+        # Validate row counts and first row IDs
+        row_count = fields_files[0].row_count()
+        first_row_id = fields_files[0].files()[0].first_row_id
+
+        for bunch in fields_files:
+            if bunch.row_count() != row_count:
+                raise ValueError("All files in a field merge split should have the same row count.")
+            if bunch.files()[0].first_row_id != first_row_id:
+                raise ValueError(
+                    "All files in a field merge split should have the same first row id and could not be null."
+                )
+
+        # Create the union reader
+        all_read_fields = self.read_fields
+        file_record_readers = [None] * len(fields_files)
+        read_field_index = [field.id for field in all_read_fields]
+
+        # Initialize offsets
+        row_offsets = [-1] * len(all_read_fields)
+        field_offsets = [-1] * len(all_read_fields)
+
+        for i, bunch in enumerate(fields_files):
+            first_file = bunch.files()[0]
+
+            # Get field IDs for this bunch
+            if self._is_blob_file(first_file.file_name):
+                # For blob files, we need to get the field ID from the write columns
+                field_ids = [self._get_field_id_from_write_cols(first_file)]
+            elif first_file.write_cols:
+                field_ids = self._get_field_ids_from_write_cols(first_file.write_cols)
+            else:
+                # For regular files, get all field IDs from the schema
+                field_ids = [field.id for field in self.table.fields]
+
+            read_fields = []
+            for j, read_field_id in enumerate(read_field_index):
+                for field_id in field_ids:
+                    if read_field_id == field_id:
+                        if row_offsets[j] == -1:
+                            row_offsets[j] = i
+                            field_offsets[j] = len(read_fields)
+                            read_fields.append(all_read_fields[j])
+                        break
+
+            if not read_fields:
+                file_record_readers[i] = None
+            else:
+                table_fields = self.read_fields
+                self.read_fields = read_fields  # create reader based on read_fields
+                # Create reader for this bunch
+                if len(bunch.files()) == 1:
+                    file_record_readers[i] = self._create_file_reader(bunch.files()[0])
+                else:
+                    # Create concatenated reader for multiple files
+                    suppliers = [
+                        lambda f=file: self._create_file_reader(f) for file in bunch.files()
+                    ]
+                    file_record_readers[i] = ConcatRecordReader(suppliers)
+                self.read_fields = table_fields
+
+        # Validate that all required fields are found
+        for i, field in enumerate(all_read_fields):
+            if row_offsets[i] == -1:
+                if not field.type.is_nullable():
+                    raise ValueError(f"Field {field} is not null but can't find any file contains it.")
+
+        return DataEvolutionMergeReader(row_offsets, field_offsets, file_record_readers)
+
+    def _create_file_reader(self, file: DataFileMeta) -> RecordReader:
+        """Create a file reader for a single file."""
+        return self.file_reader_supplier(file_path=file.file_path, for_merge_read=False)
+
+    def _split_field_bunches(self, need_merge_files: List[DataFileMeta]) -> List[FieldBunch]:
+        """Split files into field bunches."""
+
+        fields_files = []
+        blob_bunch_map = {}
+        row_count = -1
+
+        for file in need_merge_files:
+            if self._is_blob_file(file.file_name):
+                field_id = self._get_field_id_from_write_cols(file)
+                if field_id not in blob_bunch_map:
+                    blob_bunch_map[field_id] = BlobBunch(row_count)
+                blob_bunch_map[field_id].add(file)
+            else:
+                # Normal file, just add it to the current merge split
+                fields_files.append(DataBunch(file))
+                row_count = file.row_count
+
+        fields_files.extend(blob_bunch_map.values())
+        return fields_files
+
+    def _get_field_id_from_write_cols(self, file: DataFileMeta) -> int:
+        """Get field ID from write columns for blob files."""
+        if not file.write_cols or len(file.write_cols) == 0:
+            raise ValueError("Blob file must have write columns")
+
+        # Find the field by name in the table schema
+        field_name = file.write_cols[0]
+        for field in self.table.fields:
+            if field.name == field_name:
+                return field.id
+        raise ValueError(f"Field {field_name} not found in table schema")
+
+    def _get_field_ids_from_write_cols(self, write_cols: List[str]) -> List[int]:
+        field_ids = []
+        for field_name in write_cols:
+            for field in self.table.fields:
+                if field.name == field_name:
+                    field_ids.append(field.id)
+        return field_ids
+
+    @staticmethod
+    def _is_blob_file(file_name: str) -> bool:
+        """Check if a file is a blob file based on its extension."""
+        return file_name.endswith('.blob')
+
+    def _get_all_data_fields(self):
+        return self.table.fields
diff --git a/paimon-python/pypaimon/read/table_read.py b/paimon-python/pypaimon/read/table_read.py
index b5f7a7b765..b33fb2c6ad 100644
--- a/paimon-python/pypaimon/read/table_read.py
+++ b/paimon-python/pypaimon/read/table_read.py
@@ -26,7 +26,7 @@ from pypaimon.read.push_down_utils import extract_predicate_to_list
 from pypaimon.read.reader.iface.record_batch_reader import RecordBatchReader
 from pypaimon.read.split import Split
 from pypaimon.read.split_read import (MergeFileSplitRead, RawFileSplitRead,
-                                      SplitRead)
+                                      SplitRead, DataEvolutionSplitRead)
 from pypaimon.schema.data_types import DataField, PyarrowFieldParser
 from pypaimon.table.row.offset_row import OffsetRow
 
@@ -132,6 +132,14 @@ class TableRead:
                 read_type=self.read_type,
                 split=split
             )
+        elif self.table.options.get('data-evolution.enabled', 'false').lower() == 'true':
+            return DataEvolutionSplitRead(
+                table=self.table,
+                predicate=self.predicate,
+                push_down_predicate=self.push_down_predicate,
+                read_type=self.read_type,
+                split=split
+            )
         else:
             return RawFileSplitRead(
                 table=self.table,
diff --git a/paimon-python/pypaimon/read/table_scan.py b/paimon-python/pypaimon/read/table_scan.py
index 6a6ab9f3f8..d76725ca97 100644
--- a/paimon-python/pypaimon/read/table_scan.py
+++ b/paimon-python/pypaimon/read/table_scan.py
@@ -68,6 +68,7 @@ class TableScan:
 
         self.only_read_real_buckets = True if int(
             self.table.options.get('bucket', -1)) == BucketMode.POSTPONE_BUCKET.value else False
+        self.data_evolution = self.table.options.get('data-evolution.enabled', 'false').lower() == 'true'
 
     def plan(self) -> Plan:
         file_entries = self.plan_files()
@@ -75,6 +76,8 @@ class TableScan:
             return Plan([])
         if self.table.is_primary_key_table:
             splits = self._create_primary_key_splits(file_entries)
+        elif self.data_evolution:
+            splits = self._create_data_evolution_splits(file_entries)
         else:
             splits = self._create_append_only_splits(file_entries)
 
@@ -253,6 +256,48 @@ class TableScan:
             "row_count": file_entry.file.row_count,
         })
 
+    def _create_data_evolution_splits(self, file_entries: List[ManifestEntry]) -> List['Split']:
+        """
+        Create data evolution splits for append-only tables with schema evolution.
+        This method groups files by firstRowId and creates splits that can handle
+        column merging across different schema versions.
+        """
+        partitioned_files = defaultdict(list)
+        for entry in file_entries:
+            partitioned_files[(tuple(entry.partition.values), entry.bucket)].append(entry)
+
+        if self.idx_of_this_subtask is not None:
+            partitioned_files, plan_start_row, plan_end_row = self._append_only_filter_by_shard(partitioned_files)
+
+        def weight_func(file_list: List[DataFileMeta]) -> int:
+            return max(sum(f.file_size for f in file_list), self.open_file_cost)
+
+        splits = []
+        for key, file_entries in partitioned_files.items():
+            if not file_entries:
+                continue
+
+            data_files: List[DataFileMeta] = [e.file for e in file_entries]
+
+            # Split files by firstRowId for data evolution
+            split_by_row_id = self._split_by_row_id(data_files)
+
+            # Pack the split groups for optimal split sizes
+            packed_files: List[List[List[DataFileMeta]]] = self._pack_for_ordered(split_by_row_id, weight_func,
+                                                                               
   self.target_split_size)
+
+            # Flatten the packed files and build splits
+            flatten_packed_files: List[List[DataFileMeta]] = [
+                [file for sub_pack in pack for file in sub_pack]
+                for pack in packed_files
+            ]
+
+            splits += self._build_split_from_pack(flatten_packed_files, file_entries, False)
+
+        if self.idx_of_this_subtask is not None:
+            self._compute_split_start_end_row(splits, plan_start_row, plan_end_row)
+        return splits
+
     def _create_append_only_splits(self, file_entries: List[ManifestEntry]) -> List['Split']:
         partitioned_files = defaultdict(list)
         for entry in file_entries:
@@ -360,3 +405,64 @@ class TableScan:
             packed.append(bin_items)
 
         return packed
+
+    @staticmethod
+    def _is_blob_file(file_name: str) -> bool:
+        """Check if a file is a blob file based on its extension."""
+        return file_name.endswith('.blob')
+
+    def _split_by_row_id(self, files: List[DataFileMeta]) -> 
List[List[DataFileMeta]]:
+        """
+        Split files by firstRowId for data evolution.
+        This method groups files that have the same firstRowId, which is 
essential
+        for handling schema evolution where files with different schemas need 
to be
+        read together to merge columns.
+        """
+        split_by_row_id = []
+
+        # Sort files by firstRowId and then by maxSequenceNumber
+        # Files with null firstRowId are treated as having Long.MIN_VALUE
+        def sort_key(file: DataFileMeta) -> tuple:
+            first_row_id = file.first_row_id if file.first_row_id is not None 
else float('-inf')
+            is_blob = 1 if self._is_blob_file(file.file_name) else 0
+            # For files with same firstRowId, sort by maxSequenceNumber in 
descending order
+            # (larger sequence number means more recent data)
+            max_seq = file.max_sequence_number
+            return (first_row_id, is_blob, -max_seq)
+
+        sorted_files = sorted(files, key=sort_key)
+
+        # Split files by firstRowId
+        last_row_id = -1
+        check_row_id_start = 0
+        current_split = []
+
+        for file in sorted_files:
+            first_row_id = file.first_row_id
+            if first_row_id is None:
+                # Files without firstRowId are treated as individual splits
+                split_by_row_id.append([file])
+                continue
+
+            if not self._is_blob_file(file.file_name) and first_row_id != 
last_row_id:
+                if current_split:
+                    split_by_row_id.append(current_split)
+
+                # Validate that files don't overlap
+                if first_row_id < check_row_id_start:
+                    file_names = [f.file_name for f in sorted_files]
+                    raise ValueError(
+                        f"There are overlapping files in the split: 
{file_names}, "
+                        f"the wrong file is: {file.file_name}"
+                    )
+
+                current_split = []
+                last_row_id = first_row_id
+                check_row_id_start = first_row_id + file.row_count
+
+            current_split.append(file)
+
+        if current_split:
+            split_by_row_id.append(current_split)
+
+        return split_by_row_id
diff --git a/paimon-python/pypaimon/snapshot/snapshot.py 
b/paimon-python/pypaimon/snapshot/snapshot.py
index 5bc92dcad4..96b287ab55 100644
--- a/paimon-python/pypaimon/snapshot/snapshot.py
+++ b/paimon-python/pypaimon/snapshot/snapshot.py
@@ -43,3 +43,4 @@ class Snapshot:
     changelog_record_count: Optional[int] = json_field("changelogRecordCount", 
default=None)
     watermark: Optional[int] = json_field("watermark", default=None)
     statistics: Optional[str] = json_field("statistics", default=None)
+    next_row_id: Optional[int] = json_field("nextRowId", default=None)
diff --git a/paimon-python/pypaimon/tests/data_evolution_test.py 
b/paimon-python/pypaimon/tests/data_evolution_test.py
new file mode 100644
index 0000000000..90abd2f916
--- /dev/null
+++ b/paimon-python/pypaimon/tests/data_evolution_test.py
@@ -0,0 +1,483 @@
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+import os
+import tempfile
+import unittest
+
+import pyarrow as pa
+from pypaimon import Schema, CatalogFactory
+
+
+class DataEvolutionTest(unittest.TestCase):
+    @classmethod
+    def setUpClass(cls):
+        cls.tempdir = tempfile.mkdtemp()
+        cls.warehouse = os.path.join(cls.tempdir, 'warehouse')
+        cls.catalog = CatalogFactory.create({
+            'warehouse': cls.warehouse
+        })
+        cls.catalog.create_database('default', False)
+
+    def test_basic(self):
+        simple_pa_schema = pa.schema([
+            ('f0', pa.int8()),
+            ('f1', pa.int16()),
+        ])
+        schema = Schema.from_pyarrow_schema(simple_pa_schema,
+                                            options={'row-tracking.enabled': 
'true', 'data-evolution.enabled': 'true'})
+        self.catalog.create_table('default.test_row_tracking', schema, False)
+        table = self.catalog.get_table('default.test_row_tracking')
+
+        # write 1
+        write_builder = table.new_batch_write_builder()
+        table_write = write_builder.new_write()
+        table_commit = write_builder.new_commit()
+        expect_data = pa.Table.from_pydict({
+            'f0': [-1, 2],
+            'f1': [-1001, 1002]
+        }, schema=simple_pa_schema)
+        table_write.write_arrow(expect_data)
+        table_commit.commit(table_write.prepare_commit())
+        table_write.close()
+        table_commit.close()
+
+        # write 2
+        table_write = write_builder.new_write().with_write_type(['f0'])
+        table_commit = write_builder.new_commit()
+        data2 = pa.Table.from_pydict({
+            'f0': [3, 4],
+        }, schema=pa.schema([
+            ('f0', pa.int8()),
+        ]))
+        table_write.write_arrow(data2)
+        cmts = table_write.prepare_commit()
+        cmts[0].new_files[0].first_row_id = 0
+        table_commit.commit(cmts)
+        table_write.close()
+        table_commit.close()
+
+        read_builder = table.new_read_builder()
+        table_scan = read_builder.new_scan()
+        table_read = read_builder.new_read()
+        actual_data = table_read.to_arrow(table_scan.plan().splits())
+        expect_data = pa.Table.from_pydict({
+            'f0': [3, 4],
+            'f1': [-1001, 1002]
+        }, schema=pa.schema([
+            ('f0', pa.int8()),
+            ('f1', pa.int16()),
+        ]))
+        self.assertEqual(actual_data, expect_data)
+
+    def test_multiple_appends(self):
+        simple_pa_schema = pa.schema([
+            ('f0', pa.int32()),
+            ('f1', pa.string()),
+            ('f2', pa.string()),
+        ])
+        schema = Schema.from_pyarrow_schema(
+            simple_pa_schema,
+            options={'row-tracking.enabled': 'true', 'data-evolution.enabled': 
'true'}
+        )
+        self.catalog.create_table('default.test_multiple_appends', schema, 
False)
+        table = self.catalog.get_table('default.test_multiple_appends')
+
+        write_builder = table.new_batch_write_builder()
+
+        # write 100 rows: (1, "a", "b")
+        table_write = write_builder.new_write()
+        table_commit = write_builder.new_commit()
+        init_data = pa.Table.from_pydict({
+            'f0': [1] * 100,
+            'f1': ['a'] * 100,
+            'f2': ['b'] * 100,
+        }, schema=simple_pa_schema)
+        table_write.write_arrow(init_data)
+        table_commit.commit(table_write.prepare_commit())
+        table_write.close()
+        table_commit.close()
+        # append:set first_row_id = 100 to modify the row with columns write
+        write0 = write_builder.new_write().with_write_type(['f0', 'f1'])
+        write1 = write_builder.new_write().with_write_type(['f2'])
+        commit = write_builder.new_commit()
+        data0 = pa.Table.from_pydict({'f0': [2], 'f1': ['x']},
+                                     schema=pa.schema([('f0', pa.int32()), 
('f1', pa.string())]))
+        data1 = pa.Table.from_pydict({'f2': ['y']}, schema=pa.schema([('f2', 
pa.string())]))
+        write0.write_arrow(data0)
+        write1.write_arrow(data1)
+        cmts = write0.prepare_commit() + write1.prepare_commit()
+        for c in cmts:
+            for nf in c.new_files:
+                nf.first_row_id = 100
+        commit.commit(cmts)
+        write0.close()
+        write1.close()
+        commit.close()
+
+        # append:write (3, "c") and ("d"), set first_row_id = 101
+        write0 = write_builder.new_write().with_write_type(['f0', 'f1'])
+        commit0 = write_builder.new_commit()
+        data0 = pa.Table.from_pydict({'f0': [3], 'f1': ['c']},
+                                     schema=pa.schema([('f0', pa.int32()), 
('f1', pa.string())]))
+        write0.write_arrow(data0)
+        cmts0 = write0.prepare_commit()
+        for c in cmts0:
+            for nf in c.new_files:
+                nf.first_row_id = 101
+        commit0.commit(cmts0)
+        write0.close()
+        commit0.close()
+
+        write1 = write_builder.new_write().with_write_type(['f2'])
+        commit1 = write_builder.new_commit()
+        data1 = pa.Table.from_pydict({'f2': ['d']}, schema=pa.schema([('f2', 
pa.string())]))
+        write1.write_arrow(data1)
+        cmts1 = write1.prepare_commit()
+        for c in cmts1:
+            for nf in c.new_files:
+                nf.first_row_id = 101
+        commit1.commit(cmts1)
+        write1.close()
+        commit1.close()
+
+        read_builder = table.new_read_builder()
+        table_scan = read_builder.new_scan()
+        table_read = read_builder.new_read()
+        actual = table_read.to_arrow(table_scan.plan().splits())
+
+        self.assertEqual(actual.num_rows, 102)
+        expect = pa.Table.from_pydict({
+            'f0': [1] * 100 + [2] + [3],
+            'f1': ['a'] * 100 + ['x'] + ['c'],
+            'f2': ['b'] * 100 + ['y'] + ['d'],
+        }, schema=simple_pa_schema)
+        self.assertEqual(actual, expect)
+
+    def test_disorder_cols_append(self):
+        simple_pa_schema = pa.schema([
+            ('f0', pa.int32()),
+            ('f1', pa.string()),
+            ('f2', pa.string()),
+        ])
+        schema = Schema.from_pyarrow_schema(
+            simple_pa_schema,
+            options={'row-tracking.enabled': 'true', 'data-evolution.enabled': 
'true'}
+        )
+        self.catalog.create_table('default.test_disorder_cols_append', schema, 
False)
+        table = self.catalog.get_table('default.test_disorder_cols_append')
+
+        write_builder = table.new_batch_write_builder()
+        num_rows = 100
+        # write 1 rows: (1, "a", "b")
+        table_write = write_builder.new_write()
+        table_commit = write_builder.new_commit()
+        init_data = pa.Table.from_pydict({
+            'f0': [1] * num_rows,
+            'f1': ['a'] * num_rows,
+            'f2': ['b'] * num_rows,
+        }, schema=simple_pa_schema)
+        table_write.write_arrow(init_data)
+        table_commit.commit(table_write.prepare_commit())
+        table_write.close()
+        table_commit.close()
+
+        # append:set first_row_id = 0 to modify the row with columns write
+        write0 = write_builder.new_write().with_write_type(['f0', 'f2'])
+        write1 = write_builder.new_write().with_write_type(['f1'])
+        commit = write_builder.new_commit()
+        data0 = pa.Table.from_pydict({'f0': [2] * num_rows, 'f2': ['y'] * 
num_rows},
+                                     schema=pa.schema([('f0', pa.int32()), 
('f2', pa.string())]))
+        data1 = pa.Table.from_pydict({'f1': ['x'] * num_rows}, 
schema=pa.schema([('f1', pa.string())]))
+        write0.write_arrow(data0)
+        write1.write_arrow(data1)
+        cmts = write0.prepare_commit() + write1.prepare_commit()
+        for c in cmts:
+            for nf in c.new_files:
+                nf.first_row_id = 0
+        commit.commit(cmts)
+        write0.close()
+        write1.close()
+        commit.close()
+
+        read_builder = table.new_read_builder()
+        table_scan = read_builder.new_scan()
+        table_read = read_builder.new_read()
+        actual = table_read.to_arrow(table_scan.plan().splits())
+
+        self.assertEqual(actual.num_rows, 100)
+        expect = pa.Table.from_pydict({
+            'f0': [2] * num_rows,
+            'f1': ['x'] * num_rows,
+            'f2': ['y'] * num_rows,
+        }, schema=simple_pa_schema)
+        self.assertEqual(actual, expect)
+
+    def test_only_some_columns(self):
+        simple_pa_schema = pa.schema([
+            ('f0', pa.int32()),
+            ('f1', pa.string()),
+            ('f2', pa.string()),
+        ])
+        schema = Schema.from_pyarrow_schema(
+            simple_pa_schema,
+            options={'row-tracking.enabled': 'true', 'data-evolution.enabled': 
'true'}
+        )
+        self.catalog.create_table('default.test_only_some_columns', schema, 
False)
+        table = self.catalog.get_table('default.test_only_some_columns')
+
+        write_builder = table.new_batch_write_builder()
+
+        # Commit 1: f0
+        w0 = write_builder.new_write().with_write_type(['f0'])
+        c0 = write_builder.new_commit()
+        d0 = pa.Table.from_pydict({'f0': [1]}, schema=pa.schema([('f0', 
pa.int32())]))
+        w0.write_arrow(d0)
+        c0.commit(w0.prepare_commit())
+        w0.close()
+        c0.close()
+
+        # Commit 2: f1, first_row_id = 0
+        w1 = write_builder.new_write().with_write_type(['f1'])
+        c1 = write_builder.new_commit()
+        d1 = pa.Table.from_pydict({'f1': ['a']}, schema=pa.schema([('f1', 
pa.string())]))
+        w1.write_arrow(d1)
+        cmts1 = w1.prepare_commit()
+        for c in cmts1:
+            for nf in c.new_files:
+                nf.first_row_id = 0
+        c1.commit(cmts1)
+        w1.close()
+        c1.close()
+
+        # Commit 3: f2, first_row_id = 0
+        w2 = write_builder.new_write().with_write_type(['f2'])
+        c2 = write_builder.new_commit()
+        d2 = pa.Table.from_pydict({'f2': ['b']}, schema=pa.schema([('f2', 
pa.string())]))
+        w2.write_arrow(d2)
+        cmts2 = w2.prepare_commit()
+        for c in cmts2:
+            for nf in c.new_files:
+                nf.first_row_id = 0
+        c2.commit(cmts2)
+        w2.close()
+        c2.close()
+
+        read_builder = table.new_read_builder()
+        table_scan = read_builder.new_scan()
+        table_read = read_builder.new_read()
+        actual = table_read.to_arrow(table_scan.plan().splits())
+
+        expect = pa.Table.from_pydict({
+            'f0': [1],
+            'f1': ['a'],
+            'f2': ['b'],
+        }, schema=simple_pa_schema)
+        self.assertEqual(actual, expect)
+
+    def test_null_values(self):
+        simple_pa_schema = pa.schema([
+            ('f0', pa.int32()),
+            ('f1', pa.string()),
+            ('f2', pa.string()),
+        ])
+        schema = Schema.from_pyarrow_schema(
+            simple_pa_schema,
+            options={'row-tracking.enabled': 'true', 'data-evolution.enabled': 
'true'}
+        )
+        self.catalog.create_table('default.test_null_values', schema, False)
+        table = self.catalog.get_table('default.test_null_values')
+
+        write_builder = table.new_batch_write_builder()
+
+        # Commit 1: some cols are null
+        w0 = write_builder.new_write().with_write_type(['f0', 'f1'])
+        w1 = write_builder.new_write().with_write_type(['f2'])
+        c = write_builder.new_commit()
+
+        d0 = pa.Table.from_pydict({'f0': [1], 'f1': [None]},
+                                  schema=pa.schema([('f0', pa.int32()), ('f1', 
pa.string())]))
+        d1 = pa.Table.from_pydict({'f2': [None]}, schema=pa.schema([('f2', 
pa.string())]))
+        w0.write_arrow(d0)
+        w1.write_arrow(d1)
+        cmts = w0.prepare_commit() + w1.prepare_commit()
+        for msg in cmts:
+            for nf in msg.new_files:
+                nf.first_row_id = 0
+        c.commit(cmts)
+        w0.close()
+        w1.close()
+        c.close()
+
+        # Commit 2
+        w1 = write_builder.new_write().with_write_type(['f2'])
+        c1 = write_builder.new_commit()
+        d1 = pa.Table.from_pydict({'f2': ['c']}, schema=pa.schema([('f2', 
pa.string())]))
+        w1.write_arrow(d1)
+        cmts1 = w1.prepare_commit()
+        for msg in cmts1:
+            for nf in msg.new_files:
+                nf.first_row_id = 0
+        c1.commit(cmts1)
+        w1.close()
+        c1.close()
+
+        read_builder = table.new_read_builder()
+        table_scan = read_builder.new_scan()
+        table_read = read_builder.new_read()
+        actual = table_read.to_arrow(table_scan.plan().splits())
+        expect = pa.Table.from_pydict({
+            'f0': [1],
+            'f1': [None],
+            'f2': ['c'],
+        }, schema=simple_pa_schema)
+        self.assertEqual(actual, expect)
+
+    # different first_row_id append multiple times
+    def test_multiple_appends_different_first_row_ids(self):
+        simple_pa_schema = pa.schema([
+            ('f0', pa.int32()),
+            ('f1', pa.string()),
+            ('f2', pa.string()),
+        ])
+        schema = Schema.from_pyarrow_schema(
+            simple_pa_schema,
+            options={'row-tracking.enabled': 'true', 'data-evolution.enabled': 
'true'}
+        )
+        self.catalog.create_table('default.test_multiple_appends_diff_rowid', 
schema, False)
+        table = 
self.catalog.get_table('default.test_multiple_appends_diff_rowid')
+
+        write_builder = table.new_batch_write_builder()
+
+        # commit 1
+        w0 = write_builder.new_write().with_write_type(['f0', 'f1'])
+        w1 = write_builder.new_write().with_write_type(['f2'])
+        c = write_builder.new_commit()
+        d0 = pa.Table.from_pydict({'f0': [1], 'f1': ['a']},
+                                  schema=pa.schema([('f0', pa.int32()), ('f1', 
pa.string())]))
+        d1 = pa.Table.from_pydict({'f2': ['b']}, schema=pa.schema([('f2', 
pa.string())]))
+        w0.write_arrow(d0)
+        w1.write_arrow(d1)
+        cmts = w0.prepare_commit() + w1.prepare_commit()
+        for msg in cmts:
+            for nf in msg.new_files:
+                nf.first_row_id = 0
+        c.commit(cmts)
+        w0.close()
+        w1.close()
+        c.close()
+
+        # commit 2
+        w0 = write_builder.new_write().with_write_type(['f0', 'f1'])
+        c0 = write_builder.new_commit()
+        d0 = pa.Table.from_pydict({'f0': [2], 'f1': ['c']},
+                                  schema=pa.schema([('f0', pa.int32()), ('f1', 
pa.string())]))
+        w0.write_arrow(d0)
+        cmts0 = w0.prepare_commit()
+        for msg in cmts0:
+            for nf in msg.new_files:
+                nf.first_row_id = 1
+        c0.commit(cmts0)
+        w0.close()
+        c0.close()
+
+        # commit 3
+        w1 = write_builder.new_write().with_write_type(['f2'])
+        c1 = write_builder.new_commit()
+        d1 = pa.Table.from_pydict({'f2': ['d']}, schema=pa.schema([('f2', 
pa.string())]))
+        w1.write_arrow(d1)
+        cmts1 = w1.prepare_commit()
+        for msg in cmts1:
+            for nf in msg.new_files:
+                nf.first_row_id = 1
+        c1.commit(cmts1)
+        w1.close()
+        c1.close()
+
+        read_builder = table.new_read_builder()
+        table_scan = read_builder.new_scan()
+        table_read = read_builder.new_read()
+        actual = table_read.to_arrow(table_scan.plan().splits())
+
+        expect = pa.Table.from_pydict({
+            'f0': [1, 2],
+            'f1': ['a', 'c'],
+            'f2': ['b', 'd'],
+        }, schema=simple_pa_schema)
+        self.assertEqual(actual, expect)
+
+    def test_more_data(self):
+        simple_pa_schema = pa.schema([
+            ('f0', pa.int32()),
+            ('f1', pa.string()),
+            ('f2', pa.string()),
+        ])
+        schema = Schema.from_pyarrow_schema(
+            simple_pa_schema,
+            options={'row-tracking.enabled': 'true', 'data-evolution.enabled': 
'true'}
+        )
+        self.catalog.create_table('default.test_more_data', schema, False)
+        table = self.catalog.get_table('default.test_more_data')
+
+        write_builder = table.new_batch_write_builder()
+
+        # first commit:100k rows
+        w0 = write_builder.new_write().with_write_type(['f0', 'f1'])
+        w1 = write_builder.new_write().with_write_type(['f2'])
+        c = write_builder.new_commit()
+        size = 100000
+        d0 = pa.Table.from_pydict({
+            'f0': list(range(size)),
+            'f1': [f'a{i}' for i in range(size)],
+        }, schema=pa.schema([('f0', pa.int32()), ('f1', pa.string())]))
+        d1 = pa.Table.from_pydict({
+            'f2': [f'b{i}' for i in range(size)],
+        }, schema=pa.schema([('f2', pa.string())]))
+        w0.write_arrow(d0)
+        w1.write_arrow(d1)
+        cmts = w0.prepare_commit() + w1.prepare_commit()
+        for msg in cmts:
+            for nf in msg.new_files:
+                nf.first_row_id = 0
+        c.commit(cmts)
+        w0.close()
+        w1.close()
+        c.close()
+
+        # second commit:overwrite f2 to 'c{i}'
+        w1 = write_builder.new_write().with_write_type(['f2'])
+        c1 = write_builder.new_commit()
+        d1 = pa.Table.from_pydict({
+            'f2': [f'c{i}' for i in range(size)],
+        }, schema=pa.schema([('f2', pa.string())]))
+        w1.write_arrow(d1)
+        cmts1 = w1.prepare_commit()
+        c1.commit(cmts1)
+        w1.close()
+        c1.close()
+
+        read_builder = table.new_read_builder()
+        table_scan = read_builder.new_scan()
+        table_read = read_builder.new_read()
+        actual = table_read.to_arrow(table_scan.plan().splits())
+
+        expect = pa.Table.from_pydict({
+            'f0': list(range(size)),
+            'f1': [f'a{i}' for i in range(size)],
+            'f2': [f'c{i}' for i in range(size)],
+        }, schema=simple_pa_schema)
+        self.assertEqual(actual, expect)
diff --git a/paimon-python/pypaimon/tests/file_store_commit_test.py 
b/paimon-python/pypaimon/tests/file_store_commit_test.py
index ac7ce95094..ab566c3e52 100644
--- a/paimon-python/pypaimon/tests/file_store_commit_test.py
+++ b/paimon-python/pypaimon/tests/file_store_commit_test.py
@@ -76,7 +76,10 @@ class TestFileStoreCommit(unittest.TestCase):
             schema_id=0,
             level=0,
             extra_files=None,
-            creation_time=creation_time
+            creation_time=creation_time,
+            external_path=None,
+            first_row_id=None,
+            write_cols=None
         )
 
         commit_message = CommitMessage(
@@ -182,7 +185,10 @@ class TestFileStoreCommit(unittest.TestCase):
             schema_id=0,
             level=0,
             extra_files=None,
-            creation_time=creation_time
+            creation_time=creation_time,
+            external_path=None,
+            first_row_id=None,
+            write_cols=None
         )
 
         # File for partition 2
@@ -199,7 +205,10 @@ class TestFileStoreCommit(unittest.TestCase):
             schema_id=0,
             level=0,
             extra_files=None,
-            creation_time=creation_time
+            creation_time=creation_time,
+            external_path=None,
+            first_row_id=None,
+            write_cols=None
         )
 
         commit_message_1 = CommitMessage(
@@ -261,7 +270,10 @@ class TestFileStoreCommit(unittest.TestCase):
             schema_id=0,
             level=0,
             extra_files=None,
-            creation_time=creation_time
+            creation_time=creation_time,
+            external_path=None,
+            first_row_id=None,
+            write_cols=None
         )
 
         commit_message = CommitMessage(
@@ -389,7 +401,3 @@ class TestFileStoreCommit(unittest.TestCase):
                     file=file
                 ))
         return commit_entries
-
-
-if __name__ == '__main__':
-    unittest.main()
diff --git a/paimon-python/pypaimon/tests/py36/data_evolution_test.py 
b/paimon-python/pypaimon/tests/py36/data_evolution_test.py
new file mode 100644
index 0000000000..90abd2f916
--- /dev/null
+++ b/paimon-python/pypaimon/tests/py36/data_evolution_test.py
@@ -0,0 +1,483 @@
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+import os
+import tempfile
+import unittest
+
+import pyarrow as pa
+from pypaimon import Schema, CatalogFactory
+
+
+class DataEvolutionTest(unittest.TestCase):
+    @classmethod
+    def setUpClass(cls):
+        cls.tempdir = tempfile.mkdtemp()
+        cls.warehouse = os.path.join(cls.tempdir, 'warehouse')
+        cls.catalog = CatalogFactory.create({
+            'warehouse': cls.warehouse
+        })
+        cls.catalog.create_database('default', False)
+
+    def test_basic(self):
+        simple_pa_schema = pa.schema([
+            ('f0', pa.int8()),
+            ('f1', pa.int16()),
+        ])
+        schema = Schema.from_pyarrow_schema(simple_pa_schema,
+                                            options={'row-tracking.enabled': 
'true', 'data-evolution.enabled': 'true'})
+        self.catalog.create_table('default.test_row_tracking', schema, False)
+        table = self.catalog.get_table('default.test_row_tracking')
+
+        # write 1
+        write_builder = table.new_batch_write_builder()
+        table_write = write_builder.new_write()
+        table_commit = write_builder.new_commit()
+        expect_data = pa.Table.from_pydict({
+            'f0': [-1, 2],
+            'f1': [-1001, 1002]
+        }, schema=simple_pa_schema)
+        table_write.write_arrow(expect_data)
+        table_commit.commit(table_write.prepare_commit())
+        table_write.close()
+        table_commit.close()
+
+        # write 2
+        table_write = write_builder.new_write().with_write_type(['f0'])
+        table_commit = write_builder.new_commit()
+        data2 = pa.Table.from_pydict({
+            'f0': [3, 4],
+        }, schema=pa.schema([
+            ('f0', pa.int8()),
+        ]))
+        table_write.write_arrow(data2)
+        cmts = table_write.prepare_commit()
+        cmts[0].new_files[0].first_row_id = 0
+        table_commit.commit(cmts)
+        table_write.close()
+        table_commit.close()
+
+        read_builder = table.new_read_builder()
+        table_scan = read_builder.new_scan()
+        table_read = read_builder.new_read()
+        actual_data = table_read.to_arrow(table_scan.plan().splits())
+        expect_data = pa.Table.from_pydict({
+            'f0': [3, 4],
+            'f1': [-1001, 1002]
+        }, schema=pa.schema([
+            ('f0', pa.int8()),
+            ('f1', pa.int16()),
+        ]))
+        self.assertEqual(actual_data, expect_data)
+
+    def test_multiple_appends(self):
+        simple_pa_schema = pa.schema([
+            ('f0', pa.int32()),
+            ('f1', pa.string()),
+            ('f2', pa.string()),
+        ])
+        schema = Schema.from_pyarrow_schema(
+            simple_pa_schema,
+            options={'row-tracking.enabled': 'true', 'data-evolution.enabled': 
'true'}
+        )
+        self.catalog.create_table('default.test_multiple_appends', schema, 
False)
+        table = self.catalog.get_table('default.test_multiple_appends')
+
+        write_builder = table.new_batch_write_builder()
+
+        # write 100 rows: (1, "a", "b")
+        table_write = write_builder.new_write()
+        table_commit = write_builder.new_commit()
+        init_data = pa.Table.from_pydict({
+            'f0': [1] * 100,
+            'f1': ['a'] * 100,
+            'f2': ['b'] * 100,
+        }, schema=simple_pa_schema)
+        table_write.write_arrow(init_data)
+        table_commit.commit(table_write.prepare_commit())
+        table_write.close()
+        table_commit.close()
+        # append:set first_row_id = 100 to modify the row with columns write
+        write0 = write_builder.new_write().with_write_type(['f0', 'f1'])
+        write1 = write_builder.new_write().with_write_type(['f2'])
+        commit = write_builder.new_commit()
+        data0 = pa.Table.from_pydict({'f0': [2], 'f1': ['x']},
+                                     schema=pa.schema([('f0', pa.int32()), 
('f1', pa.string())]))
+        data1 = pa.Table.from_pydict({'f2': ['y']}, schema=pa.schema([('f2', 
pa.string())]))
+        write0.write_arrow(data0)
+        write1.write_arrow(data1)
+        cmts = write0.prepare_commit() + write1.prepare_commit()
+        for c in cmts:
+            for nf in c.new_files:
+                nf.first_row_id = 100
+        commit.commit(cmts)
+        write0.close()
+        write1.close()
+        commit.close()
+
+        # append:write (3, "c") and ("d"), set first_row_id = 101
+        write0 = write_builder.new_write().with_write_type(['f0', 'f1'])
+        commit0 = write_builder.new_commit()
+        data0 = pa.Table.from_pydict({'f0': [3], 'f1': ['c']},
+                                     schema=pa.schema([('f0', pa.int32()), 
('f1', pa.string())]))
+        write0.write_arrow(data0)
+        cmts0 = write0.prepare_commit()
+        for c in cmts0:
+            for nf in c.new_files:
+                nf.first_row_id = 101
+        commit0.commit(cmts0)
+        write0.close()
+        commit0.close()
+
+        write1 = write_builder.new_write().with_write_type(['f2'])
+        commit1 = write_builder.new_commit()
+        data1 = pa.Table.from_pydict({'f2': ['d']}, schema=pa.schema([('f2', 
pa.string())]))
+        write1.write_arrow(data1)
+        cmts1 = write1.prepare_commit()
+        for c in cmts1:
+            for nf in c.new_files:
+                nf.first_row_id = 101
+        commit1.commit(cmts1)
+        write1.close()
+        commit1.close()
+
+        read_builder = table.new_read_builder()
+        table_scan = read_builder.new_scan()
+        table_read = read_builder.new_read()
+        actual = table_read.to_arrow(table_scan.plan().splits())
+
+        self.assertEqual(actual.num_rows, 102)
+        expect = pa.Table.from_pydict({
+            'f0': [1] * 100 + [2] + [3],
+            'f1': ['a'] * 100 + ['x'] + ['c'],
+            'f2': ['b'] * 100 + ['y'] + ['d'],
+        }, schema=simple_pa_schema)
+        self.assertEqual(actual, expect)
+
+    def test_disorder_cols_append(self):
+        """Partial-column files written as (f0, f2) and (f1) merge into full rows.
+
+        Seeds the table with num_rows full rows, then commits two column-subset
+        files whose first_row_id is forced to 0, and expects the read to return
+        the newly written values in table-schema column order.
+        """
+        simple_pa_schema = pa.schema([
+            ('f0', pa.int32()),
+            ('f1', pa.string()),
+            ('f2', pa.string()),
+        ])
+        schema = Schema.from_pyarrow_schema(
+            simple_pa_schema,
+            options={'row-tracking.enabled': 'true', 'data-evolution.enabled': 
'true'}
+        )
+        self.catalog.create_table('default.test_disorder_cols_append', schema, 
False)
+        table = self.catalog.get_table('default.test_disorder_cols_append')
+
+        write_builder = table.new_batch_write_builder()
+        num_rows = 100
+        # write num_rows rows, each (1, "a", "b")
+        table_write = write_builder.new_write()
+        table_commit = write_builder.new_commit()
+        init_data = pa.Table.from_pydict({
+            'f0': [1] * num_rows,
+            'f1': ['a'] * num_rows,
+            'f2': ['b'] * num_rows,
+        }, schema=simple_pa_schema)
+        table_write.write_arrow(init_data)
+        table_commit.commit(table_write.prepare_commit())
+        table_write.close()
+        table_commit.close()
+
+        # append: set first_row_id = 0 on the column-subset files so they
+        # modify the rows written above instead of adding new ones
+        write0 = write_builder.new_write().with_write_type(['f0', 'f2'])
+        write1 = write_builder.new_write().with_write_type(['f1'])
+        commit = write_builder.new_commit()
+        data0 = pa.Table.from_pydict({'f0': [2] * num_rows, 'f2': ['y'] * 
num_rows},
+                                     schema=pa.schema([('f0', pa.int32()), 
('f2', pa.string())]))
+        data1 = pa.Table.from_pydict({'f1': ['x'] * num_rows}, 
schema=pa.schema([('f1', pa.string())]))
+        write0.write_arrow(data0)
+        write1.write_arrow(data1)
+        # both writers' messages go into a single commit
+        cmts = write0.prepare_commit() + write1.prepare_commit()
+        for c in cmts:
+            for nf in c.new_files:
+                nf.first_row_id = 0
+        commit.commit(cmts)
+        write0.close()
+        write1.close()
+        commit.close()
+
+        read_builder = table.new_read_builder()
+        table_scan = read_builder.new_scan()
+        table_read = read_builder.new_read()
+        actual = table_read.to_arrow(table_scan.plan().splits())
+
+        # still num_rows rows: the second commit replaced values, it did not append
+        self.assertEqual(actual.num_rows, 100)
+        expect = pa.Table.from_pydict({
+            'f0': [2] * num_rows,
+            'f1': ['x'] * num_rows,
+            'f2': ['y'] * num_rows,
+        }, schema=simple_pa_schema)
+        self.assertEqual(actual, expect)
+
+    def test_only_some_columns(self):
+        """Three single-column commits (f0, f1, f2) combine into one full row.
+
+        The f0 file is committed without an explicit first_row_id; the f1 and
+        f2 files have first_row_id forced to 0 before their commits. The read
+        is expected to return the single row (1, 'a', 'b').
+        """
+        simple_pa_schema = pa.schema([
+            ('f0', pa.int32()),
+            ('f1', pa.string()),
+            ('f2', pa.string()),
+        ])
+        schema = Schema.from_pyarrow_schema(
+            simple_pa_schema,
+            options={'row-tracking.enabled': 'true', 'data-evolution.enabled': 
'true'}
+        )
+        self.catalog.create_table('default.test_only_some_columns', schema, 
False)
+        table = self.catalog.get_table('default.test_only_some_columns')
+
+        write_builder = table.new_batch_write_builder()
+
+        # Commit 1: f0 only, first_row_id left for the commit to assign
+        w0 = write_builder.new_write().with_write_type(['f0'])
+        c0 = write_builder.new_commit()
+        d0 = pa.Table.from_pydict({'f0': [1]}, schema=pa.schema([('f0', 
pa.int32())]))
+        w0.write_arrow(d0)
+        c0.commit(w0.prepare_commit())
+        w0.close()
+        c0.close()
+
+        # Commit 2: f1, first_row_id = 0
+        w1 = write_builder.new_write().with_write_type(['f1'])
+        c1 = write_builder.new_commit()
+        d1 = pa.Table.from_pydict({'f1': ['a']}, schema=pa.schema([('f1', 
pa.string())]))
+        w1.write_arrow(d1)
+        cmts1 = w1.prepare_commit()
+        for c in cmts1:
+            for nf in c.new_files:
+                nf.first_row_id = 0
+        c1.commit(cmts1)
+        w1.close()
+        c1.close()
+
+        # Commit 3: f2, first_row_id = 0
+        w2 = write_builder.new_write().with_write_type(['f2'])
+        c2 = write_builder.new_commit()
+        d2 = pa.Table.from_pydict({'f2': ['b']}, schema=pa.schema([('f2', 
pa.string())]))
+        w2.write_arrow(d2)
+        cmts2 = w2.prepare_commit()
+        for c in cmts2:
+            for nf in c.new_files:
+                nf.first_row_id = 0
+        c2.commit(cmts2)
+        w2.close()
+        c2.close()
+
+        read_builder = table.new_read_builder()
+        table_scan = read_builder.new_scan()
+        table_read = read_builder.new_read()
+        actual = table_read.to_arrow(table_scan.plan().splits())
+
+        # one row assembled from the three single-column files
+        expect = pa.Table.from_pydict({
+            'f0': [1],
+            'f1': ['a'],
+            'f2': ['b'],
+        }, schema=simple_pa_schema)
+        self.assertEqual(actual, expect)
+
+    def test_null_values(self):
+        """Null column values survive the merge and can be replaced by a later write.
+
+        Commit 1 writes f0=1 with f1 and f2 null, split over two column-subset
+        files at first_row_id 0. Commit 2 writes f2='c' at first_row_id 0. The
+        read is expected to return f1 still null and f2 equal to 'c'.
+        """
+        simple_pa_schema = pa.schema([
+            ('f0', pa.int32()),
+            ('f1', pa.string()),
+            ('f2', pa.string()),
+        ])
+        schema = Schema.from_pyarrow_schema(
+            simple_pa_schema,
+            options={'row-tracking.enabled': 'true', 'data-evolution.enabled': 
'true'}
+        )
+        self.catalog.create_table('default.test_null_values', schema, False)
+        table = self.catalog.get_table('default.test_null_values')
+
+        write_builder = table.new_batch_write_builder()
+
+        # Commit 1: f1 and f2 are null, only f0 carries a value
+        w0 = write_builder.new_write().with_write_type(['f0', 'f1'])
+        w1 = write_builder.new_write().with_write_type(['f2'])
+        c = write_builder.new_commit()
+
+        d0 = pa.Table.from_pydict({'f0': [1], 'f1': [None]},
+                                  schema=pa.schema([('f0', pa.int32()), ('f1', 
pa.string())]))
+        d1 = pa.Table.from_pydict({'f2': [None]}, schema=pa.schema([('f2', 
pa.string())]))
+        w0.write_arrow(d0)
+        w1.write_arrow(d1)
+        cmts = w0.prepare_commit() + w1.prepare_commit()
+        for msg in cmts:
+            for nf in msg.new_files:
+                nf.first_row_id = 0
+        c.commit(cmts)
+        w0.close()
+        w1.close()
+        c.close()
+
+        # Commit 2: write f2 = 'c' at the same first_row_id
+        w1 = write_builder.new_write().with_write_type(['f2'])
+        c1 = write_builder.new_commit()
+        d1 = pa.Table.from_pydict({'f2': ['c']}, schema=pa.schema([('f2', 
pa.string())]))
+        w1.write_arrow(d1)
+        cmts1 = w1.prepare_commit()
+        for msg in cmts1:
+            for nf in msg.new_files:
+                nf.first_row_id = 0
+        c1.commit(cmts1)
+        w1.close()
+        c1.close()
+
+        read_builder = table.new_read_builder()
+        table_scan = read_builder.new_scan()
+        table_read = read_builder.new_read()
+        actual = table_read.to_arrow(table_scan.plan().splits())
+        # f2 comes from commit 2; f1 keeps the null written in commit 1
+        expect = pa.Table.from_pydict({
+            'f0': [1],
+            'f1': [None],
+            'f2': ['c'],
+        }, schema=simple_pa_schema)
+        self.assertEqual(actual, expect)
+
+    # append several times, using a different first_row_id per logical row
+    def test_multiple_appends_different_first_row_ids(self):
+        """Files committed at first_row_id 0 and 1 produce two distinct rows.
+
+        Commit 1 writes (1, 'a') and 'b' as two column-subset files at
+        first_row_id 0. Commits 2 and 3 write (2, 'c') and 'd' at
+        first_row_id 1. The read is expected to return the rows
+        (1, 'a', 'b') and (2, 'c', 'd'), in that order.
+        """
+        simple_pa_schema = pa.schema([
+            ('f0', pa.int32()),
+            ('f1', pa.string()),
+            ('f2', pa.string()),
+        ])
+        schema = Schema.from_pyarrow_schema(
+            simple_pa_schema,
+            options={'row-tracking.enabled': 'true', 'data-evolution.enabled': 
'true'}
+        )
+        self.catalog.create_table('default.test_multiple_appends_diff_rowid', 
schema, False)
+        table = 
self.catalog.get_table('default.test_multiple_appends_diff_rowid')
+
+        write_builder = table.new_batch_write_builder()
+
+        # commit 1: (f0, f1) and f2 files, both at first_row_id 0
+        w0 = write_builder.new_write().with_write_type(['f0', 'f1'])
+        w1 = write_builder.new_write().with_write_type(['f2'])
+        c = write_builder.new_commit()
+        d0 = pa.Table.from_pydict({'f0': [1], 'f1': ['a']},
+                                  schema=pa.schema([('f0', pa.int32()), ('f1', 
pa.string())]))
+        d1 = pa.Table.from_pydict({'f2': ['b']}, schema=pa.schema([('f2', 
pa.string())]))
+        w0.write_arrow(d0)
+        w1.write_arrow(d1)
+        cmts = w0.prepare_commit() + w1.prepare_commit()
+        for msg in cmts:
+            for nf in msg.new_files:
+                nf.first_row_id = 0
+        c.commit(cmts)
+        w0.close()
+        w1.close()
+        c.close()
+
+        # commit 2: (f0, f1) file at first_row_id 1
+        w0 = write_builder.new_write().with_write_type(['f0', 'f1'])
+        c0 = write_builder.new_commit()
+        d0 = pa.Table.from_pydict({'f0': [2], 'f1': ['c']},
+                                  schema=pa.schema([('f0', pa.int32()), ('f1', 
pa.string())]))
+        w0.write_arrow(d0)
+        cmts0 = w0.prepare_commit()
+        for msg in cmts0:
+            for nf in msg.new_files:
+                nf.first_row_id = 1
+        c0.commit(cmts0)
+        w0.close()
+        c0.close()
+
+        # commit 3: f2 file at first_row_id 1, completing the second row
+        w1 = write_builder.new_write().with_write_type(['f2'])
+        c1 = write_builder.new_commit()
+        d1 = pa.Table.from_pydict({'f2': ['d']}, schema=pa.schema([('f2', 
pa.string())]))
+        w1.write_arrow(d1)
+        cmts1 = w1.prepare_commit()
+        for msg in cmts1:
+            for nf in msg.new_files:
+                nf.first_row_id = 1
+        c1.commit(cmts1)
+        w1.close()
+        c1.close()
+
+        read_builder = table.new_read_builder()
+        table_scan = read_builder.new_scan()
+        table_read = read_builder.new_read()
+        actual = table_read.to_arrow(table_scan.plan().splits())
+
+        expect = pa.Table.from_pydict({
+            'f0': [1, 2],
+            'f1': ['a', 'c'],
+            'f2': ['b', 'd'],
+        }, schema=simple_pa_schema)
+        self.assertEqual(actual, expect)
+
+    def test_more_data(self):
+        """Column merge read over 100000 rows, with f2 rewritten by a second commit.
+
+        The first commit writes (f0, f1) and f2 as two column-subset files at
+        first_row_id 0. The second commit writes new f2 values. The read is
+        expected to return f0/f1 from the first commit and f2 from the second.
+        """
+        simple_pa_schema = pa.schema([
+            ('f0', pa.int32()),
+            ('f1', pa.string()),
+            ('f2', pa.string()),
+        ])
+        schema = Schema.from_pyarrow_schema(
+            simple_pa_schema,
+            options={'row-tracking.enabled': 'true', 'data-evolution.enabled': 
'true'}
+        )
+        self.catalog.create_table('default.test_more_data', schema, False)
+        table = self.catalog.get_table('default.test_more_data')
+
+        write_builder = table.new_batch_write_builder()
+
+        # first commit: 100k rows, split into an (f0, f1) file and an f2 file
+        w0 = write_builder.new_write().with_write_type(['f0', 'f1'])
+        w1 = write_builder.new_write().with_write_type(['f2'])
+        c = write_builder.new_commit()
+        size = 100000
+        d0 = pa.Table.from_pydict({
+            'f0': list(range(size)),
+            'f1': [f'a{i}' for i in range(size)],
+        }, schema=pa.schema([('f0', pa.int32()), ('f1', pa.string())]))
+        d1 = pa.Table.from_pydict({
+            'f2': [f'b{i}' for i in range(size)],
+        }, schema=pa.schema([('f2', pa.string())]))
+        w0.write_arrow(d0)
+        w1.write_arrow(d1)
+        cmts = w0.prepare_commit() + w1.prepare_commit()
+        for msg in cmts:
+            for nf in msg.new_files:
+                nf.first_row_id = 0
+        c.commit(cmts)
+        w0.close()
+        w1.close()
+        c.close()
+
+        # second commit: overwrite f2 with 'c{i}'
+        # NOTE(review): unlike the first commit, first_row_id is not set on
+        # these files; the expectation below presumably relies on the commit
+        # assigning a first_row_id that lines up with the rows above - confirm
+        w1 = write_builder.new_write().with_write_type(['f2'])
+        c1 = write_builder.new_commit()
+        d1 = pa.Table.from_pydict({
+            'f2': [f'c{i}' for i in range(size)],
+        }, schema=pa.schema([('f2', pa.string())]))
+        w1.write_arrow(d1)
+        cmts1 = w1.prepare_commit()
+        c1.commit(cmts1)
+        w1.close()
+        c1.close()
+
+        read_builder = table.new_read_builder()
+        table_scan = read_builder.new_scan()
+        table_read = read_builder.new_read()
+        actual = table_read.to_arrow(table_scan.plan().splits())
+
+        expect = pa.Table.from_pydict({
+            'f0': list(range(size)),
+            'f1': [f'a{i}' for i in range(size)],
+            'f2': [f'c{i}' for i in range(size)],
+        }, schema=simple_pa_schema)
+        self.assertEqual(actual, expect)
diff --git a/paimon-python/pypaimon/tests/py36/rest_ao_read_write_test.py 
b/paimon-python/pypaimon/tests/py36/rest_ao_read_write_test.py
index 20e6a2c2d8..e6374132af 100644
--- a/paimon-python/pypaimon/tests/py36/rest_ao_read_write_test.py
+++ b/paimon-python/pypaimon/tests/py36/rest_ao_read_write_test.py
@@ -550,7 +550,7 @@ class RESTReadWritePy36Test(RESTBaseTest):
 
         with self.assertRaises(ValueError) as e:
             table_write.write_arrow_batch(record_batch)
-        self.assertTrue(str(e.exception).startswith("Input schema isn't 
consistent with table schema."))
+        self.assertTrue(str(e.exception).startswith("Input schema isn't 
consistent with table schema and write cols."))
 
     def test_write_wide_table_large_data(self):
         logging.basicConfig(level=logging.INFO)
@@ -801,7 +801,9 @@ class RESTReadWritePy36Test(RESTBaseTest):
             embedded_index=None,
             file_source=None,
             value_stats_cols=value_stats_cols,  # This is the key field we're 
testing
-            external_path=None
+            external_path=None,
+            first_row_id=None,
+            write_cols=None
         )
 
         # Create ManifestEntry
diff --git a/paimon-python/pypaimon/tests/reader_base_test.py 
b/paimon-python/pypaimon/tests/reader_base_test.py
index ccf06d5597..6e9dc1ffc6 100644
--- a/paimon-python/pypaimon/tests/reader_base_test.py
+++ b/paimon-python/pypaimon/tests/reader_base_test.py
@@ -248,7 +248,7 @@ class ReaderBasicTest(unittest.TestCase):
 
         with self.assertRaises(ValueError) as e:
             table_write.write_arrow_batch(record_batch)
-        self.assertTrue(str(e.exception).startswith("Input schema isn't 
consistent with table schema."))
+        self.assertTrue(str(e.exception).startswith("Input schema isn't 
consistent with table schema and write cols."))
 
     def test_reader_iterator(self):
         read_builder = self.table.new_read_builder()
@@ -609,7 +609,9 @@ class ReaderBasicTest(unittest.TestCase):
             embedded_index=None,
             file_source=None,
             value_stats_cols=value_stats_cols,  # This is the key field we're 
testing
-            external_path=None
+            external_path=None,
+            first_row_id=None,
+            write_cols=None
         )
 
         # Create ManifestEntry
diff --git a/paimon-python/pypaimon/tests/rest/rest_read_write_test.py 
b/paimon-python/pypaimon/tests/rest/rest_read_write_test.py
index dc6c47e778..d05942a256 100644
--- a/paimon-python/pypaimon/tests/rest/rest_read_write_test.py
+++ b/paimon-python/pypaimon/tests/rest/rest_read_write_test.py
@@ -339,7 +339,7 @@ class RESTTableReadWriteTest(RESTBaseTest):
 
         with self.assertRaises(ValueError) as e:
             table_write.write_arrow_batch(record_batch)
-        self.assertTrue(str(e.exception).startswith("Input schema isn't 
consistent with table schema."))
+        self.assertTrue(str(e.exception).startswith("Input schema isn't 
consistent with table schema and write cols."))
 
     def test_reader_iterator(self):
         read_builder = self.table.new_read_builder()
diff --git a/paimon-python/pypaimon/write/batch_table_write.py 
b/paimon-python/pypaimon/write/batch_table_write.py
index c2e533cef5..a71e9c0503 100644
--- a/paimon-python/pypaimon/write/batch_table_write.py
+++ b/paimon-python/pypaimon/write/batch_table_write.py
@@ -67,10 +67,21 @@ class BatchTableWrite:
         self.batch_committed = True
         return self.file_store_write.prepare_commit()
 
+    def with_write_type(self, write_cols: List[str]):
+        """Restrict this writer to a subset of the table's columns.
+
+        Args:
+            write_cols: names of the columns that will be written.
+
+        Returns:
+            This BatchTableWrite, so the call can be chained.
+
+        Raises:
+            ValueError: if any name is not a column of the table schema.
+        """
+        for col in write_cols:
+            if col not in self.table_pyarrow_schema.names:
+                raise ValueError(f"Column {col} is not in table schema.")
+        # As many names as the table has columns: store None rather than the
+        # list. Only the count is compared, not the order or uniqueness.
+        if len(write_cols) == len(self.table_pyarrow_schema.names):
+            write_cols = None
+        self.file_store_write.write_cols = write_cols
+        return self
+
     def close(self):
         self.file_store_write.close()
 
-    def _validate_pyarrow_schema(self, data_schema):
-        if data_schema != self.table_pyarrow_schema:
-            raise ValueError(f"Input schema isn't consistent with table 
schema. "
-                             f"Table schema is: {data_schema} Input schema is: 
{self.table_pyarrow_schema}")
+    def _validate_pyarrow_schema(self, data_schema: pa.Schema):
+        """Reject input whose schema matches neither the table nor the write columns.
+
+        The input is accepted when its schema equals the table's pyarrow
+        schema, or when its column names equal the configured write_cols.
+
+        Args:
+            data_schema: schema of the data about to be written.
+
+        Raises:
+            ValueError: if neither condition holds.
+        """
+        # NOTE(review): the write_cols branch compares column names only;
+        # field types of a column-subset input are not checked here.
+        if data_schema != self.table_pyarrow_schema and data_schema.names != 
self.file_store_write.write_cols:
+            raise ValueError(f"Input schema isn't consistent with table schema 
and write cols. "
+                             f"Input schema is: {data_schema} "
+                             f"Table schema is: {self.table_pyarrow_schema} "
+                             f"Write cols is: 
{self.file_store_write.write_cols}")
diff --git a/paimon-python/pypaimon/write/file_store_commit.py 
b/paimon-python/pypaimon/write/file_store_commit.py
index 5920f50ad8..10d2796b76 100644
--- a/paimon-python/pypaimon/write/file_store_commit.py
+++ b/paimon-python/pypaimon/write/file_store_commit.py
@@ -130,6 +130,24 @@ class FileStoreCommit:
         added_file_count = 0
         deleted_file_count = 0
         delta_record_count = 0
+        # process snapshot
+        new_snapshot_id = self._generate_snapshot_id()
+
+        # Check if row tracking is enabled
+        row_tracking_enabled = self.table.options.get('row-tracking.enabled', 
'false').lower() == 'true'
+
+        # Apply row tracking logic if enabled
+        next_row_id = None
+        if row_tracking_enabled:
+            # Assign snapshot ID to delta files
+            commit_entries = self._assign_snapshot_id(new_snapshot_id, 
commit_entries)
+
+            # Get the next row ID start from the latest snapshot
+            first_row_id_start = self._get_next_row_id_start()
+
+            # Assign row IDs to new files and get the next row ID for the 
snapshot
+            commit_entries, next_row_id = 
self._assign_row_tracking_meta(first_row_id_start, commit_entries)
+
         for entry in commit_entries:
             if entry.kind == 0:
                 added_file_count += 1
@@ -194,6 +212,7 @@ class FileStoreCommit:
             commit_identifier=commit_identifier,
             commit_kind=commit_kind,
             time_millis=int(time.time() * 1000),
+            next_row_id=next_row_id,
         )
 
         # Generate partition statistics for the commit
@@ -314,3 +333,59 @@ class FileStoreCommit:
             )
             for stats in partition_stats.values()
         ]
+
+    def _assign_snapshot_id(self, snapshot_id: int, commit_entries: 
List[ManifestEntry]) -> List[ManifestEntry]:
+        """Assign snapshot ID to all commit entries.
+
+        Calls assign_sequence_number(snapshot_id, snapshot_id) on every entry
+        and returns the results as a new list in the same order.
+        """
+        return [entry.assign_sequence_number(snapshot_id, snapshot_id) for 
entry in commit_entries]
+
+    def _get_next_row_id_start(self) -> int:
+        """Get the next row ID start from the latest snapshot.
+
+        Returns:
+            The latest snapshot's next_row_id when a snapshot exists and that
+            attribute is present and not None; otherwise 0.
+        """
+        latest_snapshot = self.snapshot_manager.get_latest_snapshot()
+        # hasattr guards against snapshot objects that lack the attribute
+        if latest_snapshot and hasattr(latest_snapshot, 'next_row_id') and 
latest_snapshot.next_row_id is not None:
+            return latest_snapshot.next_row_id
+        return 0
+
+    def _assign_row_tracking_meta(self, first_row_id_start: int, 
commit_entries: List[ManifestEntry]):
+        """
+        Assign row tracking metadata (first_row_id) to new files.
+        This follows the Java implementation logic from
+        FileStoreCommitImpl.assignRowTrackingMeta.
+
+        Only entries with kind 0, file_source "APPEND" and no first_row_id
+        yet are assigned one; all other entries are passed through unchanged.
+
+        Args:
+            first_row_id_start: row ID given to the first regular file that
+                needs one.
+            commit_entries: entries of the commit, processed in order.
+
+        Returns:
+            A tuple (entries, next_start): the entries in their original
+            order, and the row ID following the last regular file that was
+            assigned. For an empty input this is
+            (commit_entries, first_row_id_start).
+
+        Raises:
+            RuntimeError: if a blob file is met while blob_start is not
+                below start.
+        """
+        if not commit_entries:
+            return commit_entries, first_row_id_start
+
+        row_id_assigned = []
+        # start: next row ID for regular files; blob_start: next row ID for
+        # blob files, reset to the preceding regular file's first row ID
+        start = first_row_id_start
+        blob_start = first_row_id_start
+
+        for entry in commit_entries:
+            # Check if this is an append file that needs row ID assignment
+            if (entry.kind == 0 and  # ADD kind
+                    entry.file.file_source == "APPEND" and  # APPEND file 
source
+                    entry.file.first_row_id is None):  # No existing 
first_row_id
+
+                if self._is_blob_file(entry.file.file_name):
+                    # Handle blob files specially
+                    if blob_start >= start:
+                        raise RuntimeError(
+                            f"This is a bug, blobStart {blob_start} should be 
less than start {start} "
+                            f"when assigning a blob entry file."
+                        )
+                    row_count = entry.file.row_count
+                    
row_id_assigned.append(entry.assign_first_row_id(blob_start))
+                    blob_start += row_count
+                else:
+                    # Handle regular files
+                    row_count = entry.file.row_count
+                    row_id_assigned.append(entry.assign_first_row_id(start))
+                    # remember this file's first row ID before advancing start
+                    blob_start = start
+                    start += row_count
+            else:
+                # For compact files or files that already have first_row_id, 
don't assign
+                row_id_assigned.append(entry)
+
+        return row_id_assigned, start
+
+    @staticmethod
+    def _is_blob_file(file_name: str) -> bool:
+        """Return True when file_name ends with the '.blob' extension."""
+        return file_name.endswith('.blob')
diff --git a/paimon-python/pypaimon/write/file_store_write.py 
b/paimon-python/pypaimon/write/file_store_write.py
index bcef10a4c3..841fef3a65 100644
--- a/paimon-python/pypaimon/write/file_store_write.py
+++ b/paimon-python/pypaimon/write/file_store_write.py
@@ -34,6 +34,7 @@ class FileStoreWrite:
         self.table: FileStoreTable = table
         self.data_writers: Dict[Tuple, DataWriter] = {}
         self.max_seq_numbers = self._seq_number_stats()  # TODO: build this 
on-demand instead of on all
+        self.write_cols = None
 
     def write(self, partition: Tuple, bucket: int, data: pa.RecordBatch):
         key = (partition, bucket)
@@ -56,6 +57,7 @@ class FileStoreWrite:
                 partition=partition,
                 bucket=bucket,
                 max_seq_number=self.max_seq_numbers.get((partition, bucket), 
1),
+                write_cols=self.write_cols
             )
 
     def prepare_commit(self) -> List[CommitMessage]:
diff --git a/paimon-python/pypaimon/write/writer/data_writer.py 
b/paimon-python/pypaimon/write/writer/data_writer.py
index cc0fc944a1..ad6e327c89 100644
--- a/paimon-python/pypaimon/write/writer/data_writer.py
+++ b/paimon-python/pypaimon/write/writer/data_writer.py
@@ -26,6 +26,7 @@ from typing import Dict, List, Optional, Tuple
 from pypaimon.common.core_options import CoreOptions
 from pypaimon.manifest.schema.data_file_meta import DataFileMeta
 from pypaimon.manifest.schema.simple_stats import SimpleStats
+from pypaimon.schema.data_types import PyarrowFieldParser
 from pypaimon.table.bucket_mode import BucketMode
 from pypaimon.table.row.generic_row import GenericRow
 
@@ -33,7 +34,8 @@ from pypaimon.table.row.generic_row import GenericRow
 class DataWriter(ABC):
     """Base class for data writers that handle PyArrow tables directly."""
 
-    def __init__(self, table, partition: Tuple, bucket: int, max_seq_number: 
int):
+    def __init__(self, table, partition: Tuple, bucket: int, max_seq_number: 
int,
+                 write_cols: Optional[List[str]] = None):
         from pypaimon.table.file_store_table import FileStoreTable
 
         self.table: FileStoreTable = table
@@ -55,6 +57,7 @@ class DataWriter(ABC):
 
         self.pending_data: Optional[pa.Table] = None
         self.committed_files: List[DataFileMeta] = []
+        self.write_cols = write_cols
 
     def write(self, data: pa.RecordBatch):
         processed_data = self._process_data(data)
@@ -126,11 +129,13 @@ class DataWriter(ABC):
         max_key = [col.to_pylist()[0] for col in max_key_row_batch.columns]
 
         # key stats & value stats
+        data_fields = self.table.fields if self.table.is_primary_key_table \
+            else PyarrowFieldParser.to_paimon_schema(data.schema)
         column_stats = {
             field.name: self._get_column_stats(data, field.name)
-            for field in self.table.table_schema.fields
+            for field in data_fields
         }
-        all_fields = self.table.table_schema.fields
+        all_fields = data_fields
         min_value_stats = [column_stats[field.name]['min_values'] for field in 
all_fields]
         max_value_stats = [column_stats[field.name]['max_values'] for field in 
all_fields]
         value_null_counts = [column_stats[field.name]['null_counts'] for field 
in all_fields]
@@ -156,8 +161,8 @@ class DataWriter(ABC):
                 key_null_counts,
             ),
             value_stats=SimpleStats(
-                GenericRow(min_value_stats, self.table.table_schema.fields),
-                GenericRow(max_value_stats, self.table.table_schema.fields),
+                GenericRow(min_value_stats, data_fields),
+                GenericRow(max_value_stats, data_fields),
                 value_null_counts,
             ),
             min_sequence_number=min_seq,
@@ -167,7 +172,12 @@ class DataWriter(ABC):
             extra_files=[],
             creation_time=datetime.now(),
             delete_row_count=0,
-            value_stats_cols=None,  # None means all columns have statistics
+            file_source="APPEND",
+            value_stats_cols=None,  # None means all columns in the data have 
statistics
+            external_path=None,
+            first_row_id=None,
+            write_cols=self.write_cols,
+            # None means all columns in the table have been written
             file_path=str(file_path),
         ))
 


Reply via email to