This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new b4d8b65785 [core][flink] Detect overwrite conflicts for index commits (#7972)
b4d8b65785 is described below
commit b4d8b657850521e219d97142d9325cfeec2acf9b
Author: YeJunHao <[email protected]>
AuthorDate: Tue Jun 9 13:52:47 2026 +0800
[core][flink] Detect overwrite conflicts for index commits (#7972)
---
.../org/apache/paimon/utils/RowRangeIndex.java | 7 +
.../org/apache/paimon/utils/RowRangeIndexTest.java | 41 ++++++
.../paimon/operation/FileStoreCommitImpl.java | 7 +
.../paimon/operation/commit/ConflictDetection.java | 67 +++++++++
.../operation/commit/ManifestEntryChanges.java | 3 +-
.../paimon/operation/FileStoreCommitTest.java | 104 ++++++++++++-
.../operation/commit/ConflictDetectionTest.java | 163 ++++++++++++++++++++-
.../globalindex/GenericGlobalIndexBuilder.java | 13 +-
8 files changed, 395 insertions(+), 10 deletions(-)
diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/RowRangeIndex.java b/paimon-common/src/main/java/org/apache/paimon/utils/RowRangeIndex.java
index a068d04f72..ffd83eddd2 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/RowRangeIndex.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/RowRangeIndex.java
@@ -56,6 +56,13 @@ public class RowRangeIndex {
return candidate < starts.length && starts[candidate] <= end;
}
+    /**
+     * Checks whether {@code range} is covered by one single range of this index.
+     *
+     * <p>{@code lowerBound(ends, range.from)} picks the candidate range (presumably the first one
+     * whose end is {@code >= range.from}). The query is covered only when that candidate starts at
+     * or before {@code range.from} and ends at or after {@code range.to}. Whether adjacent input
+     * ranges count as one depends on how the index was built by {@code create} — confirm there.
+     *
+     * @param range the row range to look up
+     * @return {@code true} if a single indexed range spans all of {@code range}
+     */
+    public boolean contains(Range range) {
+        int idx = lowerBound(ends, range.from);
+        if (idx >= starts.length) {
+            // Every indexed range ends before range.from.
+            return false;
+        }
+        return starts[idx] <= range.from && ends[idx] >= range.to;
+    }
+
public List<Range> intersectedRanges(long start, long end) {
int left = lowerBound(ends, start);
if (left >= ranges.size()) {
diff --git a/paimon-common/src/test/java/org/apache/paimon/utils/RowRangeIndexTest.java b/paimon-common/src/test/java/org/apache/paimon/utils/RowRangeIndexTest.java
new file mode 100644
index 0000000000..7c4a9b1550
--- /dev/null
+++ b/paimon-common/src/test/java/org/apache/paimon/utils/RowRangeIndexTest.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.utils;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link RowRangeIndex}. */
+class RowRangeIndexTest {
+
+    @Test
+    void testContains() {
+        // [0, 99] and [100, 149] are adjacent; the assertions below expect them to behave as one
+        // covered span [0, 149], followed by the uncovered gap [150, 199] and then [200, 299].
+        RowRangeIndex index =
+                RowRangeIndex.create(
+                        Arrays.asList(new Range(0, 99), new Range(100, 149), new Range(200, 299)));
+
+        assertThat(index.contains(new Range(0, 149))).isTrue();
+        assertThat(index.contains(new Range(50, 120))).isTrue();
+        assertThat(index.contains(new Range(150, 199))).isFalse();
+        assertThat(index.contains(new Range(100, 200))).isFalse();
+    }
+
+    @Test
+    void testContainsBoundaries() {
+        RowRangeIndex index =
+                RowRangeIndex.create(Arrays.asList(new Range(10, 19), new Range(30, 39)));
+
+        // Exact bounds and single-row ranges at either edge are contained.
+        assertThat(index.contains(new Range(10, 19))).isTrue();
+        assertThat(index.contains(new Range(10, 10))).isTrue();
+        assertThat(index.contains(new Range(39, 39))).isTrue();
+
+        // One row past either edge of an indexed range is not contained.
+        assertThat(index.contains(new Range(9, 19))).isFalse();
+        assertThat(index.contains(new Range(10, 20))).isFalse();
+
+        // Ranges entirely before, between, or after the indexed ranges are not contained.
+        assertThat(index.contains(new Range(0, 5))).isFalse();
+        assertThat(index.contains(new Range(25, 25))).isFalse();
+        assertThat(index.contains(new Range(40, 50))).isFalse();
+
+        // A range spanning the gap between two indexed ranges is not contained.
+        assertThat(index.contains(new Range(15, 35))).isFalse();
+    }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
index 7cca259cbf..afb7ad6698 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
@@ -328,6 +328,13 @@ public class FileStoreCommitImpl implements FileStoreCommit {
checkAppendFiles = true;
allowRollback = true;
}
+ if (changes.appendIndexFiles.stream()
+ .anyMatch(
+ entry ->
+ entry.kind() == FileKind.ADD
+ &&
entry.indexFile().globalIndexMeta() != null)) {
+ checkAppendFiles = true;
+ }
attempts +=
tryCommit(
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/commit/ConflictDetection.java b/paimon-core/src/main/java/org/apache/paimon/operation/commit/ConflictDetection.java
index 493d13d88d..ec5829e81c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/commit/ConflictDetection.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/commit/ConflictDetection.java
@@ -23,6 +23,7 @@ import org.apache.paimon.Snapshot.CommitKind;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.index.DeletionVectorMeta;
+import org.apache.paimon.index.GlobalIndexMeta;
import org.apache.paimon.index.IndexFileHandler;
import org.apache.paimon.index.IndexFileMeta;
import org.apache.paimon.io.DataFileMeta;
@@ -37,7 +38,9 @@ import org.apache.paimon.table.BucketMode;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.Pair;
+import org.apache.paimon.utils.Range;
import org.apache.paimon.utils.RangeHelper;
+import org.apache.paimon.utils.RowRangeIndex;
import org.apache.paimon.utils.SnapshotManager;
import org.slf4j.Logger;
@@ -237,6 +240,11 @@ public class ConflictDetection {
return exception;
}
+ exception = checkGlobalIndexRowIdExistence(baseEntries,
deltaIndexEntries);
+ if (exception.isPresent()) {
+ return exception;
+ }
+
return checkForRowIdFromSnapshot(
latestSnapshot, deltaEntries, deltaIndexEntries,
rowIdColumnConflictChecker);
}
@@ -544,6 +552,65 @@ public class ConflictDetection {
return Optional.empty();
}
+    /**
+     * Verifies that every global index file added by this commit references only row IDs that are
+     * covered by {@code ADD} data files of the same partition and bucket in {@code baseEntries}.
+     *
+     * <p>Only runs when data evolution is enabled. As the conflict message below states, the
+     * concern is an index whose row IDs were reassigned or removed by a concurrent commit.
+     *
+     * @param baseEntries data file entries the new index files are checked against (presumably the
+     *     merged entries of the latest snapshot for the changed partitions — confirm with caller)
+     * @param deltaIndexEntries index file changes of this commit; only {@code ADD} entries that
+     *     carry global index metadata are checked
+     * @return a conflict for the first index file whose row range is not fully covered, or empty
+     */
+    private Optional<RuntimeException> checkGlobalIndexRowIdExistence(
+            List<SimpleFileEntry> baseEntries, List<IndexManifestEntry> deltaIndexEntries) {
+        if (!dataEvolutionEnabled) {
+            return Optional.empty();
+        }
+
+        List<IndexManifestEntry> indexesToCheck = globalIndexFileAdditions(deltaIndexEntries);
+        if (indexesToCheck.isEmpty()) {
+            return Optional.empty();
+        }
+
+        Map<Pair<BinaryRow, Integer>, RowRangeIndex> rowRangeIndexes =
+                buildRowRangeIndexes(baseEntries, indexesToCheck);
+
+        for (IndexManifestEntry indexEntry : indexesToCheck) {
+            GlobalIndexMeta globalIndex = indexEntry.indexFile().globalIndexMeta();
+            checkState(globalIndex != null, "Global index meta must not be null.");
+            Range indexRange = globalIndex.rowRange();
+            // No entry means no ADD data file with row IDs exists in this partition and bucket.
+            RowRangeIndex rowRangeIndex =
+                    rowRangeIndexes.get(Pair.of(indexEntry.partition(), indexEntry.bucket()));
+            if (rowRangeIndex == null || !rowRangeIndex.contains(indexRange)) {
+                return Optional.of(
+                        new RuntimeException(
+                                String.format(
+                                        "Global index row ID existence conflict: index file '%s' "
+                                                + "references row range %s, but this range "
+                                                + "is not fully covered by current data "
+                                                + "files. The referenced row IDs may have been "
+                                                + "reassigned or removed by a concurrent commit.",
+                                        indexEntry.indexFile().fileName(),
+                                        indexRange)));
+            }
+        }
+        return Optional.empty();
+    }
+
+    /**
+     * Builds one {@link RowRangeIndex} per (partition, bucket) from the row ID ranges of the
+     * {@code ADD} data files in {@code baseEntries} that have a first row ID.
+     *
+     * <p>Only the (partition, bucket) pairs referenced by {@code indexesToCheck} are collected, so
+     * buckets that no new index touches are never indexed. Pairs without any covering data file
+     * are left out of the result, so lookups for them return {@code null}.
+     */
+    private static Map<Pair<BinaryRow, Integer>, RowRangeIndex> buildRowRangeIndexes(
+            List<SimpleFileEntry> baseEntries, List<IndexManifestEntry> indexesToCheck) {
+        Map<Pair<BinaryRow, Integer>, List<Range>> dataRanges = new HashMap<>();
+        for (IndexManifestEntry indexEntry : indexesToCheck) {
+            dataRanges.putIfAbsent(
+                    Pair.of(indexEntry.partition(), indexEntry.bucket()), new ArrayList<>());
+        }
+
+        for (SimpleFileEntry entry : baseEntries) {
+            if (entry.kind() != FileKind.ADD || entry.firstRowId() == null) {
+                continue;
+            }
+            List<Range> ranges = dataRanges.get(Pair.of(entry.partition(), entry.bucket()));
+            if (ranges != null) {
+                ranges.add(entry.nonNullRowIdRange());
+            }
+        }
+
+        Map<Pair<BinaryRow, Integer>, RowRangeIndex> rowRangeIndexes = new HashMap<>();
+        for (Map.Entry<Pair<BinaryRow, Integer>, List<Range>> group : dataRanges.entrySet()) {
+            if (!group.getValue().isEmpty()) {
+                rowRangeIndexes.put(group.getKey(), RowRangeIndex.create(group.getValue()));
+            }
+        }
+        return rowRangeIndexes;
+    }
+
+    /**
+     * Selects the {@code ADD} entries among {@code indexFileChanges} that carry global index
+     * metadata, preserving their original order.
+     */
+    private List<IndexManifestEntry> globalIndexFileAdditions(
+            List<IndexManifestEntry> indexFileChanges) {
+        return indexFileChanges.stream()
+                .filter(change -> change.kind() == FileKind.ADD)
+                .filter(change -> change.indexFile().globalIndexMeta() != null)
+                .collect(Collectors.toList());
+    }
+
Optional<RuntimeException> checkRowIdExistence(
List<SimpleFileEntry> baseEntries,
List<SimpleFileEntry> deltaEntries,
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/commit/ManifestEntryChanges.java b/paimon-core/src/main/java/org/apache/paimon/operation/commit/ManifestEntryChanges.java
index f32f6c9ef5..faa65bba47 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/commit/ManifestEntryChanges.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/commit/ManifestEntryChanges.java
@@ -170,7 +170,8 @@ public class ManifestEntryChanges {
changedPartitions.add(file.partition());
}
for (IndexManifestEntry file : indexFileChanges) {
- if (file.indexFile().indexType().equals(DELETION_VECTORS_INDEX)) {
+ if (file.indexFile().indexType().equals(DELETION_VECTORS_INDEX)
+ || file.indexFile().globalIndexMeta() != null) {
changedPartitions.add(file.partition());
}
}
diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
index e13b11d474..ef40a0fa2c 100644
--- a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
@@ -31,8 +31,11 @@ import org.apache.paimon.deletionvectors.BucketedDvMaintainer;
import org.apache.paimon.deletionvectors.DeletionVector;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.index.GlobalIndexMeta;
import org.apache.paimon.index.IndexFileHandler;
import org.apache.paimon.index.IndexFileMeta;
+import org.apache.paimon.io.CompactIncrement;
+import org.apache.paimon.io.DataIncrement;
import org.apache.paimon.manifest.FileKind;
import org.apache.paimon.manifest.IndexManifestEntry;
import org.apache.paimon.manifest.ManifestCommittable;
@@ -1008,6 +1011,48 @@ public class FileStoreCommitTest {
}
}
+    @Test
+    public void testGlobalIndexCommitChecksExistingRowIds() throws Exception {
+        TestFileStore store = createRowTrackingDataEvolutionStore();
+
+        // Commit one row; the resulting snapshot reports nextRowId == 1.
+        List<KeyValue> rows = generateDataList(1);
+        BinaryRow targetPartition = gen.getPartition(rows.get(0));
+        Snapshot dataSnapshot =
+                store.commitData(rows, s -> targetPartition, kv -> 0).get(0);
+        assertThat(dataSnapshot.nextRowId()).isEqualTo(1L);
+
+        // A global index over rows [0, 0] only references the row committed above.
+        ManifestCommittable coveredIndex =
+                indexCommittable(targetPartition, "existing-index", 0, 0);
+        try (FileStoreCommitImpl commit = store.newCommit()) {
+            commit.commit(coveredIndex, false);
+        }
+
+        Snapshot latestSnapshot = checkNotNull(store.snapshotManager().latestSnapshot());
+        assertThat(latestSnapshot.indexManifest()).isNotNull();
+    }
+
+    @Test
+    public void testGlobalIndexCommitFailsForMissingRowIds() throws Exception {
+        TestFileStore store = createRowTrackingDataEvolutionStore();
+
+        List<KeyValue> rows = generateDataList(1);
+        BinaryRow targetPartition = gen.getPartition(rows.get(0));
+        Snapshot dataSnapshot =
+                store.commitData(rows, s -> targetPartition, kv -> 0).get(0);
+        // nextRowId is one past the last assigned row ID, so no data file covers it.
+        long missingRowId = checkNotNull(dataSnapshot.nextRowId());
+        String expectedRange = "[" + missingRowId + ", " + missingRowId + "]";
+        ManifestCommittable danglingIndex =
+                indexCommittable(targetPartition, "missing-index", missingRowId, missingRowId);
+
+        try (FileStoreCommitImpl commit = store.newCommit()) {
+            assertThatThrownBy(() -> commit.commit(danglingIndex, false))
+                    .hasMessageContaining("Global index row ID existence conflict")
+                    .hasMessageContaining("missing-index")
+                    .hasMessageContaining(expectedRange);
+        }
+    }
+
@Test
public void testCommitTwiceWithDifferentKind() throws Exception {
TestFileStore store = createStore(false);
@@ -1082,6 +1127,20 @@ public class FileStoreCommitTest {
private FileStoreCommitImpl newCommitWithSnapshotCommit(
TestFileStore store, String commitUser, SnapshotCommit
snapshotCommit) {
+ return newCommitWithSnapshotCommit(
+ store,
+ commitUser,
+ snapshotCommit,
+ store.options(),
+ store.options().dataEvolutionEnabled());
+ }
+
+ private FileStoreCommitImpl newCommitWithSnapshotCommit(
+ TestFileStore store,
+ String commitUser,
+ SnapshotCommit snapshotCommit,
+ CoreOptions options,
+ boolean dataEvolutionEnabled) {
String tableName = store.options().path().getName();
return new FileStoreCommitImpl(
snapshotCommit,
@@ -1090,7 +1149,7 @@ public class FileStoreCommitTest {
tableName,
commitUser,
store.partitionType(),
- store.options(),
+ options,
store.pathFactory(),
store.snapshotManager(),
store.manifestFileFactory(),
@@ -1109,15 +1168,37 @@ public class FileStoreCommitTest {
store.pathFactory(),
store.newKeyComparator(),
store.bucketMode(),
- store.options().deletionVectorsEnabled(),
- store.options().dataEvolutionEnabled(),
- store.options().pkClusteringOverride(),
+ options.deletionVectorsEnabled(),
+ dataEvolutionEnabled,
+ options.pkClusteringOverride(),
store.newIndexFileHandler(),
store.snapshotManager(),
scanner),
null);
}
+    /**
+     * Builds a committable that adds a single "btree" global index file to {@code partition}. The
+     * file's global index metadata spans row IDs {@code rowRangeStart} to {@code rowRangeEnd}.
+     */
+    private ManifestCommittable indexCommittable(
+            BinaryRow partition, String fileName, long rowRangeStart, long rowRangeEnd) {
+        GlobalIndexMeta globalIndexMeta =
+                new GlobalIndexMeta(rowRangeStart, rowRangeEnd, 0, null, null);
+        IndexFileMeta indexFile =
+                new IndexFileMeta("btree", fileName, 1, 1, globalIndexMeta, null);
+        // The data increment is built via indexIncrement from just this file; the compact
+        // increment is empty.
+        CommitMessageImpl indexMessage =
+                new CommitMessageImpl(
+                        partition,
+                        0,
+                        null,
+                        DataIncrement.indexIncrement(Collections.singletonList(indexFile)),
+                        CompactIncrement.emptyIncrement());
+
+        ManifestCommittable committable = new ManifestCommittable(0);
+        committable.addFileCommittable(indexMessage);
+        return committable;
+    }
+
private static class FalseSuccessSnapshotCommit implements SnapshotCommit {
private final SnapshotCommit delegate;
@@ -1153,6 +1234,13 @@ public class FileStoreCommitTest {
return createStore(failing, 1, CoreOptions.ChangelogProducer.NONE,
options);
}
+    /**
+     * Creates a test store with row tracking and data evolution enabled. With row tracking on,
+     * {@code createStore} builds the schema without primary keys.
+     */
+    private TestFileStore createRowTrackingDataEvolutionStore() throws Exception {
+        Map<String, String> tableOptions = new HashMap<>();
+        tableOptions.put(CoreOptions.ROW_TRACKING_ENABLED.key(), "true");
+        tableOptions.put(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true");
+        return createStore(
+                false, -1, CoreOptions.ChangelogProducer.NONE, tableOptions);
+    }
+
/** Shorthand for {@code createStore(failing, 1)}. */
private TestFileStore createStore(final boolean failing) throws Exception {
    TestFileStore store = createStore(failing, 1);
    return store;
}
@@ -1179,14 +1267,18 @@ public class FileStoreCommitTest {
? FailingFileIO.getFailingPath(failingName,
tempDir.toString())
: TraceableFileIO.SCHEME + "://" + tempDir.toString();
Path path = new Path(tempDir.toUri());
+ List<String> primaryKeys =
+
Boolean.parseBoolean(options.get(CoreOptions.ROW_TRACKING_ENABLED.key()))
+ ? Collections.emptyList()
+ : TestKeyValueGenerator.getPrimaryKeys(
+
TestKeyValueGenerator.GeneratorMode.MULTI_PARTITIONED);
TableSchema tableSchema =
SchemaUtils.forceCommit(
new SchemaManager(new LocalFileIO(), path),
new Schema(
TestKeyValueGenerator.DEFAULT_ROW_TYPE.getFields(),
TestKeyValueGenerator.DEFAULT_PART_TYPE.getFieldNames(),
- TestKeyValueGenerator.getPrimaryKeys(
-
TestKeyValueGenerator.GeneratorMode.MULTI_PARTITIONED),
+ primaryKeys,
options,
null));
return new TestFileStore.Builder(
diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/commit/ConflictDetectionTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/commit/ConflictDetectionTest.java
index 1c36b9e09e..20be3b2c6e 100644
--- a/paimon-core/src/test/java/org/apache/paimon/operation/commit/ConflictDetectionTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/operation/commit/ConflictDetectionTest.java
@@ -18,7 +18,10 @@
package org.apache.paimon.operation.commit;
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.index.DeletionVectorMeta;
+import org.apache.paimon.index.GlobalIndexMeta;
import org.apache.paimon.index.IndexFileMeta;
import org.apache.paimon.manifest.FileEntry;
import org.apache.paimon.manifest.FileKind;
@@ -330,6 +333,26 @@ class ConflictDetectionTest {
DELETION_VECTORS_INDEX, fileName, 11, dvRanges.size(),
dvRanges, null));
}
+ private IndexManifestEntry createGlobalIndexEntry(
+ String fileName, FileKind kind, BinaryRow partition, long from,
long to) {
+ return createGlobalIndexEntry(fileName, kind, partition, 0, from, to);
+ }
+
+ private IndexManifestEntry createGlobalIndexEntry(
+ String fileName, FileKind kind, BinaryRow partition, int bucket,
long from, long to) {
+ return new IndexManifestEntry(
+ kind,
+ partition,
+ bucket,
+ new IndexFileMeta(
+ "btree",
+ fileName,
+ 11,
+ 1,
+ new GlobalIndexMeta(from, to, 0, null, null),
+ null));
+ }
+
private void assertConflict(
List<SimpleFileEntry> baseEntries, List<SimpleFileEntry>
deltaEntries) {
ArrayList<SimpleFileEntry> simpleFileEntryWithDVS = new
ArrayList<>(baseEntries);
@@ -375,6 +398,18 @@ class ConflictDetectionTest {
.isFalse();
}
+    @Test
+    void testChangedPartitionsIncludesGlobalIndexFiles() {
+        BinaryRow partition = BinaryRow.singleColumn(1);
+        IndexManifestEntry globalIndex = createGlobalIndexEntry("idx", ADD, partition, 0, 99);
+
+        // A commit that only adds a global index file must still mark its partition as changed.
+        assertThat(
+                        ManifestEntryChanges.changedPartitions(
+                                Collections.emptyList(), Collections.singletonList(globalIndex)))
+                .containsExactly(partition);
+    }
+
@Test
void testCheckRowIdExistenceNoConflict() {
ConflictDetection detection = createConflictDetection();
@@ -473,10 +508,20 @@ class ConflictDetectionTest {
private SimpleFileEntry createFileEntryWithRowId(
String fileName, FileKind kind, long firstRowId, long rowCount) {
+ return createFileEntryWithRowId(fileName, kind, EMPTY_ROW, 0,
firstRowId, rowCount);
+ }
+
+ private SimpleFileEntry createFileEntryWithRowId(
+ String fileName,
+ FileKind kind,
+ BinaryRow partition,
+ int bucket,
+ long firstRowId,
+ long rowCount) {
return new SimpleFileEntry(
kind,
- EMPTY_ROW,
- 0,
+ partition,
+ bucket,
1,
0,
fileName,
@@ -489,6 +534,96 @@ class ConflictDetectionTest {
firstRowId);
}
+    @Test
+    void testCheckGlobalIndexRowIdExistenceNoConflict() {
+        ConflictDetection detection = createConflictDetection();
+        // Two adjacent data files together cover rows [0, 149], exactly what the index claims.
+        List<SimpleFileEntry> baseFiles =
+                Arrays.asList(
+                        createFileEntryWithRowId("f1", ADD, 0L, 100L),
+                        createFileEntryWithRowId("f2", ADD, 100L, 50L));
+        List<IndexManifestEntry> newIndexes =
+                Collections.singletonList(
+                        createGlobalIndexEntry("idx", ADD, BinaryRow.EMPTY_ROW, 0, 149));
+
+        Optional<RuntimeException> exception =
+                detection.checkConflicts(
+                        snapshot(1),
+                        baseFiles,
+                        Collections.emptyList(),
+                        newIndexes,
+                        null,
+                        Snapshot.CommitKind.APPEND);
+
+        assertThat(exception).isNotPresent();
+    }
+
+    @Test
+    void testCheckGlobalIndexRowIdExistenceBaseFileRemoved() {
+        ConflictDetection detection = createConflictDetection();
+        // The base only covers rows [0, 99], while the new index claims rows [0, 149].
+        List<SimpleFileEntry> baseFiles =
+                Collections.singletonList(createFileEntryWithRowId("f1", ADD, 0L, 100L));
+        List<IndexManifestEntry> newIndexes =
+                Collections.singletonList(
+                        createGlobalIndexEntry("idx", ADD, BinaryRow.EMPTY_ROW, 0, 149));
+
+        Optional<RuntimeException> exception =
+                detection.checkConflicts(
+                        snapshot(1),
+                        baseFiles,
+                        Collections.emptyList(),
+                        newIndexes,
+                        null,
+                        Snapshot.CommitKind.APPEND);
+
+        assertThat(exception).isPresent();
+        RuntimeException conflict = exception.get();
+        assertThat(conflict)
+                .hasMessageContaining("Global index row ID existence conflict")
+                .hasMessageContaining("idx")
+                .hasMessageContaining("[0, 149]");
+    }
+
+    @Test
+    void testCheckGlobalIndexRowIdExistenceByPartitionAndBucket() {
+        ConflictDetection detection = createConflictDetection();
+        BinaryRow partition0 = BinaryRow.singleColumn(0);
+        BinaryRow partition1 = BinaryRow.singleColumn(1);
+
+        // Rows [0, 149] exist, but in partition1 rather than the index's partition0.
+        SimpleFileEntry wrongPartitionFile =
+                createFileEntryWithRowId("f1", ADD, partition1, 0, 0L, 150L);
+        Optional<RuntimeException> partitionMismatch =
+                detection.checkConflicts(
+                        snapshot(1),
+                        Collections.singletonList(wrongPartitionFile),
+                        Collections.emptyList(),
+                        Collections.singletonList(
+                                createGlobalIndexEntry("idx", ADD, partition0, 0, 0, 149)),
+                        null,
+                        Snapshot.CommitKind.APPEND);
+        assertThat(partitionMismatch).isPresent();
+
+        // Rows [0, 149] exist in partition0, but in bucket 1 rather than the index's bucket 0.
+        SimpleFileEntry wrongBucketFile =
+                createFileEntryWithRowId("f1", ADD, partition0, 1, 0L, 150L);
+        Optional<RuntimeException> bucketMismatch =
+                detection.checkConflicts(
+                        snapshot(1),
+                        Collections.singletonList(wrongBucketFile),
+                        Collections.emptyList(),
+                        Collections.singletonList(
+                                createGlobalIndexEntry("idx", ADD, partition0, 0, 0, 149)),
+                        null,
+                        Snapshot.CommitKind.APPEND);
+        assertThat(bucketMismatch).isPresent();
+    }
+
+    @Test
+    void testCheckGlobalIndexRowIdExistenceSkipsDeleteIndexEntry() {
+        ConflictDetection detection = createConflictDetection();
+        // Removing a global index file needs no row coverage, even with no data files at all.
+        IndexManifestEntry removedIndex =
+                createGlobalIndexEntry("idx", DELETE, BinaryRow.EMPTY_ROW, 0, 149);
+
+        Optional<RuntimeException> exception =
+                detection.checkConflicts(
+                        snapshot(1),
+                        Collections.emptyList(),
+                        Collections.emptyList(),
+                        Collections.singletonList(removedIndex),
+                        null,
+                        Snapshot.CommitKind.APPEND);
+
+        assertThat(exception).isNotPresent();
+    }
+
private ConflictDetection createConflictDetection() {
return new ConflictDetection(
"test-table",
@@ -504,4 +639,28 @@ class ConflictDetectionTest {
null,
null);
}
+
+ /**
+ * Builds a bare {@link Snapshot} for conflict-detection tests. {@code id} is passed in three
+ * positions, the commit user is "commit-user", the commit kind is APPEND, and every other
+ * constructor argument is 0 or null.
+ *
+ * <p>NOTE(review): the arguments are positional; confirm which fields the three {@code id}
+ * values map to if the Snapshot constructor signature changes.
+ */
+ private Snapshot snapshot(long id) {
+ return new Snapshot(
+ id,
+ 0,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ "commit-user",
+ id,
+ Snapshot.CommitKind.APPEND,
+ id,
+ 0,
+ 0,
+ null,
+ null,
+ null,
+ null,
+ null);
+ }
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/globalindex/GenericGlobalIndexBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/globalindex/GenericGlobalIndexBuilder.java
index 48ea935f5b..415c1590a5 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/globalindex/GenericGlobalIndexBuilder.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/globalindex/GenericGlobalIndexBuilder.java
@@ -18,6 +18,7 @@
package org.apache.paimon.flink.globalindex;
+import org.apache.paimon.Snapshot;
import org.apache.paimon.manifest.IndexManifestEntry;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.partition.PartitionPredicate;
@@ -72,7 +73,17 @@ public class GenericGlobalIndexBuilder implements Serializable {
+ "deleted rows to be indexed.",
table.name());
- return
table.store().newScan().withPartitionFilter(partitionPredicate).plan().files();
+ Snapshot snapshot = table.snapshotManager().latestSnapshot();
+ if (snapshot == null) {
+ return Collections.emptyList();
+ }
+
+ return table.store()
+ .newScan()
+ .withSnapshot(snapshot)
+ .withPartitionFilter(partitionPredicate)
+ .plan()
+ .files();
}
/** Returns old index file entries that should be deleted after new
indexes are built. */