This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 27d87ab9c0 [core] Fix data evolution index split row ranges (#7953)
27d87ab9c0 is described below
commit 27d87ab9c0d62d4c78ab90b74cf0867d26afdfe1
Author: YeJunHao <[email protected]>
AuthorDate: Mon May 25 17:46:59 2026 +0800
[core] Fix data evolution index split row ranges (#7953)
Fix BTree global index building for data-evolution append tables when a
data split contains unordered data files or intersects multiple
discontiguous row ranges.
Previously `DataEvolutionBatchScan.wrap` derived the split range from
the first and last data files. This assumes `dataFiles()` is ordered by
row id, but `RangeHelper.mergeOverlappingRanges` may keep the original
order inside a merged group. As a result, `wrap` could calculate an
inverted range (min greater than max) and fail to find any intersected
row ranges. `wrap` now intersects each file's own row id range with the
row range index and merges the results, so file order no longer matters.
Also, BTree index building assumed each split maps to exactly one row
range after applying the row range index. A contiguous data-file split
can still intersect multiple discontiguous indexed/non-indexed row
ranges, so the builder needs to fan out the split by row range before
grouping.
---
.../paimon/globalindex/DataEvolutionBatchScan.java | 14 +++---
.../globalindex/btree/BTreeGlobalIndexBuilder.java | 57 +++++++++++++---------
.../globalindex/DataEvolutionBatchScanTest.java | 29 +++++++++++
.../btree/BTreeGlobalIndexBuilderSplitTest.java | 39 +++++++++++++++
.../paimon/flink/btree/BTreeIndexTopoBuilder.java | 6 +--
5 files changed, 112 insertions(+), 33 deletions(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/globalindex/DataEvolutionBatchScan.java
b/paimon-core/src/main/java/org/apache/paimon/globalindex/DataEvolutionBatchScan.java
index 0542b7d95f..51fa3520d7 100644
---
a/paimon-core/src/main/java/org/apache/paimon/globalindex/DataEvolutionBatchScan.java
+++
b/paimon-core/src/main/java/org/apache/paimon/globalindex/DataEvolutionBatchScan.java
@@ -295,14 +295,16 @@ public class DataEvolutionBatchScan implements
DataTableScan {
private static IndexedSplit wrap(
DataSplit dataSplit, final RowRangeIndex rowRangeIndex,
ScoreGetter scoreGetter) {
List<DataFileMeta> files = dataSplit.dataFiles();
- long min = files.get(0).nonNullFirstRowId();
- long max =
- files.get(files.size() - 1).nonNullFirstRowId()
- + files.get(files.size() - 1).rowCount()
- - 1;
- List<Range> expected = rowRangeIndex.intersectedRanges(min, max);
+ List<Range> expected = new ArrayList<>();
+ for (DataFileMeta file : files) {
+ Range fileRange = file.nonNullRowIdRange();
+ expected.addAll(rowRangeIndex.intersectedRanges(fileRange.from,
fileRange.to));
+ }
+ expected = Range.sortAndMergeOverlap(expected, true);
if (expected.isEmpty()) {
+ long min = files.stream().mapToLong(f ->
f.nonNullRowIdRange().from).min().orElse(-1L);
+ long max = files.stream().mapToLong(f ->
f.nonNullRowIdRange().to).max().orElse(-1L);
throw new IllegalStateException(
String.format(
"This is a bug, there should be intersected ranges
for split with min row id %d and max row id %d.",
diff --git
a/paimon-core/src/main/java/org/apache/paimon/globalindex/btree/BTreeGlobalIndexBuilder.java
b/paimon-core/src/main/java/org/apache/paimon/globalindex/btree/BTreeGlobalIndexBuilder.java
index 577a04642f..9f143d6cf6 100644
---
a/paimon-core/src/main/java/org/apache/paimon/globalindex/btree/BTreeGlobalIndexBuilder.java
+++
b/paimon-core/src/main/java/org/apache/paimon/globalindex/btree/BTreeGlobalIndexBuilder.java
@@ -62,7 +62,6 @@ import javax.annotation.Nullable;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
@@ -305,23 +304,32 @@ public class BTreeGlobalIndexBuilder implements
Serializable {
partition, 0, null, dataIncrement,
CompactIncrement.emptyIncrement());
}
- public static Pair<Range, Split> calcRowRangeWithRowIndex(
+ public static List<Pair<Range, Split>> splitByRowRangeIndex(
RowRangeIndex rowRangeIndex, DataSplit dataSplit) {
- if (rowRangeIndex != null) {
- IndexedSplit indexedSplit =
- (IndexedSplit)
- DataEvolutionBatchScan.wrapToIndexSplits(
- Arrays.asList(dataSplit),
rowRangeIndex, null)
- .splits()
- .get(0);
- checkArgument(
- indexedSplit.rowRanges().size() == 1,
- "Expected exactly one row range for the split, but found:
%s",
- indexedSplit.rowRanges());
- return Pair.of(indexedSplit.rowRanges().get(0), indexedSplit);
+ if (rowRangeIndex == null) {
+ Range range = calcRowRange(dataSplit);
+ return range == null
+ ? Collections.emptyList()
+ : Collections.singletonList(Pair.of(range, dataSplit));
}
- return Pair.of(calcRowRange(dataSplit), dataSplit);
+ List<Pair<Range, Split>> result = new ArrayList<>();
+ for (Split split :
+ DataEvolutionBatchScan.wrapToIndexSplits(
+ Collections.singletonList(dataSplit),
rowRangeIndex, null)
+ .splits()) {
+ IndexedSplit indexedSplit = (IndexedSplit) split;
+ for (Range rowRange : indexedSplit.rowRanges()) {
+ result.add(
+ Pair.of(
+ rowRange,
+ new IndexedSplit(
+ indexedSplit.dataSplit(),
+ Collections.singletonList(rowRange),
+ null)));
+ }
+ }
+ return result;
}
public static Range calcRowRange(DataSplit dataSplit) {
@@ -354,16 +362,17 @@ public class BTreeGlobalIndexBuilder implements
Serializable {
RowRangeIndex rowRangeIndex, List<DataSplit> splits) {
Map<BinaryRow, List<Pair<Range, Split>>> partitionSplitRanges = new
HashMap<>();
for (DataSplit split : splits) {
- Pair<Range, Split> keyPair =
calcRowRangeWithRowIndex(rowRangeIndex, split);
- Range splitRange = keyPair.getKey();
- Split splitWithRange = keyPair.getValue();
- if (splitRange == null) {
- continue;
+ for (Pair<Range, Split> keyPair :
splitByRowRangeIndex(rowRangeIndex, split)) {
+ Range splitRange = keyPair.getKey();
+ Split splitWithRange = keyPair.getValue();
+ if (splitRange == null) {
+ continue;
+ }
+ BinaryRow partition = split.partition();
+ partitionSplitRanges
+ .computeIfAbsent(partition, p -> new ArrayList<>())
+ .add(Pair.of(splitRange, splitWithRange));
}
- BinaryRow partition = split.partition();
- partitionSplitRanges
- .computeIfAbsent(partition, p -> new ArrayList<>())
- .add(Pair.of(splitRange, splitWithRange));
}
Map<BinaryRow, Map<Range, List<Split>>> result = new HashMap<>();
diff --git
a/paimon-core/src/test/java/org/apache/paimon/globalindex/DataEvolutionBatchScanTest.java
b/paimon-core/src/test/java/org/apache/paimon/globalindex/DataEvolutionBatchScanTest.java
index 1bd15f7197..30744ae542 100644
---
a/paimon-core/src/test/java/org/apache/paimon/globalindex/DataEvolutionBatchScanTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/globalindex/DataEvolutionBatchScanTest.java
@@ -28,6 +28,7 @@ import org.apache.paimon.utils.RowRangeIndex;
import org.junit.jupiter.api.Test;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Random;
@@ -108,6 +109,34 @@ public class DataEvolutionBatchScanTest {
}
}
+ @Test
+ public void testWrapToIndexSplitsWithUnorderedAndDiscontiguousDataFiles() {
+ DataFileMeta file1 = newAppendFile(4650L, 51L, "file-1");
+ DataFileMeta file2 = newAppendFile(4300L, 151L, "file-2");
+ DataFileMeta file3 = newAppendFile(4200L, 208L, "file-3");
+ DataSplit split =
+ DataSplit.builder()
+ .withSnapshot(1L)
+ .withPartition(BinaryRow.EMPTY_ROW)
+ .withBucket(0)
+ .withBucketPath("bucket-0")
+ .withDataFiles(Arrays.asList(file1, file2, file3))
+ .build();
+
+ List<Split> indexedSplits =
+ DataEvolutionBatchScan.wrapToIndexSplits(
+ Collections.singletonList(split),
+
RowRangeIndex.create(Collections.singletonList(new Range(0, 5000))),
+ null)
+ .splits();
+
+ assertThat(indexedSplits).hasSize(1);
+ IndexedSplit indexedSplit = (IndexedSplit) indexedSplits.get(0);
+ assertThat(indexedSplit.dataSplit()).isEqualTo(split);
+ assertThat(indexedSplit.rowRanges())
+ .containsExactly(new Range(4200, 4450), new Range(4650, 4700));
+ }
+
private static List<Range> expectedRanges(long min, long max, List<Range>
rowRanges) {
List<Range> expected = new ArrayList<>();
for (Range range : rowRanges) {
diff --git
a/paimon-core/src/test/java/org/apache/paimon/globalindex/btree/BTreeGlobalIndexBuilderSplitTest.java
b/paimon-core/src/test/java/org/apache/paimon/globalindex/btree/BTreeGlobalIndexBuilderSplitTest.java
index 3ea2809e1c..60431faa12 100644
---
a/paimon-core/src/test/java/org/apache/paimon/globalindex/btree/BTreeGlobalIndexBuilderSplitTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/globalindex/btree/BTreeGlobalIndexBuilderSplitTest.java
@@ -19,17 +19,21 @@
package org.apache.paimon.globalindex.btree;
import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.globalindex.IndexedSplit;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.PojoDataFileMeta;
import org.apache.paimon.stats.SimpleStats;
import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.Split;
import org.apache.paimon.utils.Range;
+import org.apache.paimon.utils.RowRangeIndex;
import org.junit.jupiter.api.Test;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
import java.util.UUID;
import static org.assertj.core.api.Assertions.assertThat;
@@ -65,6 +69,41 @@ public class BTreeGlobalIndexBuilderSplitTest {
.isEqualTo(new Range(300, 399));
}
+ @Test
+ public void testGroupSplitsByDiscontiguousRowRangeIndex() {
+ DataFileMeta file1 = createDataFileMeta(4750L, 151L);
+ DataFileMeta file2 = createDataFileMeta(4901L, 1037L);
+ DataFileMeta file3 = createDataFileMeta(5938L, 1662L);
+ DataSplit split =
+ DataSplit.builder()
+ .withSnapshot(1L)
+ .withPartition(BinaryRow.EMPTY_ROW)
+ .withBucket(0)
+ .withBucketPath("bucket-0")
+ .withDataFiles(Arrays.asList(file1, file2, file3))
+ .isStreaming(false)
+ .rawConvertible(false)
+ .build();
+
+ Map<BinaryRow, Map<Range, List<Split>>> result =
+ BTreeGlobalIndexBuilder.groupSplitsByRange(
+ RowRangeIndex.create(
+ Arrays.asList(new Range(4750, 4900), new
Range(5938, 7599))),
+ Collections.singletonList(split));
+
+ assertThat(result).containsOnlyKeys(BinaryRow.EMPTY_ROW);
+ Map<Range, List<Split>> ranges = result.get(BinaryRow.EMPTY_ROW);
+ assertThat(ranges).containsOnlyKeys(new Range(4750, 4900), new
Range(5938, 7599));
+ assertIndexedSplitRowRanges(ranges.get(new Range(4750, 4900)), new
Range(4750, 4900));
+ assertIndexedSplitRowRanges(ranges.get(new Range(5938, 7599)), new
Range(5938, 7599));
+ }
+
+ private static void assertIndexedSplitRowRanges(List<Split> splits, Range
rowRange) {
+ assertThat(splits).hasSize(1);
+ assertThat(splits.get(0)).isInstanceOf(IndexedSplit.class);
+ assertThat(((IndexedSplit)
splits.get(0)).rowRanges()).containsExactly(rowRange);
+ }
+
private static DataFileMeta createDataFileMeta(long firstRowId, long
rowCount) {
return new PojoDataFileMeta(
"test-file-" + UUID.randomUUID(),
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/btree/BTreeIndexTopoBuilder.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/btree/BTreeIndexTopoBuilder.java
index e3cd198900..a0db5faa1a 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/btree/BTreeIndexTopoBuilder.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/btree/BTreeIndexTopoBuilder.java
@@ -105,18 +105,18 @@ public class BTreeIndexTopoBuilder {
Optional<Pair<RowRangeIndex, List<DataSplit>>> indexRangeAndSplits
=
indexBuilder.scan();
if (!indexRangeAndSplits.isPresent()) {
- return false;
+ continue;
}
Pair<RowRangeIndex, List<DataSplit>> scanResult =
indexRangeAndSplits.get();
List<DataSplit> splits =
splitByContiguousRowRange(scanResult.getRight());
if (splits.isEmpty()) {
- return false;
+ continue;
}
Map<BinaryRow, Map<Range, List<Split>>> partitionRangeSplits =
groupSplitsByRange(scanResult.getLeft(), splits);
if (partitionRangeSplits.isEmpty()) {
- return false;
+ continue;
}
// 2. Select necessary columns (index field + ROW_ID)