This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new b4d8b65785 [core][flink] Detect overwrite conflicts for index commits (#7972)
b4d8b65785 is described below

commit b4d8b657850521e219d97142d9325cfeec2acf9b
Author: YeJunHao <[email protected]>
AuthorDate: Tue Jun 9 13:52:47 2026 +0800

    [core][flink] Detect overwrite conflicts for index commits (#7972)
---
 .../org/apache/paimon/utils/RowRangeIndex.java     |   7 +
 .../org/apache/paimon/utils/RowRangeIndexTest.java |  41 ++++++
 .../paimon/operation/FileStoreCommitImpl.java      |   7 +
 .../paimon/operation/commit/ConflictDetection.java |  67 +++++++++
 .../operation/commit/ManifestEntryChanges.java     |   3 +-
 .../paimon/operation/FileStoreCommitTest.java      | 104 ++++++++++++-
 .../operation/commit/ConflictDetectionTest.java    | 163 ++++++++++++++++++++-
 .../globalindex/GenericGlobalIndexBuilder.java     |  13 +-
 8 files changed, 395 insertions(+), 10 deletions(-)

diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/RowRangeIndex.java b/paimon-common/src/main/java/org/apache/paimon/utils/RowRangeIndex.java
index a068d04f72..ffd83eddd2 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/RowRangeIndex.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/RowRangeIndex.java
@@ -56,6 +56,13 @@ public class RowRangeIndex {
         return candidate < starts.length && starts[candidate] <= end;
     }
 
+    public boolean contains(Range range) {
+        int candidate = lowerBound(ends, range.from);
+        return candidate < starts.length
+                && starts[candidate] <= range.from
+                && ends[candidate] >= range.to;
+    }
+
     public List<Range> intersectedRanges(long start, long end) {
         int left = lowerBound(ends, start);
         if (left >= ranges.size()) {
diff --git a/paimon-common/src/test/java/org/apache/paimon/utils/RowRangeIndexTest.java b/paimon-common/src/test/java/org/apache/paimon/utils/RowRangeIndexTest.java
new file mode 100644
index 0000000000..7c4a9b1550
--- /dev/null
+++ b/paimon-common/src/test/java/org/apache/paimon/utils/RowRangeIndexTest.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.utils;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link RowRangeIndex}. */
+class RowRangeIndexTest {
+
+    @Test
+    void testContains() {
+        RowRangeIndex index =
+                RowRangeIndex.create(
+                        Arrays.asList(new Range(0, 99), new Range(100, 149), new Range(200, 299)));
+
+        assertThat(index.contains(new Range(0, 149))).isTrue();
+        assertThat(index.contains(new Range(50, 120))).isTrue();
+        assertThat(index.contains(new Range(150, 199))).isFalse();
+        assertThat(index.contains(new Range(100, 200))).isFalse();
+    }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
index 7cca259cbf..afb7ad6698 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
@@ -328,6 +328,13 @@ public class FileStoreCommitImpl implements FileStoreCommit {
                     checkAppendFiles = true;
                     allowRollback = true;
                 }
+                if (changes.appendIndexFiles.stream()
+                        .anyMatch(
+                                entry ->
+                                        entry.kind() == FileKind.ADD
+                                                && entry.indexFile().globalIndexMeta() != null)) {
+                    checkAppendFiles = true;
+                }
 
                 attempts +=
                         tryCommit(
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/commit/ConflictDetection.java b/paimon-core/src/main/java/org/apache/paimon/operation/commit/ConflictDetection.java
index 493d13d88d..ec5829e81c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/commit/ConflictDetection.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/commit/ConflictDetection.java
@@ -23,6 +23,7 @@ import org.apache.paimon.Snapshot.CommitKind;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.index.DeletionVectorMeta;
+import org.apache.paimon.index.GlobalIndexMeta;
 import org.apache.paimon.index.IndexFileHandler;
 import org.apache.paimon.index.IndexFileMeta;
 import org.apache.paimon.io.DataFileMeta;
@@ -37,7 +38,9 @@ import org.apache.paimon.table.BucketMode;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.FileStorePathFactory;
 import org.apache.paimon.utils.Pair;
+import org.apache.paimon.utils.Range;
 import org.apache.paimon.utils.RangeHelper;
+import org.apache.paimon.utils.RowRangeIndex;
 import org.apache.paimon.utils.SnapshotManager;
 
 import org.slf4j.Logger;
@@ -237,6 +240,11 @@ public class ConflictDetection {
             return exception;
         }
 
+        exception = checkGlobalIndexRowIdExistence(baseEntries, deltaIndexEntries);
+        if (exception.isPresent()) {
+            return exception;
+        }
+
         return checkForRowIdFromSnapshot(
                 latestSnapshot, deltaEntries, deltaIndexEntries, rowIdColumnConflictChecker);
     }
@@ -544,6 +552,65 @@ public class ConflictDetection {
         return Optional.empty();
     }
 
+    private Optional<RuntimeException> checkGlobalIndexRowIdExistence(
+            List<SimpleFileEntry> baseEntries, List<IndexManifestEntry> deltaIndexEntries) {
+        if (!dataEvolutionEnabled) {
+            return Optional.empty();
+        }
+
+        List<IndexManifestEntry> indexesToCheck = globalIndexFileAdditions(deltaIndexEntries);
+        if (indexesToCheck.isEmpty()) {
+            return Optional.empty();
+        }
+
+        Map<Pair<BinaryRow, Integer>, List<Range>> dataRanges = new HashMap<>();
+        for (SimpleFileEntry entry : baseEntries) {
+            if (entry.kind() == FileKind.ADD && entry.firstRowId() != null) {
+                dataRanges
+                        .computeIfAbsent(
+                                Pair.of(entry.partition(), entry.bucket()), k -> new ArrayList<>())
+                        .add(entry.nonNullRowIdRange());
+            }
+        }
+        Map<Pair<BinaryRow, Integer>, RowRangeIndex> rowRangeIndexes =
+                dataRanges.entrySet().stream()
+                        .collect(
+                                Collectors.toMap(
+                                        Map.Entry::getKey,
+                                        entry -> RowRangeIndex.create(entry.getValue())));
+
+        for (IndexManifestEntry indexEntry : indexesToCheck) {
+            GlobalIndexMeta globalIndex = indexEntry.indexFile().globalIndexMeta();
+            checkState(globalIndex != null, "Global index meta must not be null.");
+            Range indexRange = globalIndex.rowRange();
+            RowRangeIndex rowRangeIndex =
+                    rowRangeIndexes.get(Pair.of(indexEntry.partition(), indexEntry.bucket()));
+            if (rowRangeIndex == null || !rowRangeIndex.contains(indexRange)) {
+                return Optional.of(
+                        new RuntimeException(
+                                String.format(
+                                        "Global index row ID existence conflict: index file '%s' "
+                                                + "references row range %s, but this range "
+                                                + "is not fully covered by current data "
+                                                + "files. The referenced row IDs may have been "
+                                                + "reassigned or removed by a concurrent commit.",
+                                        indexEntry.indexFile().fileName(), indexRange)));
+            }
+        }
+        return Optional.empty();
+    }
+
+    private List<IndexManifestEntry> globalIndexFileAdditions(
+            List<IndexManifestEntry> indexFileChanges) {
+        List<IndexManifestEntry> result = new ArrayList<>();
+        for (IndexManifestEntry entry : indexFileChanges) {
+            if (entry.kind() == FileKind.ADD && entry.indexFile().globalIndexMeta() != null) {
+                result.add(entry);
+            }
+        }
+        return result;
+    }
+
     Optional<RuntimeException> checkRowIdExistence(
             List<SimpleFileEntry> baseEntries,
             List<SimpleFileEntry> deltaEntries,
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/commit/ManifestEntryChanges.java b/paimon-core/src/main/java/org/apache/paimon/operation/commit/ManifestEntryChanges.java
index f32f6c9ef5..faa65bba47 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/commit/ManifestEntryChanges.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/commit/ManifestEntryChanges.java
@@ -170,7 +170,8 @@ public class ManifestEntryChanges {
             changedPartitions.add(file.partition());
         }
         for (IndexManifestEntry file : indexFileChanges) {
-            if (file.indexFile().indexType().equals(DELETION_VECTORS_INDEX)) {
+            if (file.indexFile().indexType().equals(DELETION_VECTORS_INDEX)
+                    || file.indexFile().globalIndexMeta() != null) {
                 changedPartitions.add(file.partition());
             }
         }
diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
index e13b11d474..ef40a0fa2c 100644
--- a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
@@ -31,8 +31,11 @@ import org.apache.paimon.deletionvectors.BucketedDvMaintainer;
 import org.apache.paimon.deletionvectors.DeletionVector;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.index.GlobalIndexMeta;
 import org.apache.paimon.index.IndexFileHandler;
 import org.apache.paimon.index.IndexFileMeta;
+import org.apache.paimon.io.CompactIncrement;
+import org.apache.paimon.io.DataIncrement;
 import org.apache.paimon.manifest.FileKind;
 import org.apache.paimon.manifest.IndexManifestEntry;
 import org.apache.paimon.manifest.ManifestCommittable;
@@ -1008,6 +1011,48 @@ public class FileStoreCommitTest {
         }
     }
 
+    @Test
+    public void testGlobalIndexCommitChecksExistingRowIds() throws Exception {
+        TestFileStore store = createRowTrackingDataEvolutionStore();
+
+        List<KeyValue> keyValues = generateDataList(1);
+        BinaryRow partition = gen.getPartition(keyValues.get(0));
+        Snapshot dataSnapshot = store.commitData(keyValues, s -> partition, kv -> 0).get(0);
+        assertThat(dataSnapshot.nextRowId()).isEqualTo(1L);
+
+        try (FileStoreCommitImpl commit = store.newCommit()) {
+            commit.commit(indexCommittable(partition, "existing-index", 0, 0), false);
+        }
+
+        Snapshot latest = checkNotNull(store.snapshotManager().latestSnapshot());
+        assertThat(latest.indexManifest()).isNotNull();
+    }
+
+    @Test
+    public void testGlobalIndexCommitFailsForMissingRowIds() throws Exception {
+        TestFileStore store = createRowTrackingDataEvolutionStore();
+
+        List<KeyValue> keyValues = generateDataList(1);
+        BinaryRow partition = gen.getPartition(keyValues.get(0));
+        Snapshot dataSnapshot = store.commitData(keyValues, s -> partition, kv -> 0).get(0);
+        long missingRowId = checkNotNull(dataSnapshot.nextRowId());
+
+        try (FileStoreCommitImpl commit = store.newCommit()) {
+            assertThatThrownBy(
+                            () ->
+                                    commit.commit(
+                                            indexCommittable(
+                                                    partition,
+                                                    "missing-index",
+                                                    missingRowId,
+                                                    missingRowId),
+                                            false))
+                    .hasMessageContaining("Global index row ID existence conflict")
+                    .hasMessageContaining("missing-index")
+                    .hasMessageContaining("[" + missingRowId + ", " + missingRowId + "]");
+        }
+    }
+
     @Test
     public void testCommitTwiceWithDifferentKind() throws Exception {
         TestFileStore store = createStore(false);
@@ -1082,6 +1127,20 @@ public class FileStoreCommitTest {
 
     private FileStoreCommitImpl newCommitWithSnapshotCommit(
             TestFileStore store, String commitUser, SnapshotCommit snapshotCommit) {
+        return newCommitWithSnapshotCommit(
+                store,
+                commitUser,
+                snapshotCommit,
+                store.options(),
+                store.options().dataEvolutionEnabled());
+    }
+
+    private FileStoreCommitImpl newCommitWithSnapshotCommit(
+            TestFileStore store,
+            String commitUser,
+            SnapshotCommit snapshotCommit,
+            CoreOptions options,
+            boolean dataEvolutionEnabled) {
         String tableName = store.options().path().getName();
         return new FileStoreCommitImpl(
                 snapshotCommit,
@@ -1090,7 +1149,7 @@ public class FileStoreCommitTest {
                 tableName,
                 commitUser,
                 store.partitionType(),
-                store.options(),
+                options,
                 store.pathFactory(),
                 store.snapshotManager(),
                 store.manifestFileFactory(),
@@ -1109,15 +1168,37 @@ public class FileStoreCommitTest {
                                 store.pathFactory(),
                                 store.newKeyComparator(),
                                 store.bucketMode(),
-                                store.options().deletionVectorsEnabled(),
-                                store.options().dataEvolutionEnabled(),
-                                store.options().pkClusteringOverride(),
+                                options.deletionVectorsEnabled(),
+                                dataEvolutionEnabled,
+                                options.pkClusteringOverride(),
                                 store.newIndexFileHandler(),
                                 store.snapshotManager(),
                                 scanner),
                 null);
     }
 
+    private ManifestCommittable indexCommittable(
+            BinaryRow partition, String fileName, long rowRangeStart, long rowRangeEnd) {
+        ManifestCommittable committable = new ManifestCommittable(0);
+        committable.addFileCommittable(
+                new CommitMessageImpl(
+                        partition,
+                        0,
+                        null,
+                        DataIncrement.indexIncrement(
+                                Collections.singletonList(
+                                        new IndexFileMeta(
+                                                "btree",
+                                                fileName,
+                                                1,
+                                                1,
+                                                new GlobalIndexMeta(
+                                                        rowRangeStart, rowRangeEnd, 0, null, null),
+                                                null))),
+                        CompactIncrement.emptyIncrement()));
+        return committable;
+    }
+
     private static class FalseSuccessSnapshotCommit implements SnapshotCommit {
 
         private final SnapshotCommit delegate;
@@ -1153,6 +1234,13 @@ public class FileStoreCommitTest {
         return createStore(failing, 1, CoreOptions.ChangelogProducer.NONE, options);
     }
 
+    private TestFileStore createRowTrackingDataEvolutionStore() throws Exception {
+        Map<String, String> options = new HashMap<>();
+        options.put(CoreOptions.ROW_TRACKING_ENABLED.key(), "true");
+        options.put(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true");
+        return createStore(false, -1, CoreOptions.ChangelogProducer.NONE, options);
+    }
+
     private TestFileStore createStore(boolean failing) throws Exception {
         return createStore(failing, 1);
     }
@@ -1179,14 +1267,18 @@ public class FileStoreCommitTest {
                         ? FailingFileIO.getFailingPath(failingName, tempDir.toString())
                         : TraceableFileIO.SCHEME + "://" + tempDir.toString();
         Path path = new Path(tempDir.toUri());
+        List<String> primaryKeys =
+                Boolean.parseBoolean(options.get(CoreOptions.ROW_TRACKING_ENABLED.key()))
+                        ? Collections.emptyList()
+                        : TestKeyValueGenerator.getPrimaryKeys(
+                                TestKeyValueGenerator.GeneratorMode.MULTI_PARTITIONED);
         TableSchema tableSchema =
                 SchemaUtils.forceCommit(
                         new SchemaManager(new LocalFileIO(), path),
                         new Schema(
                                 TestKeyValueGenerator.DEFAULT_ROW_TYPE.getFields(),
                                 TestKeyValueGenerator.DEFAULT_PART_TYPE.getFieldNames(),
-                                TestKeyValueGenerator.getPrimaryKeys(
-                                        TestKeyValueGenerator.GeneratorMode.MULTI_PARTITIONED),
+                                primaryKeys,
                                 options,
                                 null));
         return new TestFileStore.Builder(
diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/commit/ConflictDetectionTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/commit/ConflictDetectionTest.java
index 1c36b9e09e..20be3b2c6e 100644
--- a/paimon-core/src/test/java/org/apache/paimon/operation/commit/ConflictDetectionTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/operation/commit/ConflictDetectionTest.java
@@ -18,7 +18,10 @@
 
 package org.apache.paimon.operation.commit;
 
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.index.DeletionVectorMeta;
+import org.apache.paimon.index.GlobalIndexMeta;
 import org.apache.paimon.index.IndexFileMeta;
 import org.apache.paimon.manifest.FileEntry;
 import org.apache.paimon.manifest.FileKind;
@@ -330,6 +333,26 @@ class ConflictDetectionTest {
                         DELETION_VECTORS_INDEX, fileName, 11, dvRanges.size(), dvRanges, null));
     }
 
+    private IndexManifestEntry createGlobalIndexEntry(
+            String fileName, FileKind kind, BinaryRow partition, long from, long to) {
+        return createGlobalIndexEntry(fileName, kind, partition, 0, from, to);
+    }
+
+    private IndexManifestEntry createGlobalIndexEntry(
+            String fileName, FileKind kind, BinaryRow partition, int bucket, long from, long to) {
+        return new IndexManifestEntry(
+                kind,
+                partition,
+                bucket,
+                new IndexFileMeta(
+                        "btree",
+                        fileName,
+                        11,
+                        1,
+                        new GlobalIndexMeta(from, to, 0, null, null),
+                        null));
+    }
+
     private void assertConflict(
             List<SimpleFileEntry> baseEntries, List<SimpleFileEntry> deltaEntries) {
         ArrayList<SimpleFileEntry> simpleFileEntryWithDVS = new ArrayList<>(baseEntries);
@@ -375,6 +398,18 @@ class ConflictDetectionTest {
                 .isFalse();
     }
 
+    @Test
+    void testChangedPartitionsIncludesGlobalIndexFiles() {
+        BinaryRow partition = BinaryRow.singleColumn(1);
+
+        assertThat(
+                        ManifestEntryChanges.changedPartitions(
+                                Collections.emptyList(),
+                                Collections.singletonList(
+                                        createGlobalIndexEntry("idx", ADD, partition, 0, 99))))
+                .containsExactly(partition);
+    }
+
     @Test
     void testCheckRowIdExistenceNoConflict() {
         ConflictDetection detection = createConflictDetection();
@@ -473,10 +508,20 @@ class ConflictDetectionTest {
 
     private SimpleFileEntry createFileEntryWithRowId(
             String fileName, FileKind kind, long firstRowId, long rowCount) {
+        return createFileEntryWithRowId(fileName, kind, EMPTY_ROW, 0, firstRowId, rowCount);
+    }
+
+    private SimpleFileEntry createFileEntryWithRowId(
+            String fileName,
+            FileKind kind,
+            BinaryRow partition,
+            int bucket,
+            long firstRowId,
+            long rowCount) {
         return new SimpleFileEntry(
                 kind,
-                EMPTY_ROW,
-                0,
+                partition,
+                bucket,
                 1,
                 0,
                 fileName,
@@ -489,6 +534,96 @@ class ConflictDetectionTest {
                 firstRowId);
     }
 
+    @Test
+    void testCheckGlobalIndexRowIdExistenceNoConflict() {
+        ConflictDetection detection = createConflictDetection();
+
+        Optional<RuntimeException> exception =
+                detection.checkConflicts(
+                        snapshot(1),
+                        Arrays.asList(
+                                createFileEntryWithRowId("f1", ADD, 0L, 100L),
+                                createFileEntryWithRowId("f2", ADD, 100L, 50L)),
+                        Collections.emptyList(),
+                        Collections.singletonList(
+                                createGlobalIndexEntry("idx", ADD, BinaryRow.EMPTY_ROW, 0, 149)),
+                        null,
+                        Snapshot.CommitKind.APPEND);
+
+        assertThat(exception).isNotPresent();
+    }
+
+    @Test
+    void testCheckGlobalIndexRowIdExistenceBaseFileRemoved() {
+        ConflictDetection detection = createConflictDetection();
+
+        Optional<RuntimeException> exception =
+                detection.checkConflicts(
+                        snapshot(1),
+                        Collections.singletonList(createFileEntryWithRowId("f1", ADD, 0L, 100L)),
+                        Collections.emptyList(),
+                        Collections.singletonList(
+                                createGlobalIndexEntry("idx", ADD, BinaryRow.EMPTY_ROW, 0, 149)),
+                        null,
+                        Snapshot.CommitKind.APPEND);
+
+        assertThat(exception).isPresent();
+        assertThat(exception.get())
+                .hasMessageContaining("Global index row ID existence conflict")
+                .hasMessageContaining("idx")
+                .hasMessageContaining("[0, 149]");
+    }
+
+    @Test
+    void testCheckGlobalIndexRowIdExistenceByPartitionAndBucket() {
+        ConflictDetection detection = createConflictDetection();
+        BinaryRow partition0 = BinaryRow.singleColumn(0);
+        BinaryRow partition1 = BinaryRow.singleColumn(1);
+
+        Optional<RuntimeException> exception =
+                detection.checkConflicts(
+                        snapshot(1),
+                        Collections.singletonList(
+                                createFileEntryWithRowId("f1", ADD, partition1, 0, 0L, 150L)),
+                        Collections.emptyList(),
+                        Collections.singletonList(
+                                createGlobalIndexEntry("idx", ADD, partition0, 0, 0, 149)),
+                        null,
+                        Snapshot.CommitKind.APPEND);
+
+        assertThat(exception).isPresent();
+
+        exception =
+                detection.checkConflicts(
+                        snapshot(1),
+                        Collections.singletonList(
+                                createFileEntryWithRowId("f1", ADD, partition0, 1, 0L, 150L)),
+                        Collections.emptyList(),
+                        Collections.singletonList(
+                                createGlobalIndexEntry("idx", ADD, partition0, 0, 0, 149)),
+                        null,
+                        Snapshot.CommitKind.APPEND);
+
+        assertThat(exception).isPresent();
+    }
+
+    @Test
+    void testCheckGlobalIndexRowIdExistenceSkipsDeleteIndexEntry() {
+        ConflictDetection detection = createConflictDetection();
+
+        Optional<RuntimeException> exception =
+                detection.checkConflicts(
+                        snapshot(1),
+                        Collections.emptyList(),
+                        Collections.emptyList(),
+                        Collections.singletonList(
+                                createGlobalIndexEntry("idx", DELETE, BinaryRow.EMPTY_ROW, 0, 149)),
+                        null,
+                        Snapshot.CommitKind.APPEND);
+
+        assertThat(exception).isNotPresent();
+    }
+
     private ConflictDetection createConflictDetection() {
         return new ConflictDetection(
                 "test-table",
@@ -504,4 +639,28 @@ class ConflictDetectionTest {
                 null,
                 null);
     }
+
+    private Snapshot snapshot(long id) {
+        return new Snapshot(
+                id,
+                0,
+                null,
+                null,
+                null,
+                null,
+                null,
+                null,
+                null,
+                "commit-user",
+                id,
+                Snapshot.CommitKind.APPEND,
+                id,
+                0,
+                0,
+                null,
+                null,
+                null,
+                null,
+                null);
+    }
 }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/globalindex/GenericGlobalIndexBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/globalindex/GenericGlobalIndexBuilder.java
index 48ea935f5b..415c1590a5 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/globalindex/GenericGlobalIndexBuilder.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/globalindex/GenericGlobalIndexBuilder.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.flink.globalindex;
 
+import org.apache.paimon.Snapshot;
 import org.apache.paimon.manifest.IndexManifestEntry;
 import org.apache.paimon.manifest.ManifestEntry;
 import org.apache.paimon.partition.PartitionPredicate;
@@ -72,7 +73,17 @@ public class GenericGlobalIndexBuilder implements Serializable {
                         + "deleted rows to be indexed.",
                 table.name());
 
-        return table.store().newScan().withPartitionFilter(partitionPredicate).plan().files();
+        Snapshot snapshot = table.snapshotManager().latestSnapshot();
+        if (snapshot == null) {
+            return Collections.emptyList();
+        }
+
+        return table.store()
+                .newScan()
+                .withSnapshot(snapshot)
+                .withPartitionFilter(partitionPredicate)
+                .plan()
+                .files();
     }
 
     /** Returns old index file entries that should be deleted after new indexes are built. */

Reply via email to