This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 73c962ab9f [core][python] Add row ID existence check and column-level conflict detection (#7971)
73c962ab9f is described below
commit 73c962ab9ff7e72bffe0be1e548c7047a3458564
Author: Jingsong Lee <[email protected]>
AuthorDate: Tue May 26 16:23:52 2026 +0800
[core][python] Add row ID existence check and column-level conflict detection (#7971)
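Add a Rust-style row ID existence check to both Java and Python commit
validation, ensuring pre-assigned firstRowId files reference base files
that still exist in the current snapshot: every added file whose
firstRowId is below the latest snapshot's nextRowId must match a base
file with the same partition, bucket, firstRowId and rowCount. The check
is skipped for COMPACT commits. This covers a gap where
checkRowIdRangeConflicts misses conflicts when base files are fully
deleted (e.g., DROP PARTITION / OVERWRITE) with no replacement.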
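Also port Java's RowIdColumnConflictChecker to Python, enabling
row×column two-dimensional conflict detection in
check_row_id_from_snapshot. This allows concurrent MERGE INTO on
different columns of the same rows to proceed without false conflicts.
In Python, row-tracking metadata (snapshot ID and first row ID
assignment) is now applied after conflict detection, matching the Java
ordering.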
---
.../paimon/operation/commit/ConflictDetection.java | 101 +++++++
.../operation/commit/ConflictDetectionTest.java | 115 ++++++++
.../pypaimon/tests/e2e/java_py_read_write_test.py | 2 +-
.../tests/write/conflict_detection_test.py | 297 +++++++++++++++++++++
.../pypaimon/write/commit/conflict_detection.py | 241 ++++++++++++-----
paimon-python/pypaimon/write/file_store_commit.py | 23 +-
.../paimon/spark/sql/RowTrackingTestBase.scala | 5 +-
7 files changed, 704 insertions(+), 80 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/commit/ConflictDetection.java b/paimon-core/src/main/java/org/apache/paimon/operation/commit/ConflictDetection.java
index 29ee045818..493d13d88d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/commit/ConflictDetection.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/commit/ConflictDetection.java
@@ -224,6 +224,14 @@ public class ConflictDetection {
return exception;
}
+ if (commitKind != CommitKind.COMPACT) {
+ Long nextRowId = latestSnapshot.nextRowId();
+ exception = checkRowIdExistence(baseEntries, deltaEntries,
nextRowId);
+ if (exception.isPresent()) {
+ return exception;
+ }
+ }
+
exception = checkRowIdRangeConflicts(commitKind, mergedEntries);
if (exception.isPresent()) {
return exception;
@@ -536,6 +544,99 @@ public class ConflictDetection {
return Optional.empty();
}
+ Optional<RuntimeException> checkRowIdExistence(
+ List<SimpleFileEntry> baseEntries,
+ List<SimpleFileEntry> deltaEntries,
+ @Nullable Long nextRowId) {
+ if (!dataEvolutionEnabled) {
+ return Optional.empty();
+ }
+
+ List<SimpleFileEntry> filesToCheck =
+ deltaEntries.stream()
+ .filter(
+ e ->
+ e.kind() == FileKind.ADD
+ && e.firstRowId() != null
+ && nextRowId != null
+ && e.firstRowId() < nextRowId)
+ .collect(Collectors.toList());
+
+ if (filesToCheck.isEmpty()) {
+ return Optional.empty();
+ }
+
+ Set<FileRowIdKey> existingIndex = new HashSet<>();
+ for (SimpleFileEntry base : baseEntries) {
+ if (base.firstRowId() != null) {
+ existingIndex.add(
+ new FileRowIdKey(
+ base.partition(),
+ base.bucket(),
+ base.firstRowId(),
+ base.rowCount()));
+ }
+ }
+
+ for (SimpleFileEntry entry : filesToCheck) {
+ FileRowIdKey key =
+ new FileRowIdKey(
+ entry.partition(),
+ entry.bucket(),
+ entry.firstRowId(),
+ entry.rowCount());
+ if (!existingIndex.contains(key)) {
+ return Optional.of(
+ new RuntimeException(
+ String.format(
+ "Row ID existence conflict: file '%s'
references "
+ + "firstRowId=%d, rowCount=%d
in bucket %d, "
+ + "but no matching file exists
in the current snapshot. "
+ + "The referenced file may
have been rewritten by a "
+ + "concurrent compaction or
removed by an overwrite.",
+ entry.fileName(),
+ entry.firstRowId(),
+ entry.rowCount(),
+ entry.bucket())));
+ }
+ }
+ return Optional.empty();
+ }
+
+ private static class FileRowIdKey {
+ private final BinaryRow partition;
+ private final int bucket;
+ private final long firstRowId;
+ private final long rowCount;
+
+ FileRowIdKey(BinaryRow partition, int bucket, long firstRowId, long
rowCount) {
+ this.partition = partition;
+ this.bucket = bucket;
+ this.firstRowId = firstRowId;
+ this.rowCount = rowCount;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ FileRowIdKey that = (FileRowIdKey) o;
+ return bucket == that.bucket
+ && firstRowId == that.firstRowId
+ && rowCount == that.rowCount
+ && Objects.equals(partition, that.partition);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(partition, bucket, firstRowId, rowCount);
+ }
+ }
+
private static boolean dedicatedStorageFile(String fileName) {
return isBlobFile(fileName) || isVectorStoreFile(fileName);
}
diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/commit/ConflictDetectionTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/commit/ConflictDetectionTest.java
index c3e0258da2..1c36b9e09e 100644
--- a/paimon-core/src/test/java/org/apache/paimon/operation/commit/ConflictDetectionTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/operation/commit/ConflictDetectionTest.java
@@ -38,6 +38,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
+import java.util.Optional;
import static org.apache.paimon.data.BinaryRow.EMPTY_ROW;
import static
org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.DELETION_VECTORS_INDEX;
@@ -374,6 +375,120 @@ class ConflictDetectionTest {
.isFalse();
}
+ @Test
+ void testCheckRowIdExistenceNoConflict() {
+ ConflictDetection detection = createConflictDetection();
+
+ List<SimpleFileEntry> baseEntries = new ArrayList<>();
+ baseEntries.add(createFileEntryWithRowId("f1", ADD, 0L, 100L));
+
+ List<SimpleFileEntry> deltaEntries = new ArrayList<>();
+ deltaEntries.add(createFileEntryWithRowId("p1", ADD, 0L, 100L));
+
+ assertThat(detection.checkRowIdExistence(baseEntries, deltaEntries,
100L)).isEmpty();
+ }
+
+ @Test
+ void testCheckRowIdExistenceBaseFileRemoved() {
+ ConflictDetection detection = createConflictDetection();
+
+ List<SimpleFileEntry> baseEntries = new ArrayList<>();
+
+ List<SimpleFileEntry> deltaEntries = new ArrayList<>();
+ deltaEntries.add(createFileEntryWithRowId("p1", ADD, 0L, 100L));
+
+ Optional<RuntimeException> result =
+ detection.checkRowIdExistence(baseEntries, deltaEntries, 100L);
+ assertThat(result).isPresent();
+ assertThat(result.get().getMessage()).contains("Row ID existence
conflict");
+ }
+
+ @Test
+ void testCheckRowIdExistenceBaseFileRewritten() {
+ ConflictDetection detection = createConflictDetection();
+
+ List<SimpleFileEntry> baseEntries = new ArrayList<>();
+ baseEntries.add(createFileEntryWithRowId("f2", ADD, 0L, 200L));
+
+ List<SimpleFileEntry> deltaEntries = new ArrayList<>();
+ deltaEntries.add(createFileEntryWithRowId("p1", ADD, 0L, 100L));
+
+ Optional<RuntimeException> result =
+ detection.checkRowIdExistence(baseEntries, deltaEntries, 200L);
+ assertThat(result).isPresent();
+ assertThat(result.get().getMessage()).contains("Row ID existence
conflict");
+ }
+
+ @Test
+ void testCheckRowIdExistenceSkipsNewlyAppendedFiles() {
+ ConflictDetection detection = createConflictDetection();
+
+ // nextRowId=100: files with firstRowId >= 100 are newly appended, not
references
+ List<SimpleFileEntry> baseEntries = new ArrayList<>();
+ baseEntries.add(createFileEntryWithRowId("f1", ADD, 0L, 100L));
+
+ List<SimpleFileEntry> deltaEntries = new ArrayList<>();
+ // partial-column update referencing existing rows (firstRowId=0 <
nextRowId=100)
+ deltaEntries.add(createFileEntryWithRowId("p1", ADD, 0L, 100L));
+ // newly appended file (firstRowId=100 >= nextRowId=100), should be
skipped
+ deltaEntries.add(createFileEntryWithRowId("new1", ADD, 100L, 50L));
+
+ assertThat(detection.checkRowIdExistence(baseEntries, deltaEntries,
100L)).isEmpty();
+ }
+
+ @Test
+ void testCheckRowIdExistenceSkipsNonPreAssigned() {
+ ConflictDetection detection = createConflictDetection();
+
+ List<SimpleFileEntry> baseEntries = new ArrayList<>();
+
+ List<SimpleFileEntry> deltaEntries = new ArrayList<>();
+ deltaEntries.add(createFileEntry("f1", ADD));
+
+ assertThat(detection.checkRowIdExistence(baseEntries, deltaEntries,
100L)).isEmpty();
+ }
+
+ @Test
+ void testCheckRowIdExistenceSkipsDeleteEntries() {
+ ConflictDetection detection = createConflictDetection();
+
+ List<SimpleFileEntry> baseEntries = new ArrayList<>();
+
+ List<SimpleFileEntry> deltaEntries = new ArrayList<>();
+ deltaEntries.add(createFileEntryWithRowId("f1", DELETE, 0L, 100L));
+
+ assertThat(detection.checkRowIdExistence(baseEntries, deltaEntries,
100L)).isEmpty();
+ }
+
+ @Test
+ void testCheckRowIdExistenceSkipsWhenNextRowIdNull() {
+ ConflictDetection detection = createConflictDetection();
+
+ List<SimpleFileEntry> baseEntries = new ArrayList<>();
+ List<SimpleFileEntry> deltaEntries = new ArrayList<>();
+ deltaEntries.add(createFileEntryWithRowId("p1", ADD, 0L, 100L));
+
+ assertThat(detection.checkRowIdExistence(baseEntries, deltaEntries,
null)).isEmpty();
+ }
+
+ private SimpleFileEntry createFileEntryWithRowId(
+ String fileName, FileKind kind, long firstRowId, long rowCount) {
+ return new SimpleFileEntry(
+ kind,
+ EMPTY_ROW,
+ 0,
+ 1,
+ 0,
+ fileName,
+ Collections.emptyList(),
+ null,
+ EMPTY_ROW,
+ EMPTY_ROW,
+ null,
+ rowCount,
+ firstRowId);
+ }
+
private ConflictDetection createConflictDetection() {
return new ConflictDetection(
"test-table",
diff --git a/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py b/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py
index f48f4c99f3..13555a4231 100644
--- a/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py
+++ b/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py
@@ -762,7 +762,7 @@ class JavaPyReadWriteTest(unittest.TestCase):
tc = wb.new_commit()
with self.assertRaises(RuntimeError) as ctx:
tc.commit(stale_commit_msgs)
- self.assertIn("conflicts", str(ctx.exception))
+ self.assertIn("conflict", str(ctx.exception))
tc.close()
print(f"Conflict detected as expected: {ctx.exception}")
diff --git a/paimon-python/pypaimon/tests/write/conflict_detection_test.py b/paimon-python/pypaimon/tests/write/conflict_detection_test.py
new file mode 100644
index 0000000000..302a7d801f
--- /dev/null
+++ b/paimon-python/pypaimon/tests/write/conflict_detection_test.py
@@ -0,0 +1,297 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import unittest
+from dataclasses import dataclass
+from typing import List
+
+from pypaimon.manifest.schema.data_file_meta import DataFileMeta
+from pypaimon.manifest.schema.manifest_entry import ManifestEntry
+from pypaimon.schema.data_types import AtomicType, DataField
+from pypaimon.table.row.generic_row import GenericRow
+from pypaimon.write.commit.conflict_detection import (
+ ConflictDetection,
+ RowIdColumnConflictChecker,
+)
+
+
+def _make_file(file_name, row_count=100, first_row_id=None,
+ schema_id=0, write_cols=None):
+ return DataFileMeta(
+ file_name=file_name,
+ file_size=1024,
+ row_count=row_count,
+ min_key=None,
+ max_key=None,
+ key_stats=None,
+ value_stats=None,
+ min_sequence_number=0,
+ max_sequence_number=0,
+ schema_id=schema_id,
+ level=0,
+ extra_files=[],
+ first_row_id=first_row_id,
+ write_cols=write_cols,
+ )
+
+
+_EMPTY_PARTITION = GenericRow([], [])
+
+
+def _make_entry(file_name, kind=0, bucket=0, first_row_id=None,
+ row_count=100, write_cols=None, schema_id=0):
+ return ManifestEntry(
+ kind=kind,
+ partition=_EMPTY_PARTITION,
+ bucket=bucket,
+ total_buckets=1,
+ file=_make_file(file_name, row_count=row_count,
+ first_row_id=first_row_id, schema_id=schema_id,
+ write_cols=write_cols),
+ )
+
+
+@dataclass
+class _FakeSchema:
+ id: int
+ fields: List[DataField]
+
+
+class _FakeSchemaManager:
+
+ def __init__(self, schemas=None):
+ self._schemas = {}
+ if schemas:
+ for s in schemas:
+ self._schemas[s.id] = s
+
+ def get_schema(self, schema_id):
+ return self._schemas.get(schema_id)
+
+
+_DEFAULT_SCHEMA = _FakeSchema(
+ id=0,
+ fields=[
+ DataField(1, "col_a", AtomicType("INT")),
+ DataField(2, "col_b", AtomicType("STRING")),
+ DataField(3, "col_c", AtomicType("BIGINT")),
+ ],
+)
+
+
+class TestCheckRowIdExistence(unittest.TestCase):
+
+ def _make_detection(self):
+ return ConflictDetection(
+ data_evolution_enabled=True,
+ snapshot_manager=None,
+ manifest_list_manager=None,
+ table=None,
+ commit_scanner=None,
+ )
+
+ def test_no_conflict_when_base_file_exists(self):
+ detection = self._make_detection()
+ base = [_make_entry("f1", kind=0, first_row_id=0, row_count=100)]
+ delta = [_make_entry("p1", kind=0, first_row_id=0, row_count=100)]
+ self.assertIsNone(
+ detection.check_row_id_existence(base, delta, next_row_id=200))
+
+ def test_conflict_when_base_file_removed(self):
+ detection = self._make_detection()
+ base = []
+ delta = [_make_entry("p1", kind=0, first_row_id=0, row_count=100)]
+ result = detection.check_row_id_existence(base, delta, next_row_id=200)
+ self.assertIsNotNone(result)
+ self.assertIn("Row ID existence conflict", str(result))
+
+ def test_conflict_when_base_file_rewritten(self):
+ detection = self._make_detection()
+ base = [_make_entry("f2", kind=0, first_row_id=0, row_count=200)]
+ delta = [_make_entry("p1", kind=0, first_row_id=0, row_count=100)]
+ result = detection.check_row_id_existence(base, delta, next_row_id=200)
+ self.assertIsNotNone(result)
+ self.assertIn("Row ID existence conflict", str(result))
+
+ def test_skip_newly_appended_files(self):
+ detection = self._make_detection()
+ base = []
+ delta = [_make_entry("p1", kind=0, first_row_id=200, row_count=100)]
+ self.assertIsNone(
+ detection.check_row_id_existence(base, delta, next_row_id=200))
+
+ def test_skip_when_no_pre_assigned_row_id(self):
+ detection = self._make_detection()
+ base = []
+ delta = [_make_entry("f1", kind=0)]
+ self.assertIsNone(
+ detection.check_row_id_existence(base, delta, next_row_id=200))
+
+ def test_skip_delete_entries(self):
+ detection = self._make_detection()
+ base = []
+ delta = [_make_entry("f1", kind=1, first_row_id=0, row_count=100)]
+ self.assertIsNone(
+ detection.check_row_id_existence(base, delta, next_row_id=200))
+
+ def test_skip_when_data_evolution_disabled(self):
+ detection = ConflictDetection(
+ data_evolution_enabled=False,
+ snapshot_manager=None,
+ manifest_list_manager=None,
+ table=None,
+ commit_scanner=None,
+ )
+ base = []
+ delta = [_make_entry("p1", kind=0, first_row_id=0, row_count=100)]
+ self.assertIsNone(
+ detection.check_row_id_existence(base, delta, next_row_id=200))
+
+ def test_skip_when_next_row_id_is_none(self):
+ detection = self._make_detection()
+ base = []
+ delta = [_make_entry("p1", kind=0, first_row_id=0, row_count=100)]
+ self.assertIsNone(
+ detection.check_row_id_existence(base, delta, next_row_id=None))
+
+
+class TestRowIdColumnConflictChecker(unittest.TestCase):
+
+ def _make_checker(self, delta_files, schema=None):
+ schema_mgr = _FakeSchemaManager([schema or _DEFAULT_SCHEMA])
+ return RowIdColumnConflictChecker.from_data_files(schema_mgr,
delta_files)
+
+ def test_no_conflict_disjoint_rows(self):
+ delta_files = [
+ _make_file("d1", row_count=100, first_row_id=0,
write_cols=["col_a"]),
+ ]
+ checker = self._make_checker(delta_files)
+ committed = _make_file("c1", row_count=100, first_row_id=200,
+ write_cols=["col_a"])
+ self.assertFalse(checker.conflicts_with(committed))
+
+ def test_no_conflict_same_rows_different_columns(self):
+ delta_files = [
+ _make_file("d1", row_count=100, first_row_id=0,
write_cols=["col_a"]),
+ ]
+ checker = self._make_checker(delta_files)
+ committed = _make_file("c1", row_count=100, first_row_id=0,
+ write_cols=["col_b"])
+ self.assertFalse(checker.conflicts_with(committed))
+
+ def test_conflict_same_rows_same_columns(self):
+ delta_files = [
+ _make_file("d1", row_count=100, first_row_id=0,
write_cols=["col_a"]),
+ ]
+ checker = self._make_checker(delta_files)
+ committed = _make_file("c1", row_count=100, first_row_id=0,
+ write_cols=["col_a"])
+ self.assertTrue(checker.conflicts_with(committed))
+
+ def test_conflict_overlapping_rows_overlapping_columns(self):
+ delta_files = [
+ _make_file("d1", row_count=100, first_row_id=0,
+ write_cols=["col_a", "col_b"]),
+ ]
+ checker = self._make_checker(delta_files)
+ committed = _make_file("c1", row_count=100, first_row_id=50,
+ write_cols=["col_b", "col_c"])
+ self.assertTrue(checker.conflicts_with(committed))
+
+ def test_conflict_null_write_cols_committed(self):
+ """null write_cols means full-schema write — always conflicts on
column dimension."""
+ delta_files = [
+ _make_file("d1", row_count=100, first_row_id=0,
write_cols=["col_a"]),
+ ]
+ checker = self._make_checker(delta_files)
+ committed = _make_file("c1", row_count=100, first_row_id=0,
+ write_cols=None)
+ self.assertTrue(checker.conflicts_with(committed))
+
+ def test_conflict_null_write_cols_delta(self):
+ """null write_cols in delta means all columns are in the write
range."""
+ delta_files = [
+ _make_file("d1", row_count=100, first_row_id=0, write_cols=None),
+ ]
+ checker = self._make_checker(delta_files)
+ committed = _make_file("c1", row_count=100, first_row_id=0,
+ write_cols=["col_b"])
+ self.assertTrue(checker.conflicts_with(committed))
+
+ def test_no_conflict_committed_file_no_row_id(self):
+ delta_files = [
+ _make_file("d1", row_count=100, first_row_id=0,
write_cols=["col_a"]),
+ ]
+ checker = self._make_checker(delta_files)
+ committed = _make_file("c1", row_count=100, first_row_id=None,
+ write_cols=["col_a"])
+ self.assertFalse(checker.conflicts_with(committed))
+
+ def test_none_when_no_delta_files_with_row_id(self):
+ delta_files = [
+ _make_file("d1", row_count=100, first_row_id=None),
+ ]
+ schema_mgr = _FakeSchemaManager([_DEFAULT_SCHEMA])
+ checker = RowIdColumnConflictChecker.from_data_files(schema_mgr,
delta_files)
+ self.assertIsNone(checker)
+
+ def test_system_fields_skipped(self):
+ """System fields like _ROW_ID should not count as column conflicts."""
+ delta_files = [
+ _make_file("d1", row_count=100, first_row_id=0,
+ write_cols=["_ROW_ID", "col_a"]),
+ ]
+ checker = self._make_checker(delta_files)
+ committed = _make_file("c1", row_count=100, first_row_id=0,
+ write_cols=["_ROW_ID", "col_b"])
+ self.assertFalse(checker.conflicts_with(committed))
+
+ def test_cross_schema_field_id_resolution(self):
+ """Fields with same ID but different names across schema versions
should still match."""
+ schema_v0 = _FakeSchema(
+ id=0,
+ fields=[
+ DataField(1, "col_a", AtomicType("INT")),
+ DataField(2, "col_b", AtomicType("STRING")),
+ ],
+ )
+ schema_v1 = _FakeSchema(
+ id=1,
+ fields=[
+ DataField(1, "col_a_renamed", AtomicType("INT")),
+ DataField(2, "col_b", AtomicType("STRING")),
+ DataField(3, "col_c", AtomicType("BIGINT")),
+ ],
+ )
+ schema_mgr = _FakeSchemaManager([schema_v0, schema_v1])
+ delta_files = [
+ _make_file("d1", row_count=100, first_row_id=0,
+ schema_id=0, write_cols=["col_a"]),
+ ]
+ checker = RowIdColumnConflictChecker.from_data_files(schema_mgr,
delta_files)
+ committed_same_field = _make_file(
+ "c1", row_count=100, first_row_id=0,
+ schema_id=1, write_cols=["col_a_renamed"])
+ self.assertTrue(checker.conflicts_with(committed_same_field))
+ committed_diff_field = _make_file(
+ "c2", row_count=100, first_row_id=0,
+ schema_id=1, write_cols=["col_c"])
+ self.assertFalse(checker.conflicts_with(committed_diff_field))
+
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/paimon-python/pypaimon/write/commit/conflict_detection.py b/paimon-python/pypaimon/write/commit/conflict_detection.py
index 6b7652c00b..fc92dc838b 100644
--- a/paimon-python/pypaimon/write/commit/conflict_detection.py
+++ b/paimon-python/pypaimon/write/commit/conflict_detection.py
@@ -19,31 +19,140 @@
Conflict detection for commit operations.
"""
+import bisect
+
from pypaimon.manifest.manifest_list_manager import ManifestListManager
from pypaimon.manifest.schema.data_file_meta import DataFileMeta
from pypaimon.manifest.schema.file_entry import FileEntry
+from pypaimon.table.special_fields import SpecialFields
from pypaimon.utils.range import Range
from pypaimon.utils.range_helper import RangeHelper
from pypaimon.write.commit.commit_scanner import CommitScanner
-class ConflictDetection:
- """Detects conflicts between base and delta files during commit.
+class RowIdColumnConflictChecker:
+ """Checks for row ID × column conflicts between delta files and committed
files.
- This class provides row ID range conflict checks and row ID from snapshot
conflict checks
- for Data Evolution tables.
+ Built from the current commit's delta files. For each committed file,
+ checks whether it overlaps with the delta files on BOTH dimensions:
+ row-id range AND write columns.
"""
+ def __init__(self, write_ranges, schema_manager):
+ self._write_ranges = write_ranges
+ self._schema_manager = schema_manager
+ self._field_id_cache = {}
+
+ @classmethod
+ def from_data_files(cls, schema_manager, delta_files):
+ files_with_row_id = [f for f in delta_files if f.first_row_id is not
None]
+ if not files_with_row_id:
+ return None
+
+ range_helper = RangeHelper(lambda f: f.row_id_range())
+ groups = range_helper.merge_overlapping_ranges(files_with_row_id)
+
+ write_ranges = []
+ for group in groups:
+ merged_from = min(f.first_row_id for f in group)
+ merged_to = max(f.first_row_id + f.row_count - 1 for f in group)
+ merged_range = Range(merged_from, merged_to)
+
+ field_ids = set()
+ for f in group:
+ cls._add_write_field_ids(field_ids, f, schema_manager)
+
+ write_ranges.append(_WriteRange(merged_range, field_ids))
+
+ write_ranges.sort(key=lambda wr: (wr.range.from_, wr.range.to))
+ return cls(write_ranges, schema_manager)
+
+ def is_empty(self):
+ return len(self._write_ranges) == 0
+
+ def conflicts_with(self, file):
+ if file.first_row_id is None:
+ return False
+
+ file_range = Range(file.first_row_id, file.first_row_id +
file.row_count - 1)
+ index = self._first_possible_range(file_range)
+
+ while index < len(self._write_ranges):
+ wr = self._write_ranges[index]
+ if wr.range.from_ > file_range.to:
+ return False
+ if wr.range.overlaps(file_range) and
self._contains_any_write_field(wr.field_ids, file):
+ return True
+ index += 1
+
+ return False
+
+ def _first_possible_range(self, target):
+ keys = [wr.range.to for wr in self._write_ranges]
+ return bisect.bisect_left(keys, target.from_)
+
+ def _contains_any_write_field(self, field_ids, file):
+ if file.write_cols is None:
+ return True
+ for col_name in file.write_cols:
+ fid = self._field_id(file, col_name)
+ if fid is not None and fid in field_ids:
+ return True
+ return False
+
+ def _field_id(self, file, col_name):
+ if SpecialFields.is_system_field(col_name):
+ return None
+ name_to_id = self._field_id_by_name(file.schema_id)
+ fid = name_to_id.get(col_name)
+ if fid is None:
+ raise RuntimeError(
+ f"Column '{col_name}' not found in schema {file.schema_id}")
+ return fid
+
+ def _field_id_by_name(self, schema_id):
+ if schema_id not in self._field_id_cache:
+ schema = self._schema_manager.get_schema(schema_id)
+ if schema is None:
+ raise RuntimeError(f"Schema {schema_id} not found")
+ self._field_id_cache[schema_id] = {
+ field.name: field.id for field in schema.fields
+ }
+ return self._field_id_cache[schema_id]
+
+ @classmethod
+ def _add_write_field_ids(cls, field_ids, file, schema_manager):
+ if file.write_cols is None:
+ schema = schema_manager.get_schema(file.schema_id)
+ if schema is not None:
+ for field in schema.fields:
+ if not SpecialFields.is_system_field(field.name):
+ field_ids.add(field.id)
+ else:
+ name_to_id = {}
+ schema = schema_manager.get_schema(file.schema_id)
+ if schema is not None:
+ name_to_id = {field.name: field.id for field in schema.fields}
+ for col_name in file.write_cols:
+ if SpecialFields.is_system_field(col_name):
+ continue
+ fid = name_to_id.get(col_name)
+ if fid is not None:
+ field_ids.add(fid)
+
+
+class _WriteRange:
+
+ def __init__(self, range_, field_ids):
+ self.range = range_
+ self.field_ids = field_ids
+
+
+class ConflictDetection:
+ """Detects conflicts between base and delta files during commit."""
+
def __init__(self, data_evolution_enabled, snapshot_manager,
manifest_list_manager: ManifestListManager, table,
commit_scanner: CommitScanner):
- """Initialize ConflictDetection.
-
- Args:
- data_evolution_enabled: Whether data evolution feature is enabled.
- snapshot_manager: Manager for reading snapshot metadata.
- manifest_list_manager: Manager for reading manifest lists.
- table: The FileStoreTable instance.
- """
self.data_evolution_enabled = data_evolution_enabled
self.snapshot_manager = snapshot_manager
self.manifest_list_manager = manifest_list_manager
@@ -58,20 +167,6 @@ class ConflictDetection:
return self._row_id_check_from_snapshot is not None
def check_conflicts(self, latest_snapshot, base_entries, delta_entries,
commit_kind):
- """Run all conflict checks and return the first detected conflict.
-
- merges base_entries and delta_entries, then runs conflict checks
- on the merged result.
-
- Args:
- latest_snapshot: The latest snapshot at commit time.
- base_entries: All entries read from the latest snapshot.
- delta_entries: The delta entries being committed.
- commit_kind: The kind of commit (e.g. "APPEND", "COMPACT",
"OVERWRITE").
-
- Returns:
- A RuntimeError if a conflict is detected, otherwise None.
- """
all_entries = list(base_entries) + list(delta_entries)
try:
@@ -80,25 +175,61 @@ class ConflictDetection:
return RuntimeError(
"File deletion conflicts detected! Give up committing. " +
str(e))
+ if commit_kind != "COMPACT":
+ next_row_id = latest_snapshot.next_row_id if latest_snapshot else
None
+ conflict = self.check_row_id_existence(
+ base_entries, delta_entries, next_row_id)
+ if conflict is not None:
+ return conflict
+
conflict = self.check_row_id_range_conflicts(commit_kind,
merged_entries)
if conflict is not None:
return conflict
return self.check_row_id_from_snapshot(latest_snapshot, delta_entries)
- def check_row_id_range_conflicts(self, commit_kind, commit_entries):
- """Check for row ID range conflicts among merged entries.
+ def check_row_id_existence(self, base_entries, delta_entries,
next_row_id=None):
+ if not self.data_evolution_enabled:
+ return None
+
+ if next_row_id is None:
+ return None
+
+ files_to_check = [
+ entry for entry in delta_entries
+ if entry.kind == 0
+ and entry.file.first_row_id is not None
+ and entry.file.first_row_id < next_row_id
+ ]
- only enabled when data evolution is active, and checks that
- overlapping row ID ranges in non-blob data files are identical.
+ if not files_to_check:
+ return None
+
+ existing_index = set()
+ for base in base_entries:
+ if base.file.first_row_id is not None:
+ existing_index.add((
+ base.partition, base.bucket,
+ base.file.first_row_id, base.file.row_count))
+
+ for entry in files_to_check:
+ key = (entry.partition, entry.bucket,
+ entry.file.first_row_id, entry.file.row_count)
+ if key not in existing_index:
+ return RuntimeError(
+ "Row ID existence conflict: file '{}' references "
+ "firstRowId={}, rowCount={} in bucket {}, "
+ "but no matching file exists in the current snapshot. "
+ "The referenced file may have been rewritten by a "
+ "concurrent compaction or removed by an overwrite.".format(
+ entry.file.file_name,
+ entry.file.first_row_id,
+ entry.file.row_count,
+ entry.bucket))
- Args:
- commit_kind: The kind of commit (e.g. "APPEND", "COMPACT").
- commit_entries: The entries being committed.
+ return None
- Returns:
- A RuntimeError if conflict is detected, otherwise None.
- """
+ def check_row_id_range_conflicts(self, commit_kind, commit_entries):
if not self.data_evolution_enabled:
return None
if self._row_id_check_from_snapshot is None and commit_kind !=
"COMPACT":
@@ -137,31 +268,16 @@ class ConflictDetection:
return None
def check_row_id_from_snapshot(self, latest_snapshot, commit_entries):
- """Check for row ID conflicts from a specific snapshot onwards.
-
- collects row ID ranges from delta entries, then checks if any
- incremental changes between the check snapshot and latest snapshot
- have overlapping row ID ranges.
-
- Args:
- latest_snapshot: The latest snapshot at commit time.
- commit_entries: The delta entries being committed.
-
- Returns:
- A RuntimeError if conflict is detected, otherwise None.
- """
if not self.data_evolution_enabled:
return None
if self._row_id_check_from_snapshot is None:
return None
- history_id_ranges = []
- for entry in commit_entries:
- first_row_id = entry.file.first_row_id
- row_count = entry.file.row_count
- if first_row_id is not None:
- history_id_ranges.append(
- Range(first_row_id, first_row_id + row_count - 1))
+ delta_files = [entry.file for entry in commit_entries]
+ column_checker = RowIdColumnConflictChecker.from_data_files(
+ self.table.schema_manager, delta_files)
+ if column_checker is None or column_checker.is_empty():
+ return None
check_snapshot = self.snapshot_manager.get_snapshot_by_id(
self._row_id_check_from_snapshot)
@@ -187,12 +303,11 @@ class ConflictDetection:
if file_range is None:
continue
if file_range.from_ < check_next_row_id:
- for history_range in history_id_ranges:
- if history_range.overlaps(file_range):
- return RuntimeError(
- "For Data Evolution table, multiple 'MERGE
INTO' "
- "operations have encountered conflicts,
updating "
- "the same file, which can render some updates "
- "ineffective.")
+ if column_checker.conflicts_with(entry.file):
+ return RuntimeError(
+ "For Data Evolution table, multiple 'MERGE INTO' "
+ "operations have encountered conflicts, updating "
+ "the same file, which can render some updates "
+ "ineffective.")
return None
diff --git a/paimon-python/pypaimon/write/file_store_commit.py b/paimon-python/pypaimon/write/file_store_commit.py
index 486e289240..cfb4090e77 100644
--- a/paimon-python/pypaimon/write/file_store_commit.py
+++ b/paimon-python/pypaimon/write/file_store_commit.py
@@ -330,21 +330,6 @@ class FileStoreCommit:
# process snapshot
new_snapshot_id = latest_snapshot.id + 1 if latest_snapshot else 1
- # Check if row tracking is enabled
- row_tracking_enabled = self.table.options.row_tracking_enabled()
-
- # Apply row tracking logic if enabled
- next_row_id = None
- if row_tracking_enabled:
- # Assign snapshot ID to delta files
- commit_entries = self._assign_snapshot_id(new_snapshot_id,
commit_entries)
-
- # Get the next row ID start from the latest snapshot
- first_row_id_start = self._get_next_row_id_start(latest_snapshot)
-
- # Assign row IDs to new files and get the next row ID for the
snapshot
- commit_entries, next_row_id =
self._assign_row_tracking_meta(first_row_id_start, commit_entries)
-
# Conflict detection: read base entries from latest snapshot, then
check conflicts
if detect_conflicts and latest_snapshot is not None:
base_entries =
self.commit_scanner.read_all_entries_from_changed_partitions(
@@ -358,6 +343,14 @@ class FileStoreCommit:
return RetryResult(latest_snapshot, conflict_exception)
raise conflict_exception
+ # Apply row tracking logic after conflict detection (matches Java
ordering)
+ row_tracking_enabled = self.table.options.row_tracking_enabled()
+ next_row_id = None
+ if row_tracking_enabled:
+ commit_entries = self._assign_snapshot_id(new_snapshot_id,
commit_entries)
+ first_row_id_start = self._get_next_row_id_start(latest_snapshot)
+ commit_entries, next_row_id =
self._assign_row_tracking_meta(first_row_id_start, commit_entries)
+
try:
new_manifest_file_meta = self._write_manifest_file(commit_entries,
new_manifest_file)
self.manifest_list_manager.write(delta_manifest_list,
[new_manifest_file_meta])
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala
index ff54322b92..644eb49847 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala
@@ -201,7 +201,10 @@ abstract class RowTrackingTestBase extends PaimonSparkTestBase {
success = true
} catch {
case e: Exception =>
- if (!e.getMessage.contains("multiple 'MERGE INTO' and 'COMPACT'
operations")) {
+ if (
+ !e.getMessage.contains("multiple 'MERGE INTO' and 'COMPACT'
operations")
+ && !e.getMessage.contains("Row ID existence conflict")
+ ) {
throw e
}
}