This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new be4d1a3169 [python] Support conflict detection when updating existing 
data concurrently. (#7323)
be4d1a3169 is described below

commit be4d1a3169f054b0f1b61da6147b1e219b898180
Author: umi <[email protected]>
AuthorDate: Tue Mar 3 21:06:39 2026 +0800

    [python] Support conflict detection when updating existing data 
concurrently. (#7323)
---
 .../pypaimon/manifest/manifest_file_manager.py     |  15 +-
 .../pypaimon/manifest/schema/file_entry.py         | 127 +++++++++++++
 .../pypaimon/manifest/schema/manifest_entry.py     |   3 +-
 paimon-python/pypaimon/schema/data_types.py        |  69 ++++++-
 paimon-python/pypaimon/table/row/generic_row.py    |  10 +
 paimon-python/pypaimon/tests/table_update_test.py  | 193 ++++++++++++++++++++
 paimon-python/pypaimon/utils/range_helper.py       | 133 ++++++++++++++
 paimon-python/pypaimon/write/commit/__init__.py    |  16 ++
 .../pypaimon/write/commit/commit_rollback.py       |  62 +++++++
 .../pypaimon/write/commit/commit_scanner.py        | 127 +++++++++++++
 .../pypaimon/write/commit/conflict_detection.py    | 203 +++++++++++++++++++++
 paimon-python/pypaimon/write/commit_message.py     |   3 +-
 paimon-python/pypaimon/write/file_store_commit.py  |  79 ++++++--
 .../pypaimon/write/table_update_by_row_id.py       |   8 +-
 14 files changed, 1018 insertions(+), 30 deletions(-)

diff --git a/paimon-python/pypaimon/manifest/manifest_file_manager.py 
b/paimon-python/pypaimon/manifest/manifest_file_manager.py
index 0ed5091825..5975fcbc9f 100644
--- a/paimon-python/pypaimon/manifest/manifest_file_manager.py
+++ b/paimon-python/pypaimon/manifest/manifest_file_manager.py
@@ -53,17 +53,6 @@ class ManifestFileManager:
         def _process_single_manifest(manifest_file: ManifestFileMeta) -> 
List[ManifestEntry]:
             return self.read(manifest_file.file_name, manifest_entry_filter, 
drop_stats)
 
-        def _entry_identifier(e: ManifestEntry) -> tuple:
-            return (
-                tuple(e.partition.values),
-                e.bucket,
-                e.file.level,
-                e.file.file_name,
-                tuple(e.file.extra_files) if e.file.extra_files else (),
-                e.file.embedded_index,
-                e.file.external_path,
-            )
-
         deleted_entry_keys = set()
         added_entries = []
         with ThreadPoolExecutor(max_workers=max_workers) as executor:
@@ -73,11 +62,11 @@ class ManifestFileManager:
                     if entry.kind == 0:  # ADD
                         added_entries.append(entry)
                     else:  # DELETE
-                        deleted_entry_keys.add(_entry_identifier(entry))
+                        deleted_entry_keys.add(entry.identifier())
 
         final_entries = [
             entry for entry in added_entries
-            if _entry_identifier(entry) not in deleted_entry_keys
+            if entry.identifier() not in deleted_entry_keys
         ]
         return final_entries
 
diff --git a/paimon-python/pypaimon/manifest/schema/file_entry.py 
b/paimon-python/pypaimon/manifest/schema/file_entry.py
new file mode 100644
index 0000000000..5429f25cb6
--- /dev/null
+++ b/paimon-python/pypaimon/manifest/schema/file_entry.py
@@ -0,0 +1,127 @@
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing,
+#  software distributed under the License is distributed on an
+#  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+#  KIND, either express or implied.  See the License for the
+#  specific language governing permissions and limitations
+#  under the License.
+
+"""
+Entry representing a file.
+"""
+
+
+class FileEntry:
+    """Entry representing a file.
+
+    The same Identifier indicates that the FileEntry refers to the same data 
file.
+    """
+
+    class Identifier:
+        """Unique identifier for a file entry.
+
+        Uses partition, bucket, level, fileName, extraFiles,
+        embeddedIndex and externalPath to identify a file.
+        """
+
+        def __init__(self, partition, bucket, level, file_name,
+                     extra_files, embedded_index, external_path):
+            self.partition = partition
+            self.bucket = bucket
+            self.level = level
+            self.file_name = file_name
+            self.extra_files = extra_files
+            self.embedded_index = embedded_index
+            self.external_path = external_path
+            self._hash = None
+
+        def __eq__(self, other):
+            if self is other:
+                return True
+            if other is None or not isinstance(other, FileEntry.Identifier):
+                return False
+            return (self.bucket == other.bucket
+                    and self.level == other.level
+                    and self.partition == other.partition
+                    and self.file_name == other.file_name
+                    and self.extra_files == other.extra_files
+                    and self.embedded_index == other.embedded_index
+                    and self.external_path == other.external_path)
+
+        def __hash__(self):
+            if self._hash is None:
+                self._hash = hash((
+                    self.partition,
+                    self.bucket,
+                    self.level,
+                    self.file_name,
+                    self.extra_files,
+                    self.embedded_index,
+                    self.external_path,
+                ))
+            return self._hash
+
+    def identifier(self):
+        """Build a unique Identifier for this file entry.
+
+        Returns:
+            An Identifier instance.
+        """
+        extra_files = (tuple(self.file.extra_files)
+                       if self.file.extra_files else ())
+        return FileEntry.Identifier(
+            partition=self.partition,
+            bucket=self.bucket,
+            level=self.file.level,
+            file_name=self.file.file_name,
+            extra_files=extra_files,
+            embedded_index=self.file.embedded_index,
+            external_path=self.file.external_path,
+        )
+
+    @staticmethod
+    def merge_entries(entries):
+        """Merge file entries: ADD and DELETE of the same file cancel each 
other.
+
+        - ADD: if identifier already in map, raise error; otherwise add to map.
+        - DELETE: if identifier already in map, remove both (cancel);
+          otherwise add to map.
+
+        Args:
+            entries: Iterable of FileEntry.
+
+        Returns:
+            List of merged FileEntry values, preserving insertion order.
+
+        Raises:
+            RuntimeError: If trying to add a file that is already in the map.
+        """
+        entry_map = {}
+
+        for entry in entries:
+            entry_identifier = entry.identifier()
+            if entry.kind == 0:  # ADD
+                if entry_identifier in entry_map:
+                    raise RuntimeError(
+                        "Trying to add file {} which is already added.".format(
+                            entry.file.file_name))
+                entry_map[entry_identifier] = entry
+            elif entry.kind == 1:  # DELETE
+                if entry_identifier in entry_map:
+                    del entry_map[entry_identifier]
+                else:
+                    entry_map[entry_identifier] = entry
+            else:
+                raise RuntimeError(
+                    "Unknown entry kind: {}".format(entry.kind))
+
+        return list(entry_map.values())
diff --git a/paimon-python/pypaimon/manifest/schema/manifest_entry.py 
b/paimon-python/pypaimon/manifest/schema/manifest_entry.py
index b1fd244daf..eba2417863 100644
--- a/paimon-python/pypaimon/manifest/schema/manifest_entry.py
+++ b/paimon-python/pypaimon/manifest/schema/manifest_entry.py
@@ -20,11 +20,12 @@ from dataclasses import dataclass
 
 from pypaimon.manifest.schema.data_file_meta import (DATA_FILE_META_SCHEMA,
                                                      DataFileMeta)
+from pypaimon.manifest.schema.file_entry import FileEntry
 from pypaimon.table.row.generic_row import GenericRow
 
 
 @dataclass
-class ManifestEntry:
+class ManifestEntry(FileEntry):
     kind: int
     partition: GenericRow
     bucket: int
diff --git a/paimon-python/pypaimon/schema/data_types.py 
b/paimon-python/pypaimon/schema/data_types.py
index 318ddfe02f..6befaa4d40 100755
--- a/paimon-python/pypaimon/schema/data_types.py
+++ b/paimon-python/pypaimon/schema/data_types.py
@@ -73,6 +73,16 @@ class AtomicType(DataType):
         super().__init__(nullable)
         self.type = type
 
+    def __eq__(self, other):
+        if self is other:
+            return True
+        if not isinstance(other, AtomicType):
+            return False
+        return self.type == other.type and self.nullable == other.nullable
+
+    def __hash__(self):
+        return hash((self.type, self.nullable))
+
     def to_dict(self) -> str:
         if not self.nullable:
             return self.type + " NOT NULL"
@@ -95,6 +105,16 @@ class ArrayType(DataType):
         super().__init__(nullable)
         self.element = element_type
 
+    def __eq__(self, other):
+        if self is other:
+            return True
+        if not isinstance(other, ArrayType):
+            return False
+        return self.element == other.element and self.nullable == 
other.nullable
+
+    def __hash__(self):
+        return hash((self.element, self.nullable))
+
     def to_dict(self) -> Dict[str, Any]:
         return {
             "type": "ARRAY" + (" NOT NULL" if not self.nullable else ""),
@@ -119,6 +139,16 @@ class MultisetType(DataType):
         super().__init__(nullable)
         self.element = element_type
 
+    def __eq__(self, other):
+        if self is other:
+            return True
+        if not isinstance(other, MultisetType):
+            return False
+        return self.element == other.element and self.nullable == 
other.nullable
+
+    def __hash__(self):
+        return hash((self.element, self.nullable))
+
     def to_dict(self) -> Dict[str, Any]:
         return {
             "type": "MULTISET{}{}".format('<' + str(self.element) + '>' if 
self.element else '',
@@ -150,6 +180,18 @@ class MapType(DataType):
         self.key = key_type
         self.value = value_type
 
+    def __eq__(self, other):
+        if self is other:
+            return True
+        if not isinstance(other, MapType):
+            return False
+        return (self.key == other.key
+                and self.value == other.value
+                and self.nullable == other.nullable)
+
+    def __hash__(self):
+        return hash((self.key, self.value, self.nullable))
+
     def to_dict(self) -> Dict[str, Any]:
         return {
             "type": "MAP<{}, {}>".format(self.key, self.value),
@@ -199,6 +241,21 @@ class DataField:
     def from_dict(cls, data: Dict[str, Any]) -> "DataField":
         return DataTypeParser.parse_data_field(data)
 
+    def __eq__(self, other):
+        if self is other:
+            return True
+        if not isinstance(other, DataField):
+            return False
+        return (self.id == other.id
+                and self.name == other.name
+                and self.type == other.type
+                and self.description == other.description
+                and self.default_value == other.default_value)
+
+    def __hash__(self):
+        return hash((self.id, self.name, self.type,
+                     self.description, self.default_value))
+
     def to_dict(self) -> Dict[str, Any]:
         result = {
             self.FIELD_ID: self.id,
@@ -223,6 +280,16 @@ class RowType(DataType):
         super().__init__(nullable)
         self.fields = fields or []
 
+    def __eq__(self, other):
+        if self is other:
+            return True
+        if not isinstance(other, RowType):
+            return False
+        return self.fields == other.fields and self.nullable == other.nullable
+
+    def __hash__(self):
+        return hash((tuple(self.fields), self.nullable))
+
     def to_dict(self) -> Dict[str, Any]:
         return {
             "type": "ROW" + ("" if self.nullable else " NOT NULL"),
@@ -587,7 +654,7 @@ class PyarrowFieldParser:
                      parent_name: str = "record") -> Union[str, Dict[str, 
Any]]:
         if pyarrow.types.is_integer(field_type):
             if (pyarrow.types.is_signed_integer(field_type) and 
field_type.bit_width <= 32) or \
-               (pyarrow.types.is_unsigned_integer(field_type) and 
field_type.bit_width < 32):
+                    (pyarrow.types.is_unsigned_integer(field_type) and 
field_type.bit_width < 32):
                 return "int"
             else:
                 return "long"
diff --git a/paimon-python/pypaimon/table/row/generic_row.py 
b/paimon-python/pypaimon/table/row/generic_row.py
index be5c1ec80f..4aa740de72 100644
--- a/paimon-python/pypaimon/table/row/generic_row.py
+++ b/paimon-python/pypaimon/table/row/generic_row.py
@@ -51,6 +51,16 @@ class GenericRow(InternalRow):
     def __len__(self) -> int:
         return len(self.values)
 
+    def __eq__(self, other):
+        if self is other:
+            return True
+        if not isinstance(other, GenericRow):
+            return False
+        return self.values == other.values and self.row_kind == other.row_kind
+
+    def __hash__(self):
+        return hash((tuple(self.values), tuple(self.fields), self.row_kind))
+
     def __str__(self):
         field_strs = [f"{field.name}={repr(value)}" for field, value in 
zip(self.fields, self.values)]
         return f"GenericRow(row_kind={self.row_kind.name}, {', 
'.join(field_strs)})"
diff --git a/paimon-python/pypaimon/tests/table_update_test.py 
b/paimon-python/pypaimon/tests/table_update_test.py
index ad9158e9fe..bf46f7bbac 100644
--- a/paimon-python/pypaimon/tests/table_update_test.py
+++ b/paimon-python/pypaimon/tests/table_update_test.py
@@ -859,6 +859,199 @@ class TableUpdateTest(unittest.TestCase):
                            'Seattle', 'Boston', 'Denver', 'Miami', 'Atlanta']
         self.assertEqual(expected_cities, cities, "Cities should remain 
unchanged")
 
+    def test_concurrent_updates_with_retry(self):
+        """Test data evolution with multiple threads performing concurrent 
updates.
+
+        Each thread updates different rows of the same column. If a conflict 
occurs,
+        the thread retries until the update succeeds. After all threads 
complete,
+        the final result is verified to ensure all updates were applied 
correctly.
+        """
+        import threading
+        table = self._create_table()
+
+        # Table has 5 rows (row_id 0-4) after _create_table:
+        #   row 0: age=25, row 1: age=30, row 2: age=35, row 3: age=40, row 4: 
age=45
+
+        # Thread 1 updates rows 0 and 1
+        # Thread 2 updates rows 2 and 3
+        # Thread 3 updates row 4
+        thread_updates = [
+            {'row_ids': [0, 1], 'ages': [100, 200]},
+            {'row_ids': [2, 3], 'ages': [300, 400]},
+            {'row_ids': [4], 'ages': [500]},
+        ]
+
+        errors = []
+        success_counts = [0] * len(thread_updates)
+
+        def update_worker(thread_index, update_spec):
+            max_retries = 20
+            for attempt in range(max_retries):
+                try:
+                    write_builder = table.new_batch_write_builder()
+                    table_update = 
write_builder.new_update().with_update_type(['age'])
+
+                    update_data = pa.Table.from_pydict({
+                        '_ROW_ID': update_spec['row_ids'],
+                        'age': update_spec['ages'],
+                    })
+
+                    commit_messages = 
table_update.update_by_arrow_with_row_id(update_data)
+
+                    table_commit = write_builder.new_commit()
+                    table_commit.commit(commit_messages)
+                    table_commit.close()
+
+                    success_counts[thread_index] = attempt + 1
+                    return
+                except Exception as e:
+                    import traceback
+                    self.assertIn(
+                        "For Data Evolution table, multiple 'MERGE INTO' 
operations have encountered conflicts",
+                        str(e),
+                        "Thread-{} attempt {} unexpected error: {}\n{}".format(
+                            thread_index, attempt + 1, e, 
traceback.format_exc()
+                        )
+                    )
+                    if attempt == max_retries - 1:
+                        errors.append(
+                            "Thread-{} failed after {} retries: {}".format(
+                                thread_index, max_retries, e
+                            )
+                        )
+
+        threads = []
+        for idx, spec in enumerate(thread_updates):
+            thread = threading.Thread(target=update_worker, args=(idx, spec))
+            threads.append(thread)
+
+        for thread in threads:
+            thread.start()
+
+        for thread in threads:
+            thread.join(timeout=120)
+
+        if errors:
+            self.fail("Some threads failed:\n" + "\n".join(errors))
+
+        for idx, count in enumerate(success_counts):
+            self.assertGreater(
+                count, 0,
+                "Thread-{} did not succeed".format(idx)
+            )
+
+        # Verify the final data
+        read_builder = table.new_read_builder()
+        table_read = read_builder.new_read()
+        splits = read_builder.new_scan().plan().splits()
+        result = table_read.to_arrow(splits)
+
+        ages = result['age'].to_pylist()
+        expected_ages = [100, 200, 300, 400, 500]
+        self.assertEqual(expected_ages, ages,
+                         "Concurrent updates did not produce correct final 
result")
+
+    def test_concurrent_updates_same_rows_with_retry(self):
+        """Test data evolution with multiple threads updating overlapping rows.
+
+        Multiple threads compete to update the same rows. Each thread retries
+        on conflict until success. The final result should reflect one of the
+        successful updates for each row (last-writer-wins).
+        """
+        import threading
+
+        table = self._create_table()
+
+        # All threads update the same rows but with different values
+        thread_updates = [
+            {'row_ids': [0, 1, 2], 'ages': [101, 201, 301], 'thread_name': 
'A'},
+            {'row_ids': [0, 1, 2], 'ages': [102, 202, 302], 'thread_name': 
'B'},
+            {'row_ids': [0, 1, 2], 'ages': [103, 203, 303], 'thread_name': 
'C'},
+        ]
+
+        errors = []
+        completion_order = []
+        order_lock = threading.Lock()
+
+        def update_worker(thread_index, update_spec):
+            max_retries = 30
+            for attempt in range(max_retries):
+                try:
+                    write_builder = table.new_batch_write_builder()
+                    table_update = 
write_builder.new_update().with_update_type(['age'])
+
+                    update_data = pa.Table.from_pydict({
+                        '_ROW_ID': update_spec['row_ids'],
+                        'age': update_spec['ages'],
+                    })
+
+                    commit_messages = 
table_update.update_by_arrow_with_row_id(update_data)
+
+                    table_commit = write_builder.new_commit()
+                    table_commit.commit(commit_messages)
+                    table_commit.close()
+
+                    with order_lock:
+                        completion_order.append(thread_index)
+                    return
+                except Exception as e:
+                    import traceback
+                    self.assertIn(
+                        "For Data Evolution table, multiple 'MERGE INTO' 
operations have encountered conflicts",
+                        str(e),
+                        "Thread-{} attempt {} unexpected error: {}\n{}".format(
+                            thread_index, attempt + 1, e, 
traceback.format_exc()
+                        )
+                    )
+                    if attempt == max_retries - 1:
+                        errors.append(
+                            "Thread-{} ({}) failed after {} retries: 
{}".format(
+                                thread_index, update_spec['thread_name'],
+                                max_retries, e
+                            )
+                        )
+
+        threads = []
+        for idx, spec in enumerate(thread_updates):
+            thread = threading.Thread(target=update_worker, args=(idx, spec))
+            threads.append(thread)
+
+        for thread in threads:
+            thread.start()
+
+        for thread in threads:
+            thread.join(timeout=120)
+
+        if errors:
+            self.fail("Some threads failed:\n" + "\n".join(errors))
+
+        self.assertEqual(
+            len(completion_order), len(thread_updates),
+            "Not all threads completed successfully"
+        )
+
+        # Verify the final data: the last thread to commit wins
+        read_builder = table.new_read_builder()
+        table_read = read_builder.new_read()
+        splits = read_builder.new_scan().plan().splits()
+        result = table_read.to_arrow(splits)
+
+        ages = result['age'].to_pylist()
+
+        # The last thread to successfully commit determines rows 0-2
+        last_winner = completion_order[-1]
+        winner_ages = thread_updates[last_winner]['ages']
+        self.assertEqual(winner_ages[0], ages[0],
+                         "Row 0 should reflect last writer's value")
+        self.assertEqual(winner_ages[1], ages[1],
+                         "Row 1 should reflect last writer's value")
+        self.assertEqual(winner_ages[2], ages[2],
+                         "Row 2 should reflect last writer's value")
+
+        # Rows 3 and 4 should remain unchanged
+        self.assertEqual(40, ages[3], "Row 3 should remain unchanged")
+        self.assertEqual(45, ages[4], "Row 4 should remain unchanged")
+
 
 if __name__ == '__main__':
     unittest.main()
diff --git a/paimon-python/pypaimon/utils/range_helper.py 
b/paimon-python/pypaimon/utils/range_helper.py
new file mode 100644
index 0000000000..703d018599
--- /dev/null
+++ b/paimon-python/pypaimon/utils/range_helper.py
@@ -0,0 +1,133 @@
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing,
+#  software distributed under the License is distributed on an
+#  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+#  KIND, either express or implied.  See the License for the
+#  specific language governing permissions and limitations
+#  under the License.
+
+"""
+A helper class to handle ranges.
+"""
+
+
+class RangeHelper:
+    """A helper class to handle ranges.
+
+    Provides methods to check if all ranges are the same and to merge
+    overlapping ranges into groups, preserving original order within groups.
+
+    Args:
+        range_function: A callable that extracts a Range from an element T.
+    """
+
+    def __init__(self, range_function):
+        self._range_function = range_function
+
+    def are_all_ranges_same(self, items):
+        """Check if all items have the same range.
+
+        Args:
+            items: List of items to check.
+
+        Returns:
+            True if all items have the same range, False otherwise.
+        """
+        if not items:
+            return True
+
+        first = items[0]
+        first_range = self._range_function(first)
+        if first_range is None:
+            return False
+
+        for item in items[1:]:
+            if item is None:
+                return False
+            current_range = self._range_function(item)
+            if current_range is None:
+                return False
+            if current_range.from_ != first_range.from_ or current_range.to != 
first_range.to:
+                return False
+
+        return True
+
+    def merge_overlapping_ranges(self, items):
+        """Merge items with overlapping ranges into groups.
+
+        Sorts items by range start, then merges overlapping groups.
+        Within each group, items are sorted by their original index.
+
+        Args:
+            items: List of items with non-null ranges.
+
+        Returns:
+            List of groups, where each group is a list of items
+            with overlapping ranges.
+        """
+        if not items:
+            return []
+
+        # Create indexed values to track original indices
+        indexed = []
+        for original_index, item in enumerate(items):
+            item_range = self._range_function(item)
+            if item_range is not None:
+                indexed.append(_IndexedValue(item, item_range, original_index))
+
+        if not indexed:
+            return []
+
+        # Sort by range start, then by range end
+        indexed.sort(key=lambda iv: (iv.start(), iv.end()))
+
+        groups = []
+        current_group = [indexed[0]]
+        current_end = indexed[0].end()
+
+        # Iterate through sorted ranges and merge overlapping ones
+        for i in range(1, len(indexed)):
+            current = indexed[i]
+            if current.start() <= current_end:
+                current_group.append(current)
+                if current.end() > current_end:
+                    current_end = current.end()
+            else:
+                groups.append(current_group)
+                current_group = [current]
+                current_end = current.end()
+
+        # Add the last group
+        groups.append(current_group)
+
+        # Convert groups to result, sorting each group by original index
+        result = []
+        for group in groups:
+            group.sort(key=lambda iv: iv.original_index)
+            result.append([iv.value for iv in group])
+
+        return result
+
+
+class _IndexedValue:
+    """A helper class to track original indices during range merging."""
+
+    def __init__(self, value, item_range, original_index):
+        self.value = value
+        self.range = item_range
+        self.original_index = original_index
+
+    def start(self):
+        return self.range.from_
+
+    def end(self):
+        return self.range.to
diff --git a/paimon-python/pypaimon/write/commit/__init__.py 
b/paimon-python/pypaimon/write/commit/__init__.py
new file mode 100644
index 0000000000..a67d5ea255
--- /dev/null
+++ b/paimon-python/pypaimon/write/commit/__init__.py
@@ -0,0 +1,16 @@
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing,
+#  software distributed under the License is distributed on an
+#  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+#  KIND, either express or implied.  See the License for the
+#  specific language governing permissions and limitations
+#  under the License.
diff --git a/paimon-python/pypaimon/write/commit/commit_rollback.py 
b/paimon-python/pypaimon/write/commit/commit_rollback.py
new file mode 100644
index 0000000000..66106efae7
--- /dev/null
+++ b/paimon-python/pypaimon/write/commit/commit_rollback.py
@@ -0,0 +1,62 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+"""
+Commit rollback to rollback 'COMPACT' commits for resolving conflicts.
+"""
+
+from pypaimon.table.instant import Instant
+
+
+class CommitRollback:
+    """Rollback COMPACT commits to resolve conflicts.
+
+    When a conflict is detected during commit, if the latest snapshot is a
+    COMPACT commit, it can be rolled back via TableRollback.
+    """
+
+    def __init__(self, table_rollback):
+        """Initialize CommitRollback.
+
+        Args:
+            table_rollback: A TableRollback instance used to perform the 
rollback.
+        """
+        self._table_rollback = table_rollback
+
+    def try_to_rollback(self, latest_snapshot):
+        """Try to rollback a COMPACT commit to resolve conflicts.
+
+        Only rolls back COMPACT type commits. Delegates to TableRollback
+        to rollback to the previous snapshot (latest - 1), passing the
+        latest snapshot ID as from_snapshot.
+
+        Args:
+            latest_snapshot: The latest snapshot that may need to be rolled 
back.
+
+        Returns:
+            True if rollback succeeded, False otherwise.
+        """
+        if latest_snapshot.commit_kind == "COMPACT":
+            latest_id = latest_snapshot.id
+            try:
+                self._table_rollback.rollback_to(
+                    Instant.snapshot(latest_id - 1), latest_id)
+                return True
+            except Exception:
+                pass
+        return False
diff --git a/paimon-python/pypaimon/write/commit/commit_scanner.py 
b/paimon-python/pypaimon/write/commit/commit_scanner.py
new file mode 100644
index 0000000000..6158c86f5c
--- /dev/null
+++ b/paimon-python/pypaimon/write/commit/commit_scanner.py
@@ -0,0 +1,127 @@
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing,
+#  software distributed under the License is distributed on an
+#  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+#  KIND, either express or implied.  See the License for the
+#  specific language governing permissions and limitations
+#  under the License.
+
+"""
+Manifest entries scanner for commit operations.
+"""
+from typing import Optional, List
+
+from pypaimon.common.predicate_builder import PredicateBuilder
+from pypaimon.manifest.manifest_list_manager import ManifestListManager
+from pypaimon.manifest.schema.manifest_entry import ManifestEntry
+from pypaimon.read.scanner.file_scanner import FileScanner
+from pypaimon.snapshot.snapshot import Snapshot
+
+
+class CommitScanner:
+    """Manifest entries scanner for commit operations.
+
+    This class provides methods to scan manifest entries for commit operations
+    """
+
+    def __init__(self, table, manifest_list_manager: ManifestListManager):
+        """Initialize CommitScanner.
+
+        Args:
+            table: The FileStoreTable instance.
+            manifest_list_manager: Manager for reading manifest lists.
+        """
+        self.table = table
+        self.manifest_list_manager = manifest_list_manager
+
+    def read_all_entries_from_changed_partitions(self, latest_snapshot: Optional[Snapshot],
+                                                 commit_entries: List[ManifestEntry]):
+        """Read all entries from the latest snapshot for partitions that are changed.
+
+        Builds a partition predicate from delta entries and passes it to FileScanner,
+        so that manifest files and entries are filtered during reading rather than
+        after a full scan.
+
+        Args:
+            latest_snapshot: The latest snapshot to read entries from.
+            commit_entries: The delta entries being committed, used to determine
+                which partitions have changed.
+
+        Returns:
+            List of ManifestEntry from the latest snapshot for changed partitions.
+        """
+        if latest_snapshot is None:
+            return []
+
+        partition_filter = self._build_partition_filter_from_entries(commit_entries)
+
+        all_manifests = self.manifest_list_manager.read_all(latest_snapshot)
+        return FileScanner(
+            self.table, lambda: [], partition_filter
+        ).read_manifest_entries(all_manifests)
+
+    def read_incremental_entries_from_changed_partitions(self, snapshot: Snapshot,
+                                                         commit_entries: List[ManifestEntry]):
+        """Read incremental manifest entries from a snapshot's delta manifest list.
+
+        Builds a partition predicate from delta entries and passes it to FileScanner,
+        so that manifest files and entries are filtered during reading rather than
+        after a full scan.
+
+        Args:
+            snapshot: The snapshot to read incremental entries from.
+            commit_entries: The delta entries being committed, used to determine
+                which partitions have changed.
+
+        Returns:
+            List of ManifestEntry matching the partition filter.
+        """
+        delta_manifests = self.manifest_list_manager.read_delta(snapshot)
+        if not delta_manifests:
+            return []
+
+        partition_filter = self._build_partition_filter_from_entries(commit_entries)
+
+        return FileScanner(
+            self.table, lambda: [], partition_filter
+        ).read_manifest_entries(delta_manifests)
+
+    def _build_partition_filter_from_entries(self, entries: List[ManifestEntry]):
+        """Build a partition predicate that matches all partitions present in the given entries.
+
+        Args:
+            entries: List of ManifestEntry whose partitions should be matched.
+
+        Returns:
+            A Predicate matching any of the changed partitions, or None if
+            partition keys are empty.
+        """
+        partition_keys = self.table.partition_keys
+        if not partition_keys:
+            return None
+
+        changed_partitions = set()
+        for entry in entries:
+            changed_partitions.add(tuple(entry.partition.values))
+
+        if not changed_partitions:
+            return None
+
+        predicate_builder = PredicateBuilder(self.table.fields)
+        partition_predicates = []
+        for partition_values in changed_partitions:
+            sub_predicates = []
+            for i, key in enumerate(partition_keys):
+                sub_predicates.append(predicate_builder.equal(key, partition_values[i]))
+            partition_predicates.append(predicate_builder.and_predicates(sub_predicates))
+
+        return predicate_builder.or_predicates(partition_predicates)
diff --git a/paimon-python/pypaimon/write/commit/conflict_detection.py b/paimon-python/pypaimon/write/commit/conflict_detection.py
new file mode 100644
index 0000000000..8e62c946e0
--- /dev/null
+++ b/paimon-python/pypaimon/write/commit/conflict_detection.py
@@ -0,0 +1,203 @@
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing,
+#  software distributed under the License is distributed on an
+#  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+#  KIND, either express or implied.  See the License for the
+#  specific language governing permissions and limitations
+#  under the License.
+
+"""
+Conflict detection for commit operations.
+"""
+
+
+from pypaimon.manifest.manifest_list_manager import ManifestListManager
+from pypaimon.manifest.schema.data_file_meta import DataFileMeta
+from pypaimon.manifest.schema.file_entry import FileEntry
+from pypaimon.utils.range import Range
+from pypaimon.utils.range_helper import RangeHelper
+from pypaimon.write.commit.commit_scanner import CommitScanner
+
+
+class ConflictDetection:
+    """Detects conflicts between base and delta files during commit.
+
+    This class provides row ID range conflict checks and row ID from snapshot conflict checks
+    for Data Evolution tables.
+    """
+
+    def __init__(self, data_evolution_enabled, snapshot_manager,
+                 manifest_list_manager: ManifestListManager, table, commit_scanner: CommitScanner):
+        """Initialize ConflictDetection.
+
+        Args:
+            data_evolution_enabled: Whether data evolution feature is enabled.
+            snapshot_manager: Manager for reading snapshot metadata.
+            manifest_list_manager: Manager for reading manifest lists.
+            table: The FileStoreTable instance.
+        """
+        self.data_evolution_enabled = data_evolution_enabled
+        self.snapshot_manager = snapshot_manager
+        self.manifest_list_manager = manifest_list_manager
+        self.table = table
+        self._row_id_check_from_snapshot = None
+        self.commit_scanner = commit_scanner
+
+    def should_be_overwrite_commit(self):
+        """Check if the commit should be treated as an overwrite commit.
+
+        returns True if rowIdCheckFromSnapshot is set.
+
+        Returns:
+            True if the commit should be treated as OVERWRITE.
+        """
+        return self._row_id_check_from_snapshot is not None
+
+    def check_conflicts(self, latest_snapshot, base_entries, delta_entries, commit_kind):
+        """Run all conflict checks and return the first detected conflict.
+
+        merges base_entries and delta_entries, then runs conflict checks
+        on the merged result.
+
+        Args:
+            latest_snapshot: The latest snapshot at commit time.
+            base_entries: All entries read from the latest snapshot.
+            delta_entries: The delta entries being committed.
+            commit_kind: The kind of commit (e.g. "APPEND", "COMPACT", "OVERWRITE").
+
+        Returns:
+            A RuntimeError if a conflict is detected, otherwise None.
+        """
+        all_entries = list(base_entries) + list(delta_entries)
+
+        try:
+            merged_entries = FileEntry.merge_entries(all_entries)
+        except Exception as e:
+            return RuntimeError(
+                "File deletion conflicts detected! Give up committing. " + str(e))
+
+        conflict = self.check_row_id_range_conflicts(commit_kind, merged_entries)
+        if conflict is not None:
+            return conflict
+
+        return self.check_row_id_from_snapshot(latest_snapshot, delta_entries)
+
+    def check_row_id_range_conflicts(self, commit_kind, commit_entries):
+        """Check for row ID range conflicts among merged entries.
+
+        only enabled when data evolution is active, and checks that
+        overlapping row ID ranges in non-blob data files are identical.
+
+        Args:
+            commit_kind: The kind of commit (e.g. "APPEND", "COMPACT").
+            commit_entries: The entries being committed.
+
+        Returns:
+            A RuntimeError if conflict is detected, otherwise None.
+        """
+        if not self.data_evolution_enabled:
+            return None
+        if self._row_id_check_from_snapshot is None and commit_kind != "COMPACT":
+            return None
+
+        entries_with_row_id = [
+            entry for entry in commit_entries
+            if entry.file.first_row_id is not None
+        ]
+
+        if not entries_with_row_id:
+            return None
+
+        range_helper = RangeHelper(lambda entry: entry.file.row_id_range())
+        merged_groups = range_helper.merge_overlapping_ranges(entries_with_row_id)
+
+        for group in merged_groups:
+            data_files = [
+                entry for entry in group
+                if not DataFileMeta.is_blob_file(entry.file.file_name)
+            ]
+            if not range_helper.are_all_ranges_same(data_files):
+                file_descriptions = [
+                    "{name}(rowId={row_id}, count={count})".format(
+                        name=entry.file.file_name,
+                        row_id=entry.file.first_row_id,
+                        count=entry.file.row_count,
+                    )
+                    for entry in data_files
+                ]
+                return RuntimeError(
+                    "For Data Evolution table, multiple 'MERGE INTO' and 'COMPACT' "
+                    "operations have encountered conflicts, data files: "
+                    + str(file_descriptions))
+
+        return None
+
+    def check_row_id_from_snapshot(self, latest_snapshot, commit_entries):
+        """Check for row ID conflicts from a specific snapshot onwards.
+
+        collects row ID ranges from delta entries, then checks if any
+        incremental changes between the check snapshot and latest snapshot
+        have overlapping row ID ranges.
+
+        Args:
+            latest_snapshot: The latest snapshot at commit time.
+            commit_entries: The delta entries being committed.
+
+        Returns:
+            A RuntimeError if conflict is detected, otherwise None.
+        """
+        if not self.data_evolution_enabled:
+            return None
+        if self._row_id_check_from_snapshot is None:
+            return None
+
+        history_id_ranges = []
+        for entry in commit_entries:
+            first_row_id = entry.file.first_row_id
+            row_count = entry.file.row_count
+            if first_row_id is not None:
+                history_id_ranges.append(
+                    Range(first_row_id, first_row_id + row_count - 1))
+
+        check_snapshot = self.snapshot_manager.get_snapshot_by_id(
+            self._row_id_check_from_snapshot)
+        if check_snapshot is None or check_snapshot.next_row_id is None:
+            raise RuntimeError(
+                "Next row id cannot be null for snapshot "
+                "{snapshot}.".format(snapshot=self._row_id_check_from_snapshot))
+        check_next_row_id = check_snapshot.next_row_id
+
+        for snapshot_id in range(
+                self._row_id_check_from_snapshot + 1,
+                latest_snapshot.id + 1):
+            snapshot = self.snapshot_manager.get_snapshot_by_id(snapshot_id)
+            if snapshot is None:
+                continue
+            if snapshot.commit_kind == "COMPACT":
+                continue
+
+            incremental_entries = self.commit_scanner.read_incremental_entries_from_changed_partitions(
+                snapshot, commit_entries)
+            for entry in incremental_entries:
+                file_range = entry.file.row_id_range()
+                if file_range is None:
+                    continue
+                if file_range.from_ < check_next_row_id:
+                    for history_range in history_id_ranges:
+                        if history_range.overlaps(file_range):
+                            return RuntimeError(
+                                "For Data Evolution table, multiple 'MERGE INTO' "
+                                "operations have encountered conflicts, updating "
+                                "the same file, which can render some updates "
+                                "ineffective.")
+
+        return None
diff --git a/paimon-python/pypaimon/write/commit_message.py b/paimon-python/pypaimon/write/commit_message.py
index b36a1b1bbf..d560c5a247 100644
--- a/paimon-python/pypaimon/write/commit_message.py
+++ b/paimon-python/pypaimon/write/commit_message.py
@@ -17,7 +17,7 @@
 
################################################################################
 
 from dataclasses import dataclass
-from typing import List, Tuple
+from typing import List, Tuple, Optional
 
 from pypaimon.manifest.schema.data_file_meta import DataFileMeta
 
@@ -27,6 +27,7 @@ class CommitMessage:
     partition: Tuple
     bucket: int
     new_files: List[DataFileMeta]
+    check_from_snapshot: Optional[int] = -1
 
     def is_empty(self):
         return not self.new_files
diff --git a/paimon-python/pypaimon/write/file_store_commit.py b/paimon-python/pypaimon/write/file_store_commit.py
index 80eb858087..ee7d7a9694 100644
--- a/paimon-python/pypaimon/write/file_store_commit.py
+++ b/paimon-python/pypaimon/write/file_store_commit.py
@@ -27,6 +27,7 @@ from pypaimon.manifest.manifest_file_manager import ManifestFileManager
 from pypaimon.manifest.manifest_list_manager import ManifestListManager
 from pypaimon.manifest.schema.data_file_meta import DataFileMeta
 from pypaimon.manifest.schema.manifest_entry import ManifestEntry
+
 from pypaimon.manifest.schema.manifest_file_meta import ManifestFileMeta
 from pypaimon.manifest.schema.simple_stats import SimpleStats
 from pypaimon.read.scanner.file_scanner import FileScanner
@@ -36,6 +37,9 @@ from pypaimon.snapshot.snapshot_commit import (PartitionStatistics,
 from pypaimon.snapshot.snapshot_manager import SnapshotManager
 from pypaimon.table.row.generic_row import GenericRow
 from pypaimon.table.row.offset_row import OffsetRow
+from pypaimon.write.commit.commit_rollback import CommitRollback
+from pypaimon.write.commit.commit_scanner import CommitScanner
+from pypaimon.write.commit.conflict_detection import ConflictDetection
 from pypaimon.write.commit_message import CommitMessage
 
 logger = logging.getLogger(__name__)
@@ -93,11 +97,30 @@ class FileStoreCommit:
         self.commit_min_retry_wait = table.options.commit_min_retry_wait()
         self.commit_max_retry_wait = table.options.commit_max_retry_wait()
 
+        self.commit_scanner = CommitScanner(table, self.manifest_list_manager)
+
+        self.conflict_detection = ConflictDetection(
+            data_evolution_enabled=table.options.data_evolution_enabled(),
+            snapshot_manager=self.snapshot_manager,
+            manifest_list_manager=self.manifest_list_manager,
+            table=table,
+            commit_scanner=self.commit_scanner
+        )
+
+        table_rollback = table.catalog_environment.catalog_table_rollback()
+        self.rollback = CommitRollback(table_rollback) if table_rollback is not None else None
+
     def commit(self, commit_messages: List[CommitMessage], commit_identifier: int):
         """Commit the given commit messages in normal append mode."""
         if not commit_messages:
             return
 
+        # Extract the minimum check_from_snapshot from commit messages
+        valid_snapshots = [msg.check_from_snapshot for msg in commit_messages
+                           if msg.check_from_snapshot != -1]
+        if valid_snapshots:
+            self.conflict_detection._row_id_check_from_snapshot = min(valid_snapshots)
+
         logger.info(
             "Ready to commit to table %s, number of commit messages: %d",
             self.table.identifier,
@@ -116,9 +139,20 @@ class FileStoreCommit:
                 ))
 
         logger.info("Finished collecting changes, including: %d entries", len(commit_entries))
-        self._try_commit(commit_kind="APPEND",
+
+        commit_kind = "APPEND"
+        detect_conflicts = False
+        allow_rollback = False
+        if self.conflict_detection.should_be_overwrite_commit():
+            commit_kind = "OVERWRITE"
+            detect_conflicts = True
+            allow_rollback = True
+
+        self._try_commit(commit_kind=commit_kind,
                          commit_identifier=commit_identifier,
-                         commit_entries_plan=lambda snapshot: commit_entries)
+                         commit_entries_plan=lambda snapshot: commit_entries,
+                         detect_conflicts=detect_conflicts,
+                         allow_rollback=allow_rollback)
 
     def overwrite(self, overwrite_partition, commit_messages: List[CommitMessage], commit_identifier: int):
         """Commit the given commit messages in overwrite mode."""
@@ -149,7 +183,9 @@ class FileStoreCommit:
             commit_kind="OVERWRITE",
             commit_identifier=commit_identifier,
             commit_entries_plan=lambda snapshot: self._generate_overwrite_entries(
-                snapshot, partition_filter, commit_messages)
+                snapshot, partition_filter, commit_messages),
+            detect_conflicts=True,
+            allow_rollback=False,
         )
 
     def drop_partitions(self, partitions: List[Dict[str, str]], commit_identifier: int) -> None:
@@ -187,10 +223,13 @@ class FileStoreCommit:
             commit_kind="OVERWRITE",
             commit_identifier=commit_identifier,
             commit_entries_plan=lambda snapshot: self._generate_overwrite_entries(
-                snapshot, partition_filter, [])
+                snapshot, partition_filter, []),
+            detect_conflicts=True,
+            allow_rollback=False,
         )
 
-    def _try_commit(self, commit_kind, commit_identifier, commit_entries_plan):
+    def _try_commit(self, commit_kind, commit_identifier, commit_entries_plan,
+                    detect_conflicts=False, allow_rollback=False):
         import threading
 
         retry_count = 0
@@ -211,7 +250,9 @@ class FileStoreCommit:
                 commit_kind=commit_kind,
                 commit_entries=commit_entries,
                 commit_identifier=commit_identifier,
-                latest_snapshot=latest_snapshot
+                latest_snapshot=latest_snapshot,
+                detect_conflicts=detect_conflicts,
+                allow_rollback=allow_rollback,
             )
 
             if result.is_success():
@@ -267,11 +308,13 @@ class FileStoreCommit:
 
     def _try_commit_once(self, retry_result: Optional[RetryResult], commit_kind: str,
                          commit_entries: List[ManifestEntry], commit_identifier: int,
-                         latest_snapshot: Optional[Snapshot]) -> CommitResult:
+                         latest_snapshot: Optional[Snapshot],
+                         detect_conflicts: bool = False,
+                         allow_rollback: bool = False) -> CommitResult:
         start_millis = int(time.time() * 1000)
         if self._is_duplicate_commit(retry_result, latest_snapshot, commit_identifier, commit_kind):
             return SuccessResult()
-        
+
         unique_id = uuid.uuid4()
         base_manifest_list = f"manifest-list-{unique_id}-0"
         delta_manifest_list = f"manifest-list-{unique_id}-1"
@@ -296,8 +339,20 @@ class FileStoreCommit:
             # Assign row IDs to new files and get the next row ID for the snapshot
             commit_entries, next_row_id = self._assign_row_tracking_meta(first_row_id_start, commit_entries)
 
+        # Conflict detection: read base entries from latest snapshot, then check conflicts
+        if detect_conflicts and latest_snapshot is not None:
+            base_entries = self.commit_scanner.read_all_entries_from_changed_partitions(
+                latest_snapshot, commit_entries)
+            conflict_exception = self.conflict_detection.check_conflicts(
+                latest_snapshot, base_entries, commit_entries, commit_kind)
+
+            if conflict_exception is not None:
+                if allow_rollback and self.rollback is not None:
+                    if self.rollback.try_to_rollback(latest_snapshot):
+                        return RetryResult(latest_snapshot, conflict_exception)
+                raise conflict_exception
+
         try:
-            # TODO: implement noConflictsOrFail logic
             new_manifest_file_meta = self._write_manifest_file(commit_entries, new_manifest_file)
             self.manifest_list_manager.write(delta_manifest_list, [new_manifest_file_meta])
 
@@ -452,12 +507,12 @@ class FileStoreCommit:
                     return True
         return False
 
-    def _generate_overwrite_entries(self, latestSnapshot, partition_filter, commit_messages):
+    def _generate_overwrite_entries(self, latest_snapshot, partition_filter, commit_messages):
         """Generate commit entries for OVERWRITE mode based on latest snapshot."""
         entries = []
-        current_entries = [] if latestSnapshot is None \
+        current_entries = [] if latest_snapshot is None \
             else (FileScanner(self.table, lambda: [], partition_filter).
-                  read_manifest_entries(self.manifest_list_manager.read_all(latestSnapshot)))
+                  read_manifest_entries(self.manifest_list_manager.read_all(latest_snapshot)))
         for entry in current_entries:
             entry.kind = 1  # DELETE
             entries.append(entry)
