This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 94b468a2b9 [core] Drop unsafe global indexes during row-id reassign (#8166)
94b468a2b9 is described below
commit 94b468a2b93276a7aa73646658b39552cc6083a7
Author: YeJunHao <[email protected]>
AuthorDate: Tue Jun 9 19:37:21 2026 +0800
[core] Drop unsafe global indexes during row-id reassign (#8166)
This PR updates metadata-only row-id reassignment to drop individual
global index entries whose row ranges cannot be safely rewritten,
instead of failing the whole row-id reassignment.
When a global index entry's row range is not fully covered by the
data-file row-id mapping, or would map to a non-contiguous new row
range, keeping the old entry would be unsafe because the index file
still stores row IDs relative to the old range. Dropping only that
entry (and logging a warning) lets row-id reassignment proceed while
allowing the missing global index range to be rebuilt later.
---
.../DataEvolutionRowIdReassigner.java | 16 ++-
.../append/dataevolution/RowRangeMappingIndex.java | 26 +++--
.../DataEvolutionRowIdReassignerTest.java | 118 +++++++++++++++++++++
.../dataevolution/RowRangeMappingIndexTest.java | 11 +-
4 files changed, 147 insertions(+), 24 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionRowIdReassigner.java b/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionRowIdReassigner.java
index 1faa4cf66a..b7261639b5 100644
--- a/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionRowIdReassigner.java
+++ b/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionRowIdReassigner.java
@@ -56,6 +56,7 @@ import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import static org.apache.paimon.format.blob.BlobFileFormat.isBlobFile;
@@ -665,12 +666,21 @@ public class DataEvolutionRowIdReassigner {
continue;
}
+ Optional<Range> newRange =
mappingIndex.map(globalIndex.rowRange());
+ if (!newRange.isPresent()) {
+ LOG.warn(
+ "Drop global index file '{}' from table {} during
row-id reassignment because its row range {} cannot be rewritten safely.",
+ indexFile.fileName(),
+ table.name(),
+ globalIndex.rowRange());
+ continue;
+ }
+ Range rewrittenRange = newRange.get();
globalIndexFileCount++;
- Range newRange = mappingIndex.map(globalIndex.rowRange());
GlobalIndexMeta newGlobalIndex =
new GlobalIndexMeta(
- newRange.from,
- newRange.to,
+ rewrittenRange.from,
+ rewrittenRange.to,
globalIndex.indexFieldId(),
globalIndex.extraFieldIds(),
globalIndex.indexMeta());
diff --git a/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/RowRangeMappingIndex.java b/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/RowRangeMappingIndex.java
index d234c6f736..a24515c905 100644
--- a/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/RowRangeMappingIndex.java
+++ b/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/RowRangeMappingIndex.java
@@ -24,9 +24,9 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
+import java.util.Optional;
import static org.apache.paimon.utils.Preconditions.checkArgument;
-import static org.apache.paimon.utils.Preconditions.checkState;
/** Index for row-range mappings. */
final class RowRangeMappingIndex {
@@ -70,13 +70,14 @@ final class RowRangeMappingIndex {
return new Mapping(oldStart, oldEnd, newStart);
}
- Range map(Range oldRange) {
+ Optional<Range> map(Range oldRange) {
checkArgument(oldRange != null, "Old row range cannot be null.");
checkArgument(oldRange.from <= oldRange.to, "Invalid old row range
%s.", oldRange);
long cursor = oldRange.from;
- Long newFrom = null;
+ long newFrom = Long.MIN_VALUE;
long newTo = Long.MIN_VALUE;
+ boolean mapped = false;
for (int i = lowerBound(oldEnds, cursor); i < mappings.size(); i++) {
Mapping mapping = mappings.get(i);
@@ -88,13 +89,11 @@ final class RowRangeMappingIndex {
long segmentNewFrom = mapping.newStart + cursor - mapping.oldStart;
long segmentNewTo = mapping.newStart + segmentTo -
mapping.oldStart;
- if (newFrom == null) {
+ if (!mapped) {
newFrom = segmentNewFrom;
- } else {
- checkState(
- newTo + 1 == segmentNewFrom,
- "Global index row range %s maps to non-contiguous new
row range.",
- oldRange);
+ mapped = true;
+ } else if (newTo + 1 != segmentNewFrom) {
+ return Optional.empty();
}
newTo = segmentNewTo;
cursor = segmentTo + 1;
@@ -103,11 +102,10 @@ final class RowRangeMappingIndex {
}
}
- checkState(
- cursor > oldRange.to && newFrom != null,
- "Global index row range %s is not fully covered by data file
row-id mappings.",
- oldRange);
- return new Range(newFrom, newTo);
+ if (cursor <= oldRange.to) {
+ return Optional.empty();
+ }
+ return Optional.of(new Range(newFrom, newTo));
}
private static int lowerBound(long[] sorted, long target) {
diff --git a/paimon-core/src/test/java/org/apache/paimon/append/dataevolution/DataEvolutionRowIdReassignerTest.java b/paimon-core/src/test/java/org/apache/paimon/append/dataevolution/DataEvolutionRowIdReassignerTest.java
index 2cacf4722e..ffaade20cb 100644
--- a/paimon-core/src/test/java/org/apache/paimon/append/dataevolution/DataEvolutionRowIdReassignerTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/append/dataevolution/DataEvolutionRowIdReassignerTest.java
@@ -29,11 +29,13 @@ import org.apache.paimon.data.serializer.InternalRowSerializer;
import org.apache.paimon.globalindex.btree.BTreeGlobalIndexBuilder;
import org.apache.paimon.globalindex.btree.BTreeIndexOptions;
import org.apache.paimon.index.GlobalIndexMeta;
+import org.apache.paimon.index.IndexFileMeta;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.manifest.FileEntry;
import org.apache.paimon.manifest.FileKind;
import org.apache.paimon.manifest.FileSource;
import org.apache.paimon.manifest.IndexManifestEntry;
+import org.apache.paimon.manifest.IndexManifestFile;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.manifest.ManifestFile;
import org.apache.paimon.manifest.ManifestFileMeta;
@@ -55,6 +57,7 @@ import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.Range;
+import org.apache.paimon.utils.SnapshotManager;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
@@ -385,6 +388,47 @@ public class DataEvolutionRowIdReassignerTest extends TableTestBase {
assertThat(readPayloads(table, predicate)).containsExactly("v4");
}
+ @Test
+ public void
testDropUnsafeGlobalIndexEntryWhenRangeCannotBeRewrittenByMetadataOnly()
+ throws Exception {
+ FileStoreTable table = createTableWithInterleavedPartitions();
+ createBTreeIndex(table);
+ replaceGlobalIndexRangesWithPartitionSpanningRanges(table);
+ Snapshot before = table.snapshotManager().latestSnapshot();
+
+ assertThat(globalIndexRanges(table))
+ .containsExactly(
+ new Range(0, 4),
+ new Range(0, 4),
+ new Range(0, 4),
+ new Range(1, 3),
+ new Range(1, 3));
+
+ DataEvolutionRowIdReassigner.Result result =
+ new DataEvolutionRowIdReassigner(table)
+ .reassign("test-drop-unsafe-sparse-index-range-entry");
+
+ assertThat(result.reassigned).isTrue();
+ assertThat(result.skipReason).isNull();
+ assertThat(result.previousSnapshotId).isEqualTo(before.id());
+ assertThat(result.newSnapshotId).isEqualTo(before.id() + 1);
+ assertThat(result.firstAssignedRowId).isEqualTo(5L);
+ assertThat(result.nextRowId).isEqualTo(10L);
+ assertThat(result.fileCount).isEqualTo(5L);
+ assertThat(result.rowCount).isEqualTo(5L);
+ assertThat(result.indexFileCount).isEqualTo(0L);
+
+ Map<String, List<Long>> rowIdsByPartition = rowIdsByPartition(table);
+ assertThat(rowIdsByPartition).hasSize(2);
+ assertThat(rowIdsByPartition).containsEntry("pt=a/", Arrays.asList(5L,
6L, 7L));
+ assertThat(rowIdsByPartition).containsEntry("pt=b/", Arrays.asList(8L,
9L));
+ assertThat(globalIndexRanges(table)).isEmpty();
+
+ Predicate predicate =
+ new
PredicateBuilder(table.rowType()).equal(table.rowType().getFieldIndex("id"), 4);
+ assertThat(readPayloads(table, predicate)).containsExactly("v4");
+ }
+
@Test
public void testReassignManyOutOfOrderPartitionEntries() throws Exception {
verifyReassignOutOfOrderPartitionEntries(LARGE_ENTRY_COUNT,
LARGE_MANIFEST_FILE_COUNT);
@@ -883,6 +927,80 @@ public class DataEvolutionRowIdReassignerTest extends TableTestBase {
}
}
+ private void
replaceGlobalIndexRangesWithPartitionSpanningRanges(FileStoreTable table)
+ throws Exception {
+ Snapshot latest = table.snapshotManager().latestSnapshot();
+ IndexManifestFile indexManifestFile =
table.store().indexManifestFileFactory().create();
+ List<IndexManifestEntry> rewritten = new ArrayList<>();
+ for (IndexManifestEntry entry :
indexManifestFile.read(latest.indexManifest())) {
+ GlobalIndexMeta globalIndex = entry.indexFile().globalIndexMeta();
+ assertThat(globalIndex).isNotNull();
+
+ Range staleRowRange;
+ String partition =
table.store().pathFactory().getPartitionString(entry.partition());
+ if (partition.equals("pt=a/")) {
+ staleRowRange = new Range(0, 4);
+ } else if (partition.equals("pt=b/")) {
+ staleRowRange = new Range(1, 3);
+ } else {
+ throw new IllegalStateException("Unexpected partition " +
partition);
+ }
+
+ GlobalIndexMeta staleGlobalIndex =
+ new GlobalIndexMeta(
+ staleRowRange.from,
+ staleRowRange.to,
+ globalIndex.indexFieldId(),
+ globalIndex.extraFieldIds(),
+ globalIndex.indexMeta());
+ IndexFileMeta indexFile = entry.indexFile();
+ rewritten.add(
+ new IndexManifestEntry(
+ entry.kind(),
+ entry.partition(),
+ entry.bucket(),
+ new IndexFileMeta(
+ indexFile.indexType(),
+ indexFile.fileName(),
+ indexFile.fileSize(),
+ indexFile.rowCount(),
+ indexFile.dvRanges(),
+ indexFile.externalPath(),
+ staleGlobalIndex)));
+ }
+
+ String staleIndexManifest =
indexManifestFile.writeWithoutRolling(rewritten);
+ Snapshot staleSnapshot =
+ new Snapshot(
+ latest.version(),
+ latest.id(),
+ latest.schemaId(),
+ latest.baseManifestList(),
+ latest.baseManifestListSize(),
+ latest.deltaManifestList(),
+ latest.deltaManifestListSize(),
+ latest.changelogManifestList(),
+ latest.changelogManifestListSize(),
+ staleIndexManifest,
+ latest.commitUser(),
+ latest.commitIdentifier(),
+ latest.commitKind(),
+ latest.timeMillis(),
+ latest.totalRecordCount(),
+ latest.deltaRecordCount(),
+ latest.changelogRecordCount(),
+ latest.watermark(),
+ latest.statistics(),
+ latest.properties(),
+ latest.nextRowId());
+ SnapshotManager snapshotManager = table.snapshotManager();
+ snapshotManager
+ .fileIO()
+ .overwriteFileUtf8(
+ snapshotManager.snapshotPath(latest.id()),
staleSnapshot.toJson());
+ snapshotManager.invalidateCache();
+ }
+
private List<Range> globalIndexRanges(FileStoreTable table) {
List<Range> ranges = new ArrayList<>();
List<IndexManifestEntry> entries =
diff --git a/paimon-core/src/test/java/org/apache/paimon/append/dataevolution/RowRangeMappingIndexTest.java b/paimon-core/src/test/java/org/apache/paimon/append/dataevolution/RowRangeMappingIndexTest.java
index 7381e9078b..baeae7d34e 100644
--- a/paimon-core/src/test/java/org/apache/paimon/append/dataevolution/RowRangeMappingIndexTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/append/dataevolution/RowRangeMappingIndexTest.java
@@ -25,7 +25,6 @@ import org.junit.jupiter.api.Test;
import java.util.Arrays;
import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.assertThatThrownBy;
/** Tests for {@link RowRangeMappingIndex}. */
public class RowRangeMappingIndexTest {
@@ -36,7 +35,7 @@ public class RowRangeMappingIndexTest {
RowRangeMappingIndex.create(
Arrays.asList(RowRangeMappingIndex.mapping(10, 19,
100)));
- assertThat(index.map(new Range(12, 15))).isEqualTo(new Range(102,
105));
+ assertThat(index.map(new Range(12, 15))).hasValue(new Range(102, 105));
}
@Test
@@ -48,7 +47,7 @@ public class RowRangeMappingIndexTest {
RowRangeMappingIndex.mapping(15, 19, 105),
RowRangeMappingIndex.mapping(20, 24, 110)));
- assertThat(index.map(new Range(12, 22))).isEqualTo(new Range(102,
112));
+ assertThat(index.map(new Range(12, 22))).hasValue(new Range(102, 112));
}
@Test
@@ -59,8 +58,7 @@ public class RowRangeMappingIndexTest {
RowRangeMappingIndex.mapping(10, 14, 100),
RowRangeMappingIndex.mapping(20, 24, 105)));
- assertThatThrownBy(() -> index.map(new Range(12, 22)))
- .hasMessageContaining("is not fully covered");
+ assertThat(index.map(new Range(12, 22))).isEmpty();
}
@Test
@@ -71,7 +69,6 @@ public class RowRangeMappingIndexTest {
RowRangeMappingIndex.mapping(10, 14, 100),
RowRangeMappingIndex.mapping(15, 19, 200)));
- assertThatThrownBy(() -> index.map(new Range(12, 17)))
- .hasMessageContaining("maps to non-contiguous new row range");
+ assertThat(index.map(new Range(12, 17))).isEmpty();
}
}