This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 94b468a2b9 [core] Drop unsafe global indexes during row-id reassign (#8166)
94b468a2b9 is described below

commit 94b468a2b93276a7aa73646658b39552cc6083a7
Author: YeJunHao <[email protected]>
AuthorDate: Tue Jun 9 19:37:21 2026 +0800

    [core] Drop unsafe global indexes during row-id reassign (#8166)
    
    This PR updates metadata-only row-id reassignment to drop individual
    global index entries whose row ranges cannot be safely rewritten,
    instead of failing the whole row-id reassignment.
    
    When a global index entry's row range is not fully covered by the
    data-file row-id mappings, or would map to a non-contiguous new row
    range, keeping the old entry would be unsafe because the index file
    still stores row IDs relative to the old range. Dropping only that
    entry (and logging a warning) lets row-id reassignment proceed while
    allowing the missing global index range to be rebuilt later.
---
 .../DataEvolutionRowIdReassigner.java              |  16 ++-
 .../append/dataevolution/RowRangeMappingIndex.java |  26 +++--
 .../DataEvolutionRowIdReassignerTest.java          | 118 +++++++++++++++++++++
 .../dataevolution/RowRangeMappingIndexTest.java    |  11 +-
 4 files changed, 147 insertions(+), 24 deletions(-)

diff --git 
a/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionRowIdReassigner.java
 
b/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionRowIdReassigner.java
index 1faa4cf66a..b7261639b5 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionRowIdReassigner.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionRowIdReassigner.java
@@ -56,6 +56,7 @@ import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 
 import static org.apache.paimon.format.blob.BlobFileFormat.isBlobFile;
@@ -665,12 +666,21 @@ public class DataEvolutionRowIdReassigner {
                 continue;
             }
 
+            Optional<Range> newRange = 
mappingIndex.map(globalIndex.rowRange());
+            if (!newRange.isPresent()) {
+                LOG.warn(
+                        "Drop global index file '{}' from table {} during 
row-id reassignment because its row range {} cannot be rewritten safely.",
+                        indexFile.fileName(),
+                        table.name(),
+                        globalIndex.rowRange());
+                continue;
+            }
+            Range rewrittenRange = newRange.get();
             globalIndexFileCount++;
-            Range newRange = mappingIndex.map(globalIndex.rowRange());
             GlobalIndexMeta newGlobalIndex =
                     new GlobalIndexMeta(
-                            newRange.from,
-                            newRange.to,
+                            rewrittenRange.from,
+                            rewrittenRange.to,
                             globalIndex.indexFieldId(),
                             globalIndex.extraFieldIds(),
                             globalIndex.indexMeta());
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/RowRangeMappingIndex.java
 
b/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/RowRangeMappingIndex.java
index d234c6f736..a24515c905 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/RowRangeMappingIndex.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/RowRangeMappingIndex.java
@@ -24,9 +24,9 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
+import java.util.Optional;
 
 import static org.apache.paimon.utils.Preconditions.checkArgument;
-import static org.apache.paimon.utils.Preconditions.checkState;
 
 /** Index for row-range mappings. */
 final class RowRangeMappingIndex {
@@ -70,13 +70,14 @@ final class RowRangeMappingIndex {
         return new Mapping(oldStart, oldEnd, newStart);
     }
 
-    Range map(Range oldRange) {
+    Optional<Range> map(Range oldRange) {
         checkArgument(oldRange != null, "Old row range cannot be null.");
         checkArgument(oldRange.from <= oldRange.to, "Invalid old row range 
%s.", oldRange);
 
         long cursor = oldRange.from;
-        Long newFrom = null;
+        long newFrom = Long.MIN_VALUE;
         long newTo = Long.MIN_VALUE;
+        boolean mapped = false;
 
         for (int i = lowerBound(oldEnds, cursor); i < mappings.size(); i++) {
             Mapping mapping = mappings.get(i);
@@ -88,13 +89,11 @@ final class RowRangeMappingIndex {
             long segmentNewFrom = mapping.newStart + cursor - mapping.oldStart;
             long segmentNewTo = mapping.newStart + segmentTo - 
mapping.oldStart;
 
-            if (newFrom == null) {
+            if (!mapped) {
                 newFrom = segmentNewFrom;
-            } else {
-                checkState(
-                        newTo + 1 == segmentNewFrom,
-                        "Global index row range %s maps to non-contiguous new 
row range.",
-                        oldRange);
+                mapped = true;
+            } else if (newTo + 1 != segmentNewFrom) {
+                return Optional.empty();
             }
             newTo = segmentNewTo;
             cursor = segmentTo + 1;
@@ -103,11 +102,10 @@ final class RowRangeMappingIndex {
             }
         }
 
-        checkState(
-                cursor > oldRange.to && newFrom != null,
-                "Global index row range %s is not fully covered by data file 
row-id mappings.",
-                oldRange);
-        return new Range(newFrom, newTo);
+        if (cursor <= oldRange.to) {
+            return Optional.empty();
+        }
+        return Optional.of(new Range(newFrom, newTo));
     }
 
     private static int lowerBound(long[] sorted, long target) {
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/append/dataevolution/DataEvolutionRowIdReassignerTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/append/dataevolution/DataEvolutionRowIdReassignerTest.java
index 2cacf4722e..ffaade20cb 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/append/dataevolution/DataEvolutionRowIdReassignerTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/append/dataevolution/DataEvolutionRowIdReassignerTest.java
@@ -29,11 +29,13 @@ import 
org.apache.paimon.data.serializer.InternalRowSerializer;
 import org.apache.paimon.globalindex.btree.BTreeGlobalIndexBuilder;
 import org.apache.paimon.globalindex.btree.BTreeIndexOptions;
 import org.apache.paimon.index.GlobalIndexMeta;
+import org.apache.paimon.index.IndexFileMeta;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.manifest.FileEntry;
 import org.apache.paimon.manifest.FileKind;
 import org.apache.paimon.manifest.FileSource;
 import org.apache.paimon.manifest.IndexManifestEntry;
+import org.apache.paimon.manifest.IndexManifestFile;
 import org.apache.paimon.manifest.ManifestEntry;
 import org.apache.paimon.manifest.ManifestFile;
 import org.apache.paimon.manifest.ManifestFileMeta;
@@ -55,6 +57,7 @@ import org.apache.paimon.table.source.ReadBuilder;
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.utils.Pair;
 import org.apache.paimon.utils.Range;
+import org.apache.paimon.utils.SnapshotManager;
 
 import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
@@ -385,6 +388,47 @@ public class DataEvolutionRowIdReassignerTest extends 
TableTestBase {
         assertThat(readPayloads(table, predicate)).containsExactly("v4");
     }
 
+    @Test
+    public void 
testDropUnsafeGlobalIndexEntryWhenRangeCannotBeRewrittenByMetadataOnly()
+            throws Exception {
+        FileStoreTable table = createTableWithInterleavedPartitions();
+        createBTreeIndex(table);
+        replaceGlobalIndexRangesWithPartitionSpanningRanges(table);
+        Snapshot before = table.snapshotManager().latestSnapshot();
+
+        assertThat(globalIndexRanges(table))
+                .containsExactly(
+                        new Range(0, 4),
+                        new Range(0, 4),
+                        new Range(0, 4),
+                        new Range(1, 3),
+                        new Range(1, 3));
+
+        DataEvolutionRowIdReassigner.Result result =
+                new DataEvolutionRowIdReassigner(table)
+                        .reassign("test-drop-unsafe-sparse-index-range-entry");
+
+        assertThat(result.reassigned).isTrue();
+        assertThat(result.skipReason).isNull();
+        assertThat(result.previousSnapshotId).isEqualTo(before.id());
+        assertThat(result.newSnapshotId).isEqualTo(before.id() + 1);
+        assertThat(result.firstAssignedRowId).isEqualTo(5L);
+        assertThat(result.nextRowId).isEqualTo(10L);
+        assertThat(result.fileCount).isEqualTo(5L);
+        assertThat(result.rowCount).isEqualTo(5L);
+        assertThat(result.indexFileCount).isEqualTo(0L);
+
+        Map<String, List<Long>> rowIdsByPartition = rowIdsByPartition(table);
+        assertThat(rowIdsByPartition).hasSize(2);
+        assertThat(rowIdsByPartition).containsEntry("pt=a/", Arrays.asList(5L, 
6L, 7L));
+        assertThat(rowIdsByPartition).containsEntry("pt=b/", Arrays.asList(8L, 
9L));
+        assertThat(globalIndexRanges(table)).isEmpty();
+
+        Predicate predicate =
+                new 
PredicateBuilder(table.rowType()).equal(table.rowType().getFieldIndex("id"), 4);
+        assertThat(readPayloads(table, predicate)).containsExactly("v4");
+    }
+
     @Test
     public void testReassignManyOutOfOrderPartitionEntries() throws Exception {
         verifyReassignOutOfOrderPartitionEntries(LARGE_ENTRY_COUNT, 
LARGE_MANIFEST_FILE_COUNT);
@@ -883,6 +927,80 @@ public class DataEvolutionRowIdReassignerTest extends 
TableTestBase {
         }
     }
 
+    private void 
replaceGlobalIndexRangesWithPartitionSpanningRanges(FileStoreTable table)
+            throws Exception {
+        Snapshot latest = table.snapshotManager().latestSnapshot();
+        IndexManifestFile indexManifestFile = 
table.store().indexManifestFileFactory().create();
+        List<IndexManifestEntry> rewritten = new ArrayList<>();
+        for (IndexManifestEntry entry : 
indexManifestFile.read(latest.indexManifest())) {
+            GlobalIndexMeta globalIndex = entry.indexFile().globalIndexMeta();
+            assertThat(globalIndex).isNotNull();
+
+            Range staleRowRange;
+            String partition = 
table.store().pathFactory().getPartitionString(entry.partition());
+            if (partition.equals("pt=a/")) {
+                staleRowRange = new Range(0, 4);
+            } else if (partition.equals("pt=b/")) {
+                staleRowRange = new Range(1, 3);
+            } else {
+                throw new IllegalStateException("Unexpected partition " + 
partition);
+            }
+
+            GlobalIndexMeta staleGlobalIndex =
+                    new GlobalIndexMeta(
+                            staleRowRange.from,
+                            staleRowRange.to,
+                            globalIndex.indexFieldId(),
+                            globalIndex.extraFieldIds(),
+                            globalIndex.indexMeta());
+            IndexFileMeta indexFile = entry.indexFile();
+            rewritten.add(
+                    new IndexManifestEntry(
+                            entry.kind(),
+                            entry.partition(),
+                            entry.bucket(),
+                            new IndexFileMeta(
+                                    indexFile.indexType(),
+                                    indexFile.fileName(),
+                                    indexFile.fileSize(),
+                                    indexFile.rowCount(),
+                                    indexFile.dvRanges(),
+                                    indexFile.externalPath(),
+                                    staleGlobalIndex)));
+        }
+
+        String staleIndexManifest = 
indexManifestFile.writeWithoutRolling(rewritten);
+        Snapshot staleSnapshot =
+                new Snapshot(
+                        latest.version(),
+                        latest.id(),
+                        latest.schemaId(),
+                        latest.baseManifestList(),
+                        latest.baseManifestListSize(),
+                        latest.deltaManifestList(),
+                        latest.deltaManifestListSize(),
+                        latest.changelogManifestList(),
+                        latest.changelogManifestListSize(),
+                        staleIndexManifest,
+                        latest.commitUser(),
+                        latest.commitIdentifier(),
+                        latest.commitKind(),
+                        latest.timeMillis(),
+                        latest.totalRecordCount(),
+                        latest.deltaRecordCount(),
+                        latest.changelogRecordCount(),
+                        latest.watermark(),
+                        latest.statistics(),
+                        latest.properties(),
+                        latest.nextRowId());
+        SnapshotManager snapshotManager = table.snapshotManager();
+        snapshotManager
+                .fileIO()
+                .overwriteFileUtf8(
+                        snapshotManager.snapshotPath(latest.id()), 
staleSnapshot.toJson());
+        snapshotManager.invalidateCache();
+    }
+
     private List<Range> globalIndexRanges(FileStoreTable table) {
         List<Range> ranges = new ArrayList<>();
         List<IndexManifestEntry> entries =
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/append/dataevolution/RowRangeMappingIndexTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/append/dataevolution/RowRangeMappingIndexTest.java
index 7381e9078b..baeae7d34e 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/append/dataevolution/RowRangeMappingIndexTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/append/dataevolution/RowRangeMappingIndexTest.java
@@ -25,7 +25,6 @@ import org.junit.jupiter.api.Test;
 import java.util.Arrays;
 
 import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Tests for {@link RowRangeMappingIndex}. */
 public class RowRangeMappingIndexTest {
@@ -36,7 +35,7 @@ public class RowRangeMappingIndexTest {
                 RowRangeMappingIndex.create(
                         Arrays.asList(RowRangeMappingIndex.mapping(10, 19, 
100)));
 
-        assertThat(index.map(new Range(12, 15))).isEqualTo(new Range(102, 
105));
+        assertThat(index.map(new Range(12, 15))).hasValue(new Range(102, 105));
     }
 
     @Test
@@ -48,7 +47,7 @@ public class RowRangeMappingIndexTest {
                                 RowRangeMappingIndex.mapping(15, 19, 105),
                                 RowRangeMappingIndex.mapping(20, 24, 110)));
 
-        assertThat(index.map(new Range(12, 22))).isEqualTo(new Range(102, 
112));
+        assertThat(index.map(new Range(12, 22))).hasValue(new Range(102, 112));
     }
 
     @Test
@@ -59,8 +58,7 @@ public class RowRangeMappingIndexTest {
                                 RowRangeMappingIndex.mapping(10, 14, 100),
                                 RowRangeMappingIndex.mapping(20, 24, 105)));
 
-        assertThatThrownBy(() -> index.map(new Range(12, 22)))
-                .hasMessageContaining("is not fully covered");
+        assertThat(index.map(new Range(12, 22))).isEmpty();
     }
 
     @Test
@@ -71,7 +69,6 @@ public class RowRangeMappingIndexTest {
                                 RowRangeMappingIndex.mapping(10, 14, 100),
                                 RowRangeMappingIndex.mapping(15, 19, 200)));
 
-        assertThatThrownBy(() -> index.map(new Range(12, 17)))
-                .hasMessageContaining("maps to non-contiguous new row range");
+        assertThat(index.map(new Range(12, 17))).isEmpty();
     }
 }

Reply via email to