diff --git a/paimon-python/pypaimon/write/table_update_by_row_id.py b/paimon-python/pypaimon/write/table_update_by_row_id.py
index b027564ff3..48f61751ec 100644
--- a/paimon-python/pypaimon/write/table_update_by_row_id.py
+++ b/paimon-python/pypaimon/write/table_update_by_row_id.py
@@ -47,7 +47,8 @@ class TableUpdateByRowId:
         self.commit_user = commit_user
 
         # Load existing first_row_ids and build partition map
-        (self.first_row_ids,
+        (self.snapshot_id,
+         self.first_row_ids,
          self.first_row_id_to_partition_map,
          self.first_row_id_to_row_count_map,
          self.total_row_count,
@@ -76,7 +77,9 @@ class TableUpdateByRowId:
 
         total_row_count = sum(first_row_id_to_row_count_map.values())
 
-        return (sorted(list(set(first_row_ids))),
+        snapshot_id = self.table.snapshot_manager().get_latest_snapshot().id
+        return (snapshot_id,
+                sorted(list(set(first_row_ids))),
                 first_row_id_to_partition_map,
                 first_row_id_to_row_count_map,
                 total_row_count,
@@ -308,6 +311,7 @@ class TableUpdateByRowId:
 
         # Assign first_row_id to the new files
         for msg in commit_messages:
+            msg.check_from_snapshot = self.snapshot_id
             for file in msg.new_files:
                 # Assign the same first_row_id as the original file
                 file.first_row_id = first_row_id

Reply via email to