This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 73c962ab9f [core][python] Add row ID existence check and column-level 
conflict detection (#7971)
73c962ab9f is described below

commit 73c962ab9ff7e72bffe0be1e548c7047a3458564
Author: Jingsong Lee <[email protected]>
AuthorDate: Tue May 26 16:23:52 2026 +0800

    [core][python] Add row ID existence check and column-level conflict 
detection (#7971)
    
    Add a Rust-style row ID existence check to both Java and Python commit
    validation, ensuring pre-assigned firstRowId files reference base files
    that still exist in the current snapshot. This covers a gap where
    checkRowIdRangeConflicts misses conflicts when base files are fully
    deleted (e.g., DROP PARTITION / OVERWRITE) with no replacement.
    
    Also port Java's RowIdColumnConflictChecker to Python, enabling
    row×column two-dimensional conflict detection in
    check_row_id_from_snapshot. This allows concurrent MERGE INTO on
    different columns of the same rows to proceed without false conflicts.
---
 .../paimon/operation/commit/ConflictDetection.java | 101 +++++++
 .../operation/commit/ConflictDetectionTest.java    | 115 ++++++++
 .../pypaimon/tests/e2e/java_py_read_write_test.py  |   2 +-
 .../tests/write/conflict_detection_test.py         | 297 +++++++++++++++++++++
 .../pypaimon/write/commit/conflict_detection.py    | 241 ++++++++++++-----
 paimon-python/pypaimon/write/file_store_commit.py  |  23 +-
 .../paimon/spark/sql/RowTrackingTestBase.scala     |   5 +-
 7 files changed, 704 insertions(+), 80 deletions(-)

diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/commit/ConflictDetection.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/commit/ConflictDetection.java
index 29ee045818..493d13d88d 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/commit/ConflictDetection.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/commit/ConflictDetection.java
@@ -224,6 +224,14 @@ public class ConflictDetection {
             return exception;
         }
 
+        if (commitKind != CommitKind.COMPACT) {
+            Long nextRowId = latestSnapshot.nextRowId();
+            exception = checkRowIdExistence(baseEntries, deltaEntries, 
nextRowId);
+            if (exception.isPresent()) {
+                return exception;
+            }
+        }
+
         exception = checkRowIdRangeConflicts(commitKind, mergedEntries);
         if (exception.isPresent()) {
             return exception;
@@ -536,6 +544,99 @@ public class ConflictDetection {
         return Optional.empty();
     }
 
+    Optional<RuntimeException> checkRowIdExistence(
+            List<SimpleFileEntry> baseEntries,
+            List<SimpleFileEntry> deltaEntries,
+            @Nullable Long nextRowId) {
+        if (!dataEvolutionEnabled) {
+            return Optional.empty();
+        }
+
+        List<SimpleFileEntry> filesToCheck =
+                deltaEntries.stream()
+                        .filter(
+                                e ->
+                                        e.kind() == FileKind.ADD
+                                                && e.firstRowId() != null
+                                                && nextRowId != null
+                                                && e.firstRowId() < nextRowId)
+                        .collect(Collectors.toList());
+
+        if (filesToCheck.isEmpty()) {
+            return Optional.empty();
+        }
+
+        Set<FileRowIdKey> existingIndex = new HashSet<>();
+        for (SimpleFileEntry base : baseEntries) {
+            if (base.firstRowId() != null) {
+                existingIndex.add(
+                        new FileRowIdKey(
+                                base.partition(),
+                                base.bucket(),
+                                base.firstRowId(),
+                                base.rowCount()));
+            }
+        }
+
+        for (SimpleFileEntry entry : filesToCheck) {
+            FileRowIdKey key =
+                    new FileRowIdKey(
+                            entry.partition(),
+                            entry.bucket(),
+                            entry.firstRowId(),
+                            entry.rowCount());
+            if (!existingIndex.contains(key)) {
+                return Optional.of(
+                        new RuntimeException(
+                                String.format(
+                                        "Row ID existence conflict: file '%s' 
references "
+                                                + "firstRowId=%d, rowCount=%d 
in bucket %d, "
+                                                + "but no matching file exists 
in the current snapshot. "
+                                                + "The referenced file may 
have been rewritten by a "
+                                                + "concurrent compaction or 
removed by an overwrite.",
+                                        entry.fileName(),
+                                        entry.firstRowId(),
+                                        entry.rowCount(),
+                                        entry.bucket())));
+            }
+        }
+        return Optional.empty();
+    }
+
+    private static class FileRowIdKey {
+        private final BinaryRow partition;
+        private final int bucket;
+        private final long firstRowId;
+        private final long rowCount;
+
+        FileRowIdKey(BinaryRow partition, int bucket, long firstRowId, long 
rowCount) {
+            this.partition = partition;
+            this.bucket = bucket;
+            this.firstRowId = firstRowId;
+            this.rowCount = rowCount;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            FileRowIdKey that = (FileRowIdKey) o;
+            return bucket == that.bucket
+                    && firstRowId == that.firstRowId
+                    && rowCount == that.rowCount
+                    && Objects.equals(partition, that.partition);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(partition, bucket, firstRowId, rowCount);
+        }
+    }
+
     private static boolean dedicatedStorageFile(String fileName) {
         return isBlobFile(fileName) || isVectorStoreFile(fileName);
     }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/operation/commit/ConflictDetectionTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/operation/commit/ConflictDetectionTest.java
index c3e0258da2..1c36b9e09e 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/operation/commit/ConflictDetectionTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/operation/commit/ConflictDetectionTest.java
@@ -38,6 +38,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Optional;
 
 import static org.apache.paimon.data.BinaryRow.EMPTY_ROW;
 import static 
org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.DELETION_VECTORS_INDEX;
@@ -374,6 +375,120 @@ class ConflictDetectionTest {
                 .isFalse();
     }
 
+    @Test
+    void testCheckRowIdExistenceNoConflict() {
+        ConflictDetection detection = createConflictDetection();
+
+        List<SimpleFileEntry> baseEntries = new ArrayList<>();
+        baseEntries.add(createFileEntryWithRowId("f1", ADD, 0L, 100L));
+
+        List<SimpleFileEntry> deltaEntries = new ArrayList<>();
+        deltaEntries.add(createFileEntryWithRowId("p1", ADD, 0L, 100L));
+
+        assertThat(detection.checkRowIdExistence(baseEntries, deltaEntries, 
100L)).isEmpty();
+    }
+
+    @Test
+    void testCheckRowIdExistenceBaseFileRemoved() {
+        ConflictDetection detection = createConflictDetection();
+
+        List<SimpleFileEntry> baseEntries = new ArrayList<>();
+
+        List<SimpleFileEntry> deltaEntries = new ArrayList<>();
+        deltaEntries.add(createFileEntryWithRowId("p1", ADD, 0L, 100L));
+
+        Optional<RuntimeException> result =
+                detection.checkRowIdExistence(baseEntries, deltaEntries, 100L);
+        assertThat(result).isPresent();
+        assertThat(result.get().getMessage()).contains("Row ID existence 
conflict");
+    }
+
+    @Test
+    void testCheckRowIdExistenceBaseFileRewritten() {
+        ConflictDetection detection = createConflictDetection();
+
+        List<SimpleFileEntry> baseEntries = new ArrayList<>();
+        baseEntries.add(createFileEntryWithRowId("f2", ADD, 0L, 200L));
+
+        List<SimpleFileEntry> deltaEntries = new ArrayList<>();
+        deltaEntries.add(createFileEntryWithRowId("p1", ADD, 0L, 100L));
+
+        Optional<RuntimeException> result =
+                detection.checkRowIdExistence(baseEntries, deltaEntries, 200L);
+        assertThat(result).isPresent();
+        assertThat(result.get().getMessage()).contains("Row ID existence 
conflict");
+    }
+
+    @Test
+    void testCheckRowIdExistenceSkipsNewlyAppendedFiles() {
+        ConflictDetection detection = createConflictDetection();
+
+        // nextRowId=100: files with firstRowId >= 100 are newly appended, not 
references
+        List<SimpleFileEntry> baseEntries = new ArrayList<>();
+        baseEntries.add(createFileEntryWithRowId("f1", ADD, 0L, 100L));
+
+        List<SimpleFileEntry> deltaEntries = new ArrayList<>();
+        // partial-column update referencing existing rows (firstRowId=0 < 
nextRowId=100)
+        deltaEntries.add(createFileEntryWithRowId("p1", ADD, 0L, 100L));
+        // newly appended file (firstRowId=100 >= nextRowId=100), should be 
skipped
+        deltaEntries.add(createFileEntryWithRowId("new1", ADD, 100L, 50L));
+
+        assertThat(detection.checkRowIdExistence(baseEntries, deltaEntries, 
100L)).isEmpty();
+    }
+
+    @Test
+    void testCheckRowIdExistenceSkipsNonPreAssigned() {
+        ConflictDetection detection = createConflictDetection();
+
+        List<SimpleFileEntry> baseEntries = new ArrayList<>();
+
+        List<SimpleFileEntry> deltaEntries = new ArrayList<>();
+        deltaEntries.add(createFileEntry("f1", ADD));
+
+        assertThat(detection.checkRowIdExistence(baseEntries, deltaEntries, 
100L)).isEmpty();
+    }
+
+    @Test
+    void testCheckRowIdExistenceSkipsDeleteEntries() {
+        ConflictDetection detection = createConflictDetection();
+
+        List<SimpleFileEntry> baseEntries = new ArrayList<>();
+
+        List<SimpleFileEntry> deltaEntries = new ArrayList<>();
+        deltaEntries.add(createFileEntryWithRowId("f1", DELETE, 0L, 100L));
+
+        assertThat(detection.checkRowIdExistence(baseEntries, deltaEntries, 
100L)).isEmpty();
+    }
+
+    @Test
+    void testCheckRowIdExistenceSkipsWhenNextRowIdNull() {
+        ConflictDetection detection = createConflictDetection();
+
+        List<SimpleFileEntry> baseEntries = new ArrayList<>();
+        List<SimpleFileEntry> deltaEntries = new ArrayList<>();
+        deltaEntries.add(createFileEntryWithRowId("p1", ADD, 0L, 100L));
+
+        assertThat(detection.checkRowIdExistence(baseEntries, deltaEntries, 
null)).isEmpty();
+    }
+
+    private SimpleFileEntry createFileEntryWithRowId(
+            String fileName, FileKind kind, long firstRowId, long rowCount) {
+        return new SimpleFileEntry(
+                kind,
+                EMPTY_ROW,
+                0,
+                1,
+                0,
+                fileName,
+                Collections.emptyList(),
+                null,
+                EMPTY_ROW,
+                EMPTY_ROW,
+                null,
+                rowCount,
+                firstRowId);
+    }
+
     private ConflictDetection createConflictDetection() {
         return new ConflictDetection(
                 "test-table",
diff --git a/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py 
b/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py
index f48f4c99f3..13555a4231 100644
--- a/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py
+++ b/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py
@@ -762,7 +762,7 @@ class JavaPyReadWriteTest(unittest.TestCase):
         tc = wb.new_commit()
         with self.assertRaises(RuntimeError) as ctx:
             tc.commit(stale_commit_msgs)
-        self.assertIn("conflicts", str(ctx.exception))
+        self.assertIn("conflict", str(ctx.exception))
         tc.close()
         print(f"Conflict detected as expected: {ctx.exception}")
 
diff --git a/paimon-python/pypaimon/tests/write/conflict_detection_test.py 
b/paimon-python/pypaimon/tests/write/conflict_detection_test.py
new file mode 100644
index 0000000000..302a7d801f
--- /dev/null
+++ b/paimon-python/pypaimon/tests/write/conflict_detection_test.py
@@ -0,0 +1,297 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import unittest
+from dataclasses import dataclass
+from typing import List
+
+from pypaimon.manifest.schema.data_file_meta import DataFileMeta
+from pypaimon.manifest.schema.manifest_entry import ManifestEntry
+from pypaimon.schema.data_types import AtomicType, DataField
+from pypaimon.table.row.generic_row import GenericRow
+from pypaimon.write.commit.conflict_detection import (
+    ConflictDetection,
+    RowIdColumnConflictChecker,
+)
+
+
+def _make_file(file_name, row_count=100, first_row_id=None,
+               schema_id=0, write_cols=None):
+    return DataFileMeta(
+        file_name=file_name,
+        file_size=1024,
+        row_count=row_count,
+        min_key=None,
+        max_key=None,
+        key_stats=None,
+        value_stats=None,
+        min_sequence_number=0,
+        max_sequence_number=0,
+        schema_id=schema_id,
+        level=0,
+        extra_files=[],
+        first_row_id=first_row_id,
+        write_cols=write_cols,
+    )
+
+
+_EMPTY_PARTITION = GenericRow([], [])
+
+
+def _make_entry(file_name, kind=0, bucket=0, first_row_id=None,
+                row_count=100, write_cols=None, schema_id=0):
+    return ManifestEntry(
+        kind=kind,
+        partition=_EMPTY_PARTITION,
+        bucket=bucket,
+        total_buckets=1,
+        file=_make_file(file_name, row_count=row_count,
+                        first_row_id=first_row_id, schema_id=schema_id,
+                        write_cols=write_cols),
+    )
+
+
+@dataclass
+class _FakeSchema:
+    id: int
+    fields: List[DataField]
+
+
+class _FakeSchemaManager:
+
+    def __init__(self, schemas=None):
+        self._schemas = {}
+        if schemas:
+            for s in schemas:
+                self._schemas[s.id] = s
+
+    def get_schema(self, schema_id):
+        return self._schemas.get(schema_id)
+
+
+_DEFAULT_SCHEMA = _FakeSchema(
+    id=0,
+    fields=[
+        DataField(1, "col_a", AtomicType("INT")),
+        DataField(2, "col_b", AtomicType("STRING")),
+        DataField(3, "col_c", AtomicType("BIGINT")),
+    ],
+)
+
+
+class TestCheckRowIdExistence(unittest.TestCase):
+
+    def _make_detection(self):
+        return ConflictDetection(
+            data_evolution_enabled=True,
+            snapshot_manager=None,
+            manifest_list_manager=None,
+            table=None,
+            commit_scanner=None,
+        )
+
+    def test_no_conflict_when_base_file_exists(self):
+        detection = self._make_detection()
+        base = [_make_entry("f1", kind=0, first_row_id=0, row_count=100)]
+        delta = [_make_entry("p1", kind=0, first_row_id=0, row_count=100)]
+        self.assertIsNone(
+            detection.check_row_id_existence(base, delta, next_row_id=200))
+
+    def test_conflict_when_base_file_removed(self):
+        detection = self._make_detection()
+        base = []
+        delta = [_make_entry("p1", kind=0, first_row_id=0, row_count=100)]
+        result = detection.check_row_id_existence(base, delta, next_row_id=200)
+        self.assertIsNotNone(result)
+        self.assertIn("Row ID existence conflict", str(result))
+
+    def test_conflict_when_base_file_rewritten(self):
+        detection = self._make_detection()
+        base = [_make_entry("f2", kind=0, first_row_id=0, row_count=200)]
+        delta = [_make_entry("p1", kind=0, first_row_id=0, row_count=100)]
+        result = detection.check_row_id_existence(base, delta, next_row_id=200)
+        self.assertIsNotNone(result)
+        self.assertIn("Row ID existence conflict", str(result))
+
+    def test_skip_newly_appended_files(self):
+        detection = self._make_detection()
+        base = []
+        delta = [_make_entry("p1", kind=0, first_row_id=200, row_count=100)]
+        self.assertIsNone(
+            detection.check_row_id_existence(base, delta, next_row_id=200))
+
+    def test_skip_when_no_pre_assigned_row_id(self):
+        detection = self._make_detection()
+        base = []
+        delta = [_make_entry("f1", kind=0)]
+        self.assertIsNone(
+            detection.check_row_id_existence(base, delta, next_row_id=200))
+
+    def test_skip_delete_entries(self):
+        detection = self._make_detection()
+        base = []
+        delta = [_make_entry("f1", kind=1, first_row_id=0, row_count=100)]
+        self.assertIsNone(
+            detection.check_row_id_existence(base, delta, next_row_id=200))
+
+    def test_skip_when_data_evolution_disabled(self):
+        detection = ConflictDetection(
+            data_evolution_enabled=False,
+            snapshot_manager=None,
+            manifest_list_manager=None,
+            table=None,
+            commit_scanner=None,
+        )
+        base = []
+        delta = [_make_entry("p1", kind=0, first_row_id=0, row_count=100)]
+        self.assertIsNone(
+            detection.check_row_id_existence(base, delta, next_row_id=200))
+
+    def test_skip_when_next_row_id_is_none(self):
+        detection = self._make_detection()
+        base = []
+        delta = [_make_entry("p1", kind=0, first_row_id=0, row_count=100)]
+        self.assertIsNone(
+            detection.check_row_id_existence(base, delta, next_row_id=None))
+
+
+class TestRowIdColumnConflictChecker(unittest.TestCase):
+
+    def _make_checker(self, delta_files, schema=None):
+        schema_mgr = _FakeSchemaManager([schema or _DEFAULT_SCHEMA])
+        return RowIdColumnConflictChecker.from_data_files(schema_mgr, 
delta_files)
+
+    def test_no_conflict_disjoint_rows(self):
+        delta_files = [
+            _make_file("d1", row_count=100, first_row_id=0, 
write_cols=["col_a"]),
+        ]
+        checker = self._make_checker(delta_files)
+        committed = _make_file("c1", row_count=100, first_row_id=200,
+                               write_cols=["col_a"])
+        self.assertFalse(checker.conflicts_with(committed))
+
+    def test_no_conflict_same_rows_different_columns(self):
+        delta_files = [
+            _make_file("d1", row_count=100, first_row_id=0, 
write_cols=["col_a"]),
+        ]
+        checker = self._make_checker(delta_files)
+        committed = _make_file("c1", row_count=100, first_row_id=0,
+                               write_cols=["col_b"])
+        self.assertFalse(checker.conflicts_with(committed))
+
+    def test_conflict_same_rows_same_columns(self):
+        delta_files = [
+            _make_file("d1", row_count=100, first_row_id=0, 
write_cols=["col_a"]),
+        ]
+        checker = self._make_checker(delta_files)
+        committed = _make_file("c1", row_count=100, first_row_id=0,
+                               write_cols=["col_a"])
+        self.assertTrue(checker.conflicts_with(committed))
+
+    def test_conflict_overlapping_rows_overlapping_columns(self):
+        delta_files = [
+            _make_file("d1", row_count=100, first_row_id=0,
+                       write_cols=["col_a", "col_b"]),
+        ]
+        checker = self._make_checker(delta_files)
+        committed = _make_file("c1", row_count=100, first_row_id=50,
+                               write_cols=["col_b", "col_c"])
+        self.assertTrue(checker.conflicts_with(committed))
+
+    def test_conflict_null_write_cols_committed(self):
+        """null write_cols means full-schema write — always conflicts on 
column dimension."""
+        delta_files = [
+            _make_file("d1", row_count=100, first_row_id=0, 
write_cols=["col_a"]),
+        ]
+        checker = self._make_checker(delta_files)
+        committed = _make_file("c1", row_count=100, first_row_id=0,
+                               write_cols=None)
+        self.assertTrue(checker.conflicts_with(committed))
+
+    def test_conflict_null_write_cols_delta(self):
+        """null write_cols in delta means all columns are in the write 
range."""
+        delta_files = [
+            _make_file("d1", row_count=100, first_row_id=0, write_cols=None),
+        ]
+        checker = self._make_checker(delta_files)
+        committed = _make_file("c1", row_count=100, first_row_id=0,
+                               write_cols=["col_b"])
+        self.assertTrue(checker.conflicts_with(committed))
+
+    def test_no_conflict_committed_file_no_row_id(self):
+        delta_files = [
+            _make_file("d1", row_count=100, first_row_id=0, 
write_cols=["col_a"]),
+        ]
+        checker = self._make_checker(delta_files)
+        committed = _make_file("c1", row_count=100, first_row_id=None,
+                               write_cols=["col_a"])
+        self.assertFalse(checker.conflicts_with(committed))
+
+    def test_none_when_no_delta_files_with_row_id(self):
+        delta_files = [
+            _make_file("d1", row_count=100, first_row_id=None),
+        ]
+        schema_mgr = _FakeSchemaManager([_DEFAULT_SCHEMA])
+        checker = RowIdColumnConflictChecker.from_data_files(schema_mgr, 
delta_files)
+        self.assertIsNone(checker)
+
+    def test_system_fields_skipped(self):
+        """System fields like _ROW_ID should not count as column conflicts."""
+        delta_files = [
+            _make_file("d1", row_count=100, first_row_id=0,
+                       write_cols=["_ROW_ID", "col_a"]),
+        ]
+        checker = self._make_checker(delta_files)
+        committed = _make_file("c1", row_count=100, first_row_id=0,
+                               write_cols=["_ROW_ID", "col_b"])
+        self.assertFalse(checker.conflicts_with(committed))
+
+    def test_cross_schema_field_id_resolution(self):
+        """Fields with same ID but different names across schema versions 
should still match."""
+        schema_v0 = _FakeSchema(
+            id=0,
+            fields=[
+                DataField(1, "col_a", AtomicType("INT")),
+                DataField(2, "col_b", AtomicType("STRING")),
+            ],
+        )
+        schema_v1 = _FakeSchema(
+            id=1,
+            fields=[
+                DataField(1, "col_a_renamed", AtomicType("INT")),
+                DataField(2, "col_b", AtomicType("STRING")),
+                DataField(3, "col_c", AtomicType("BIGINT")),
+            ],
+        )
+        schema_mgr = _FakeSchemaManager([schema_v0, schema_v1])
+        delta_files = [
+            _make_file("d1", row_count=100, first_row_id=0,
+                       schema_id=0, write_cols=["col_a"]),
+        ]
+        checker = RowIdColumnConflictChecker.from_data_files(schema_mgr, 
delta_files)
+        committed_same_field = _make_file(
+            "c1", row_count=100, first_row_id=0,
+            schema_id=1, write_cols=["col_a_renamed"])
+        self.assertTrue(checker.conflicts_with(committed_same_field))
+        committed_diff_field = _make_file(
+            "c2", row_count=100, first_row_id=0,
+            schema_id=1, write_cols=["col_c"])
+        self.assertFalse(checker.conflicts_with(committed_diff_field))
+
+
+if __name__ == '__main__':
+    unittest.main()
diff --git a/paimon-python/pypaimon/write/commit/conflict_detection.py 
b/paimon-python/pypaimon/write/commit/conflict_detection.py
index 6b7652c00b..fc92dc838b 100644
--- a/paimon-python/pypaimon/write/commit/conflict_detection.py
+++ b/paimon-python/pypaimon/write/commit/conflict_detection.py
@@ -19,31 +19,140 @@
 Conflict detection for commit operations.
 """
 
+import bisect
+
 from pypaimon.manifest.manifest_list_manager import ManifestListManager
 from pypaimon.manifest.schema.data_file_meta import DataFileMeta
 from pypaimon.manifest.schema.file_entry import FileEntry
+from pypaimon.table.special_fields import SpecialFields
 from pypaimon.utils.range import Range
 from pypaimon.utils.range_helper import RangeHelper
 from pypaimon.write.commit.commit_scanner import CommitScanner
 
 
-class ConflictDetection:
-    """Detects conflicts between base and delta files during commit.
+class RowIdColumnConflictChecker:
+    """Checks for row ID × column conflicts between delta files and committed 
files.
 
-    This class provides row ID range conflict checks and row ID from snapshot 
conflict checks
-    for Data Evolution tables.
+    Built from the current commit's delta files. For each committed file,
+    checks whether it overlaps with the delta files on BOTH dimensions:
+    row-id range AND write columns.
     """
 
+    def __init__(self, write_ranges, schema_manager):
+        self._write_ranges = write_ranges
+        self._schema_manager = schema_manager
+        self._field_id_cache = {}
+
+    @classmethod
+    def from_data_files(cls, schema_manager, delta_files):
+        files_with_row_id = [f for f in delta_files if f.first_row_id is not 
None]
+        if not files_with_row_id:
+            return None
+
+        range_helper = RangeHelper(lambda f: f.row_id_range())
+        groups = range_helper.merge_overlapping_ranges(files_with_row_id)
+
+        write_ranges = []
+        for group in groups:
+            merged_from = min(f.first_row_id for f in group)
+            merged_to = max(f.first_row_id + f.row_count - 1 for f in group)
+            merged_range = Range(merged_from, merged_to)
+
+            field_ids = set()
+            for f in group:
+                cls._add_write_field_ids(field_ids, f, schema_manager)
+
+            write_ranges.append(_WriteRange(merged_range, field_ids))
+
+        write_ranges.sort(key=lambda wr: (wr.range.from_, wr.range.to))
+        return cls(write_ranges, schema_manager)
+
+    def is_empty(self):
+        return len(self._write_ranges) == 0
+
+    def conflicts_with(self, file):
+        if file.first_row_id is None:
+            return False
+
+        file_range = Range(file.first_row_id, file.first_row_id + 
file.row_count - 1)
+        index = self._first_possible_range(file_range)
+
+        while index < len(self._write_ranges):
+            wr = self._write_ranges[index]
+            if wr.range.from_ > file_range.to:
+                return False
+            if wr.range.overlaps(file_range) and 
self._contains_any_write_field(wr.field_ids, file):
+                return True
+            index += 1
+
+        return False
+
+    def _first_possible_range(self, target):
+        keys = [wr.range.to for wr in self._write_ranges]
+        return bisect.bisect_left(keys, target.from_)
+
+    def _contains_any_write_field(self, field_ids, file):
+        if file.write_cols is None:
+            return True
+        for col_name in file.write_cols:
+            fid = self._field_id(file, col_name)
+            if fid is not None and fid in field_ids:
+                return True
+        return False
+
+    def _field_id(self, file, col_name):
+        if SpecialFields.is_system_field(col_name):
+            return None
+        name_to_id = self._field_id_by_name(file.schema_id)
+        fid = name_to_id.get(col_name)
+        if fid is None:
+            raise RuntimeError(
+                f"Column '{col_name}' not found in schema {file.schema_id}")
+        return fid
+
+    def _field_id_by_name(self, schema_id):
+        if schema_id not in self._field_id_cache:
+            schema = self._schema_manager.get_schema(schema_id)
+            if schema is None:
+                raise RuntimeError(f"Schema {schema_id} not found")
+            self._field_id_cache[schema_id] = {
+                field.name: field.id for field in schema.fields
+            }
+        return self._field_id_cache[schema_id]
+
+    @classmethod
+    def _add_write_field_ids(cls, field_ids, file, schema_manager):
+        if file.write_cols is None:
+            schema = schema_manager.get_schema(file.schema_id)
+            if schema is not None:
+                for field in schema.fields:
+                    if not SpecialFields.is_system_field(field.name):
+                        field_ids.add(field.id)
+        else:
+            name_to_id = {}
+            schema = schema_manager.get_schema(file.schema_id)
+            if schema is not None:
+                name_to_id = {field.name: field.id for field in schema.fields}
+            for col_name in file.write_cols:
+                if SpecialFields.is_system_field(col_name):
+                    continue
+                fid = name_to_id.get(col_name)
+                if fid is not None:
+                    field_ids.add(fid)
+
+
+class _WriteRange:
+
+    def __init__(self, range_, field_ids):
+        self.range = range_
+        self.field_ids = field_ids
+
+
+class ConflictDetection:
+    """Detects conflicts between base and delta files during commit."""
+
     def __init__(self, data_evolution_enabled, snapshot_manager,
                  manifest_list_manager: ManifestListManager, table, 
commit_scanner: CommitScanner):
-        """Initialize ConflictDetection.
-
-        Args:
-            data_evolution_enabled: Whether data evolution feature is enabled.
-            snapshot_manager: Manager for reading snapshot metadata.
-            manifest_list_manager: Manager for reading manifest lists.
-            table: The FileStoreTable instance.
-        """
         self.data_evolution_enabled = data_evolution_enabled
         self.snapshot_manager = snapshot_manager
         self.manifest_list_manager = manifest_list_manager
@@ -58,20 +167,6 @@ class ConflictDetection:
         return self._row_id_check_from_snapshot is not None
 
     def check_conflicts(self, latest_snapshot, base_entries, delta_entries, 
commit_kind):
-        """Run all conflict checks and return the first detected conflict.
-
-        merges base_entries and delta_entries, then runs conflict checks
-        on the merged result.
-
-        Args:
-            latest_snapshot: The latest snapshot at commit time.
-            base_entries: All entries read from the latest snapshot.
-            delta_entries: The delta entries being committed.
-            commit_kind: The kind of commit (e.g. "APPEND", "COMPACT", 
"OVERWRITE").
-
-        Returns:
-            A RuntimeError if a conflict is detected, otherwise None.
-        """
         all_entries = list(base_entries) + list(delta_entries)
 
         try:
@@ -80,25 +175,61 @@ class ConflictDetection:
             return RuntimeError(
                 "File deletion conflicts detected! Give up committing. " + 
str(e))
 
+        if commit_kind != "COMPACT":
+            next_row_id = latest_snapshot.next_row_id if latest_snapshot else 
None
+            conflict = self.check_row_id_existence(
+                base_entries, delta_entries, next_row_id)
+            if conflict is not None:
+                return conflict
+
         conflict = self.check_row_id_range_conflicts(commit_kind, 
merged_entries)
         if conflict is not None:
             return conflict
 
         return self.check_row_id_from_snapshot(latest_snapshot, delta_entries)
 
-    def check_row_id_range_conflicts(self, commit_kind, commit_entries):
-        """Check for row ID range conflicts among merged entries.
+    def check_row_id_existence(self, base_entries, delta_entries, 
next_row_id=None):
+        if not self.data_evolution_enabled:
+            return None
+
+        if next_row_id is None:
+            return None
+
+        files_to_check = [
+            entry for entry in delta_entries
+            if entry.kind == 0
+            and entry.file.first_row_id is not None
+            and entry.file.first_row_id < next_row_id
+        ]
 
-        only enabled when data evolution is active, and checks that
-        overlapping row ID ranges in non-blob data files are identical.
+        if not files_to_check:
+            return None
+
+        existing_index = set()
+        for base in base_entries:
+            if base.file.first_row_id is not None:
+                existing_index.add((
+                    base.partition, base.bucket,
+                    base.file.first_row_id, base.file.row_count))
+
+        for entry in files_to_check:
+            key = (entry.partition, entry.bucket,
+                   entry.file.first_row_id, entry.file.row_count)
+            if key not in existing_index:
+                return RuntimeError(
+                    "Row ID existence conflict: file '{}' references "
+                    "firstRowId={}, rowCount={} in bucket {}, "
+                    "but no matching file exists in the current snapshot. "
+                    "The referenced file may have been rewritten by a "
+                    "concurrent compaction or removed by an overwrite.".format(
+                        entry.file.file_name,
+                        entry.file.first_row_id,
+                        entry.file.row_count,
+                        entry.bucket))
 
-        Args:
-            commit_kind: The kind of commit (e.g. "APPEND", "COMPACT").
-            commit_entries: The entries being committed.
+        return None
 
-        Returns:
-            A RuntimeError if conflict is detected, otherwise None.
-        """
+    def check_row_id_range_conflicts(self, commit_kind, commit_entries):
         if not self.data_evolution_enabled:
             return None
         if self._row_id_check_from_snapshot is None and commit_kind != 
"COMPACT":
@@ -137,31 +268,16 @@ class ConflictDetection:
         return None
 
     def check_row_id_from_snapshot(self, latest_snapshot, commit_entries):
-        """Check for row ID conflicts from a specific snapshot onwards.
-
-        collects row ID ranges from delta entries, then checks if any
-        incremental changes between the check snapshot and latest snapshot
-        have overlapping row ID ranges.
-
-        Args:
-            latest_snapshot: The latest snapshot at commit time.
-            commit_entries: The delta entries being committed.
-
-        Returns:
-            A RuntimeError if conflict is detected, otherwise None.
-        """
         if not self.data_evolution_enabled:
             return None
         if self._row_id_check_from_snapshot is None:
             return None
 
-        history_id_ranges = []
-        for entry in commit_entries:
-            first_row_id = entry.file.first_row_id
-            row_count = entry.file.row_count
-            if first_row_id is not None:
-                history_id_ranges.append(
-                    Range(first_row_id, first_row_id + row_count - 1))
+        delta_files = [entry.file for entry in commit_entries]
+        column_checker = RowIdColumnConflictChecker.from_data_files(
+            self.table.schema_manager, delta_files)
+        if column_checker is None or column_checker.is_empty():
+            return None
 
         check_snapshot = self.snapshot_manager.get_snapshot_by_id(
             self._row_id_check_from_snapshot)
@@ -187,12 +303,11 @@ class ConflictDetection:
                 if file_range is None:
                     continue
                 if file_range.from_ < check_next_row_id:
-                    for history_range in history_id_ranges:
-                        if history_range.overlaps(file_range):
-                            return RuntimeError(
-                                "For Data Evolution table, multiple 'MERGE 
INTO' "
-                                "operations have encountered conflicts, 
updating "
-                                "the same file, which can render some updates "
-                                "ineffective.")
+                    if column_checker.conflicts_with(entry.file):
+                        return RuntimeError(
+                            "For Data Evolution table, multiple 'MERGE INTO' "
+                            "operations have encountered conflicts, updating "
+                            "the same file, which can render some updates "
+                            "ineffective.")
 
         return None
diff --git a/paimon-python/pypaimon/write/file_store_commit.py 
b/paimon-python/pypaimon/write/file_store_commit.py
index 486e289240..cfb4090e77 100644
--- a/paimon-python/pypaimon/write/file_store_commit.py
+++ b/paimon-python/pypaimon/write/file_store_commit.py
@@ -330,21 +330,6 @@ class FileStoreCommit:
         # process snapshot
         new_snapshot_id = latest_snapshot.id + 1 if latest_snapshot else 1
 
-        # Check if row tracking is enabled
-        row_tracking_enabled = self.table.options.row_tracking_enabled()
-
-        # Apply row tracking logic if enabled
-        next_row_id = None
-        if row_tracking_enabled:
-            # Assign snapshot ID to delta files
-            commit_entries = self._assign_snapshot_id(new_snapshot_id, 
commit_entries)
-
-            # Get the next row ID start from the latest snapshot
-            first_row_id_start = self._get_next_row_id_start(latest_snapshot)
-
-            # Assign row IDs to new files and get the next row ID for the 
snapshot
-            commit_entries, next_row_id = 
self._assign_row_tracking_meta(first_row_id_start, commit_entries)
-
         # Conflict detection: read base entries from latest snapshot, then 
check conflicts
         if detect_conflicts and latest_snapshot is not None:
             base_entries = 
self.commit_scanner.read_all_entries_from_changed_partitions(
@@ -358,6 +343,14 @@ class FileStoreCommit:
                         return RetryResult(latest_snapshot, conflict_exception)
                 raise conflict_exception
 
+        # Apply row tracking logic after conflict detection (matches Java 
ordering)
+        row_tracking_enabled = self.table.options.row_tracking_enabled()
+        next_row_id = None
+        if row_tracking_enabled:
+            commit_entries = self._assign_snapshot_id(new_snapshot_id, 
commit_entries)
+            first_row_id_start = self._get_next_row_id_start(latest_snapshot)
+            commit_entries, next_row_id = 
self._assign_row_tracking_meta(first_row_id_start, commit_entries)
+
         try:
             new_manifest_file_meta = self._write_manifest_file(commit_entries, 
new_manifest_file)
             self.manifest_list_manager.write(delta_manifest_list, 
[new_manifest_file_meta])
diff --git 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala
 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala
index ff54322b92..644eb49847 100644
--- 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala
+++ 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala
@@ -201,7 +201,10 @@ abstract class RowTrackingTestBase extends 
PaimonSparkTestBase {
             success = true
           } catch {
             case e: Exception =>
-              if (!e.getMessage.contains("multiple 'MERGE INTO' and 'COMPACT' 
operations")) {
+              if (
+                !e.getMessage.contains("multiple 'MERGE INTO' and 'COMPACT' 
operations")
+                && !e.getMessage.contains("Row ID existence conflict")
+              ) {
                 throw e
               }
           }


Reply via email to