This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new e751b6c8c1 [core] Optimize data-evolution scan with index (#7305)
e751b6c8c1 is described below
commit e751b6c8c1eb945704cbb163ef987e6dfb3f8d10
Author: YeJunHao <[email protected]>
AuthorDate: Thu Feb 26 22:09:10 2026 +0800
[core] Optimize data-evolution scan with index (#7305)
---
.../org/apache/paimon/utils/RowRangeIndex.java | 105 +++++++++++++++
.../paimon/globalindex/DataEvolutionBatchScan.java | 98 +++++++++-----
.../paimon/operation/AbstractFileStoreScan.java | 24 +++-
.../operation/DataEvolutionFileStoreScan.java | 13 +-
.../org/apache/paimon/operation/FileStoreScan.java | 3 +
.../apache/paimon/operation/ManifestsReader.java | 19 +--
.../paimon/table/format/FormatReadBuilder.java | 6 +
.../paimon/table/source/AbstractDataTableScan.java | 11 +-
.../apache/paimon/table/source/InnerTableScan.java | 5 +
.../apache/paimon/table/source/ReadBuilder.java | 9 ++
.../paimon/table/source/ReadBuilderImpl.java | 17 ++-
.../table/source/snapshot/SnapshotReader.java | 3 +
.../table/source/snapshot/SnapshotReaderImpl.java | 11 +-
.../apache/paimon/table/system/AuditLogTable.java | 7 +
.../globalindex/DataEvolutionBatchScanTest.java | 142 +++++++++++++++++++++
.../operation/DataEvolutionFileStoreScanTest.java | 16 +++
16 files changed, 422 insertions(+), 67 deletions(-)
diff --git
a/paimon-common/src/main/java/org/apache/paimon/utils/RowRangeIndex.java
b/paimon-common/src/main/java/org/apache/paimon/utils/RowRangeIndex.java
new file mode 100644
index 0000000000..a068d04f72
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/RowRangeIndex.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.utils;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+
+/**
+ * An immutable index over row id ranges that answers overlap queries with binary search.
+ *
+ * <p>{@link #create} normalizes its input with {@link Range#sortAndMergeOverlap}. The queries below
+ * rely on the resulting ranges being sorted and non-overlapping, so that the range ends are
+ * ascending. Bounds of both the indexed ranges and the query intervals are inclusive.
+ */
+public final class RowRangeIndex {
+
+    /** Normalized ranges, wrapped as unmodifiable so {@link #ranges()} can hand them out. */
+    private final List<Range> ranges;
+
+    /** {@code starts[i] == ranges.get(i).from}. */
+    private final long[] starts;
+
+    /** {@code ends[i] == ranges.get(i).to}; binary-searched by {@link #lowerBound}. */
+    private final long[] ends;
+
+    private RowRangeIndex(List<Range> ranges) {
+        this.ranges = Collections.unmodifiableList(ranges);
+        this.starts = new long[ranges.size()];
+        this.ends = new long[ranges.size()];
+        for (int i = 0; i < ranges.size(); i++) {
+            Range range = ranges.get(i);
+            starts[i] = range.from;
+            ends[i] = range.to;
+        }
+    }
+
+    /**
+     * Builds an index from arbitrary (unsorted, possibly overlapping) ranges. A {@code null} list is
+     * rejected by a precondition check.
+     *
+     * @param ranges the row id ranges to index
+     * @return a new index over the sorted and merged ranges
+     */
+    public static RowRangeIndex create(List<Range> ranges) {
+        checkArgument(ranges != null, "Ranges cannot be null");
+        return new RowRangeIndex(Range.sortAndMergeOverlap(ranges, true));
+    }
+
+    /** Returns the sorted, merged ranges held by this index as an unmodifiable list. */
+    public List<Range> ranges() {
+        return ranges;
+    }
+
+    /**
+     * Returns whether any indexed range overlaps {@code [start, end]}. An empty query interval
+     * ({@code start > end}) overlaps nothing.
+     */
+    public boolean intersects(long start, long end) {
+        if (start > end) {
+            return false;
+        }
+        // First range that does not end before start. If it starts after end, no later range can
+        // overlap either, because later ranges start even further to the right.
+        int candidate = lowerBound(ends, start);
+        return candidate < starts.length && starts[candidate] <= end;
+    }
+
+    /**
+     * Returns the parts of the indexed ranges that fall inside {@code [start, end]}, in ascending
+     * order. The first and last returned ranges are clipped to the query bounds; ranges in between
+     * lie entirely inside the query and are returned as-is.
+     *
+     * @return the overlapping parts, or an empty (immutable) list when nothing overlaps or {@code
+     *     start > end}
+     */
+    public List<Range> intersectedRanges(long start, long end) {
+        if (start > end) {
+            return Collections.emptyList();
+        }
+        int left = lowerBound(ends, start);
+        if (left >= ranges.size() || starts[left] > end) {
+            return Collections.emptyList();
+        }
+        // First range ending at or after end; if there is none, the last range (it ends before end).
+        int right = Math.min(lowerBound(ends, end), ranges.size() - 1);
+
+        List<Range> result = new ArrayList<>(right - left + 1);
+        Range first = ranges.get(left);
+        result.add(new Range(Math.max(start, first.from), Math.min(end, first.to)));
+        if (right > left) {
+            result.addAll(ranges.subList(left + 1, right));
+            Range last = ranges.get(right);
+            // The last candidate may start after end, in which case it does not overlap.
+            if (last.from <= end) {
+                result.add(new Range(Math.max(start, last.from), Math.min(end, last.to)));
+            }
+        }
+        return result;
+    }
+
+    /**
+     * Returns the first index {@code i} with {@code sorted[i] >= target}, or {@code sorted.length}
+     * if there is none.
+     */
+    private static int lowerBound(long[] sorted, long target) {
+        int low = 0;
+        int high = sorted.length;
+        while (low < high) {
+            int mid = (low + high) >>> 1;
+            if (sorted[mid] < target) {
+                low = mid + 1;
+            } else {
+                high = mid;
+            }
+        }
+        return low;
+    }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/globalindex/DataEvolutionBatchScan.java
b/paimon-core/src/main/java/org/apache/paimon/globalindex/DataEvolutionBatchScan.java
index 157502de6c..708d7d874c 100644
---
a/paimon-core/src/main/java/org/apache/paimon/globalindex/DataEvolutionBatchScan.java
+++
b/paimon-core/src/main/java/org/apache/paimon/globalindex/DataEvolutionBatchScan.java
@@ -19,6 +19,7 @@
package org.apache.paimon.globalindex;
import org.apache.paimon.Snapshot;
+import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.manifest.PartitionEntry;
@@ -40,17 +41,21 @@ import
org.apache.paimon.table.source.snapshot.TimeTravelUtil;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.Filter;
import org.apache.paimon.utils.Range;
+import org.apache.paimon.utils.RowRangeIndex;
import javax.annotation.Nullable;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
+import java.util.function.Function;
import static
org.apache.paimon.globalindex.GlobalIndexScanBuilder.parallelScan;
import static org.apache.paimon.table.SpecialFields.ROW_ID;
+import static
org.apache.paimon.utils.ManifestReadThreadPool.randomlyExecuteSequentialReturn;
/** Scan for data evolution table. */
public class DataEvolutionBatchScan implements DataTableScan {
@@ -60,7 +65,7 @@ public class DataEvolutionBatchScan implements DataTableScan {
private Predicate filter;
private VectorSearch vectorSearch;
- private List<Range> pushedRowRanges;
+ private RowRangeIndex pushedRowRangeIndex;
private GlobalIndexResult globalIndexResult;
public DataEvolutionBatchScan(FileStoreTable wrapped, DataTableBatchScan
batchScan) {
@@ -197,7 +202,20 @@ public class DataEvolutionBatchScan implements
DataTableScan {
return this;
}
- this.pushedRowRanges = rowRanges;
+ this.pushedRowRangeIndex = RowRangeIndex.create(rowRanges);
+ if (globalIndexResult != null) {
+ throw new IllegalStateException("Cannot push row ranges after
global index eval.");
+ }
+ return this;
+ }
+
+ @Override
+ public InnerTableScan withRowRangeIndex(RowRangeIndex rowRangeIndex) {
+ if (rowRangeIndex == null) {
+ return this;
+ }
+
+ this.pushedRowRangeIndex = rowRangeIndex;
if (globalIndexResult != null) {
throw new IllegalStateException("Cannot push row ranges after
global index eval.");
}
@@ -207,8 +225,9 @@ public class DataEvolutionBatchScan implements
DataTableScan {
// To enable other system computing index result by their own.
public InnerTableScan withGlobalIndexResult(GlobalIndexResult
globalIndexResult) {
this.globalIndexResult = globalIndexResult;
- if (pushedRowRanges != null) {
- throw new IllegalStateException("");
+ if (pushedRowRangeIndex != null) {
+ throw new IllegalStateException(
+ "Can't set global index result after pushing down row
ranges.");
}
return this;
}
@@ -220,26 +239,26 @@ public class DataEvolutionBatchScan implements
DataTableScan {
@Override
public Plan plan() {
- List<Range> rowRanges = this.pushedRowRanges;
+ RowRangeIndex rowRangeIndex = this.pushedRowRangeIndex;
ScoreGetter scoreGetter = null;
- if (rowRanges == null) {
+ if (rowRangeIndex == null) {
Optional<GlobalIndexResult> indexResult = evalGlobalIndex();
if (indexResult.isPresent()) {
GlobalIndexResult result = indexResult.get();
- rowRanges = result.results().toRangeList();
+ rowRangeIndex =
RowRangeIndex.create(result.results().toRangeList());
if (result instanceof ScoredGlobalIndexResult) {
scoreGetter = ((ScoredGlobalIndexResult)
result).scoreGetter();
}
}
}
- if (rowRanges == null) {
+ if (rowRangeIndex == null) {
return batchScan.plan();
}
- List<Split> splits =
batchScan.withRowRanges(rowRanges).plan().splits();
- return wrapToIndexSplits(splits, rowRanges, scoreGetter);
+ List<Split> splits =
batchScan.withRowRangeIndex(rowRangeIndex).plan().splits();
+ return wrapToIndexSplits(splits, rowRangeIndex, scoreGetter);
}
private Optional<GlobalIndexResult> evalGlobalIndex() {
@@ -285,35 +304,48 @@ public class DataEvolutionBatchScan implements
DataTableScan {
return Optional.of(result);
}
- private static Plan wrapToIndexSplits(
- List<Split> splits, List<Range> rowRanges, ScoreGetter
scoreGetter) {
+ @VisibleForTesting
+ static Plan wrapToIndexSplits(
+ List<Split> splits, RowRangeIndex rowRangeIndex, ScoreGetter
scoreGetter) {
List<Split> indexedSplits = new ArrayList<>();
- for (Split split : splits) {
- DataSplit dataSplit = (DataSplit) split;
- List<Range> fileRanges = new ArrayList<>();
- for (DataFileMeta file : dataSplit.dataFiles()) {
- fileRanges.add(file.nonNullRowIdRange());
- }
-
- fileRanges = Range.mergeSortedAsPossible(fileRanges);
+ Function<Split, List<IndexedSplit>> process =
+ split ->
+ Collections.singletonList(
+ wrap((DataSplit) split, rowRangeIndex,
scoreGetter));
+ randomlyExecuteSequentialReturn(process, splits,
null).forEachRemaining(indexedSplits::add);
+ return () -> indexedSplits;
+ }
- List<Range> expected = Range.and(fileRanges, rowRanges);
+    /**
+     * Wraps a data split into an {@link IndexedSplit} carrying the row ranges of {@code
+     * rowRangeIndex} that fall inside the split's data files, plus their scores when a score getter
+     * is given.
+     *
+     * <p>The per-file row id ranges are sorted and merged first, and the index is queried once per
+     * merged range. Taking only the first file's start and the last file's end would assume the files
+     * are sorted and contiguous. Any gap between files, or a last file ending before an earlier one,
+     * would then yield row ranges for rows that no file of this split contains.
+     */
+    private static IndexedSplit wrap(
+            DataSplit dataSplit, final RowRangeIndex rowRangeIndex, ScoreGetter scoreGetter) {
+        List<Range> rawFileRanges = new ArrayList<>(dataSplit.dataFiles().size());
+        for (DataFileMeta file : dataSplit.dataFiles()) {
+            rawFileRanges.add(file.nonNullRowIdRange());
+        }
+        // Files of one split may overlap or leave gaps in row id space; normalize before querying.
+        List<Range> fileRanges = Range.sortAndMergeOverlap(rawFileRanges, true);
+
+        List<Range> expected = new ArrayList<>();
+        for (Range fileRange : fileRanges) {
+            // Each lookup is a binary search; results are clipped and ascending, and the merged
+            // file ranges are disjoint and ascending, so the concatenation stays sorted.
+            expected.addAll(rowRangeIndex.intersectedRanges(fileRange.from, fileRange.to));
+        }
+        if (expected.isEmpty()) {
+            throw new IllegalStateException(
+                    String.format(
+                            "This is a bug, there should be intersected ranges for split with row id ranges %s.",
+                            fileRanges));
+        }
- float[] scores = null;
- if (scoreGetter != null) {
- int size = expected.stream().mapToInt(r -> (int)
(r.count())).sum();
- scores = new float[size];
+ float[] scores = null;
+ if (scoreGetter != null) {
+ int size = expected.stream().mapToInt(r -> (int)
(r.count())).sum();
+ scores = new float[size];
- int index = 0;
- for (Range range : expected) {
- for (long i = range.from; i <= range.to; i++) {
- scores[index++] = scoreGetter.score(i);
- }
+ int index = 0;
+ for (Range range : expected) {
+ for (long i = range.from; i <= range.to; i++) {
+ scores[index++] = scoreGetter.score(i);
}
}
-
- indexedSplits.add(new IndexedSplit(dataSplit, expected, scores));
}
- return () -> indexedSplits;
+
+ return new IndexedSplit(dataSplit, expected, scores);
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
index 033b24c746..68ebacaa80 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
@@ -44,6 +44,7 @@ import org.apache.paimon.utils.Filter;
import org.apache.paimon.utils.ListUtils;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.Range;
+import org.apache.paimon.utils.RowRangeIndex;
import org.apache.paimon.utils.SnapshotManager;
import org.slf4j.Logger;
@@ -95,7 +96,7 @@ public abstract class AbstractFileStoreScan implements
FileStoreScan {
private ScanMetrics scanMetrics = null;
private boolean dropStats;
- @Nullable protected List<Range> rowRanges;
+ @Nullable protected RowRangeIndex rowRangeIndex = null;
@Nullable protected Long limit;
public AbstractFileStoreScan(
@@ -247,8 +248,25 @@ public abstract class AbstractFileStoreScan implements
FileStoreScan {
@Override
public FileStoreScan withRowRanges(List<Range> rowRanges) {
- this.rowRanges = rowRanges;
- manifestsReader.withRowRanges(rowRanges);
+ if (rowRanges == null) {
+ this.rowRangeIndex = null;
+ manifestsReader.withRowRangeIndex(null);
+ return this;
+ }
+ this.rowRangeIndex = RowRangeIndex.create(rowRanges);
+ manifestsReader.withRowRangeIndex(this.rowRangeIndex);
+ return this;
+ }
+
+ @Override
+ public FileStoreScan withRowRangeIndex(RowRangeIndex rowRangeIndex) {
+ if (rowRangeIndex == null) {
+ this.rowRangeIndex = null;
+ manifestsReader.withRowRangeIndex(null);
+ return this;
+ }
+ this.rowRangeIndex = rowRangeIndex;
+ manifestsReader.withRowRangeIndex(rowRangeIndex);
return this;
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionFileStoreScan.java
b/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionFileStoreScan.java
index 5cc31ed5b0..7ac829be01 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionFileStoreScan.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionFileStoreScan.java
@@ -315,7 +315,7 @@ public class DataEvolutionFileStoreScan extends
AppendOnlyFileStoreScan {
}
// If rowRanges is null, all entries should be kept
- if (this.rowRanges == null) {
+ if (this.rowRangeIndex == null) {
return true;
}
@@ -328,16 +328,7 @@ public class DataEvolutionFileStoreScan extends
AppendOnlyFileStoreScan {
// Check if any value in indices is in the range [firstRowId,
firstRowId + rowCount - 1]
long rowCount = file.rowCount();
long endRowId = firstRowId + rowCount - 1;
- Range fileRowRange = new Range(firstRowId, endRowId);
-
- for (Range expected : rowRanges) {
- if (Range.intersection(fileRowRange, expected) != null) {
- return true;
- }
- }
-
- // No matching indices found, skip this entry
- return false;
+ return rowRangeIndex.intersects(firstRowId, endRowId);
}
/** Statistics for data evolution. */
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
index 7542ff87d4..1e044f810f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
@@ -34,6 +34,7 @@ import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.BiFilter;
import org.apache.paimon.utils.Filter;
import org.apache.paimon.utils.Range;
+import org.apache.paimon.utils.RowRangeIndex;
import javax.annotation.Nullable;
@@ -91,6 +92,8 @@ public interface FileStoreScan {
FileStoreScan withRowRanges(List<Range> rowRanges);
+ /**
+ * Filter by row id using an already built {@link RowRangeIndex}; the index counterpart of
+ * {@link #withRowRanges(List)}. In {@code AbstractFileStoreScan} a {@code null} index clears
+ * the row range filter.
+ */
+ FileStoreScan withRowRangeIndex(RowRangeIndex rowRangeIndex);
+
FileStoreScan withReadType(RowType readType);
FileStoreScan withLimit(long limit);
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/ManifestsReader.java
b/paimon-core/src/main/java/org/apache/paimon/operation/ManifestsReader.java
index 144121ca6e..e5727cc0c8 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/ManifestsReader.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/ManifestsReader.java
@@ -28,7 +28,7 @@ import org.apache.paimon.stats.SimpleStats;
import org.apache.paimon.table.source.ScanMode;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.BiFilter;
-import org.apache.paimon.utils.Range;
+import org.apache.paimon.utils.RowRangeIndex;
import org.apache.paimon.utils.SnapshotManager;
import javax.annotation.Nullable;
@@ -55,7 +55,7 @@ public class ManifestsReader {
@Nullable private Integer specifiedLevel = null;
@Nullable private PartitionPredicate partitionFilter = null;
@Nullable private BiFilter<Integer, Integer> levelMinMaxFilter = null;
- @Nullable protected List<Range> rowRanges;
+ @Nullable protected RowRangeIndex rowRangeIndex = null;
public ManifestsReader(
RowType partitionType,
@@ -108,8 +108,8 @@ public class ManifestsReader {
return this;
}
- public ManifestsReader withRowRanges(List<Range> rowRanges) {
- this.rowRanges = rowRanges;
+ public ManifestsReader withRowRangeIndex(RowRangeIndex rowRangeIndex) {
+ this.rowRangeIndex = rowRangeIndex;
return this;
}
@@ -150,7 +150,7 @@ public class ManifestsReader {
}
private boolean filterManifestByRowRanges(ManifestFileMeta manifest) {
- if (rowRanges == null) {
+ if (rowRangeIndex == null) {
return true;
}
Long min = manifest.minRowId();
@@ -159,14 +159,7 @@ public class ManifestsReader {
return true;
}
- Range manifestRowRange = new Range(min, max);
-
- for (Range expected : rowRanges) {
- if (Range.intersection(manifestRowRange, expected) != null) {
- return true;
- }
- }
- return false;
+ return this.rowRangeIndex.intersects(min, max);
}
/** Note: Keep this thread-safe. */
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/format/FormatReadBuilder.java
b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatReadBuilder.java
index a09ad8f36d..976c95f1f1 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/format/FormatReadBuilder.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatReadBuilder.java
@@ -43,6 +43,7 @@ import org.apache.paimon.utils.FileUtils;
import org.apache.paimon.utils.Filter;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.Range;
+import org.apache.paimon.utils.RowRangeIndex;
import javax.annotation.Nullable;
@@ -242,6 +243,11 @@ public class FormatReadBuilder implements ReadBuilder {
throw new UnsupportedOperationException("Format Table does not support
withRowRanges.");
}
+ /** Not supported for format tables; always throws {@link UnsupportedOperationException}. */
+ @Override
+ public ReadBuilder withRowRangeIndex(RowRangeIndex rowRangeIndex) {
+ throw new UnsupportedOperationException("Format Table does not support
withRowRangeIndex.");
+ }
+
@Override
public ReadBuilder withVectorSearch(VectorSearch vectorSearch) {
throw new UnsupportedOperationException("Format Table does not support
withRowRanges.");
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
index dbc88937a4..4908a85b32 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
@@ -57,6 +57,7 @@ import org.apache.paimon.utils.DateTimeUtils;
import org.apache.paimon.utils.Filter;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.Range;
+import org.apache.paimon.utils.RowRangeIndex;
import org.apache.paimon.utils.SnapshotManager;
import org.apache.paimon.utils.TagManager;
@@ -191,8 +192,14 @@ abstract class AbstractDataTableScan implements
DataTableScan {
}
@Override
- public InnerTableScan withRowRanges(List<Range> rowRanges) {
- snapshotReader.withRowRanges(rowRanges);
+ public InnerTableScan withRowRanges(List<Range> sortedPushdownRowRanges) {
+ snapshotReader.withRowRanges(sortedPushdownRowRanges);
+ return this;
+ }
+
+ @Override
+ public InnerTableScan withRowRangeIndex(RowRangeIndex rowRangeIndex) {
+ snapshotReader.withRowRangeIndex(rowRangeIndex);
return this;
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java
index bc3d0f275f..1f6edd7176 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java
@@ -27,6 +27,7 @@ import org.apache.paimon.predicate.VectorSearch;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.Filter;
import org.apache.paimon.utils.Range;
+import org.apache.paimon.utils.RowRangeIndex;
import javax.annotation.Nullable;
@@ -74,6 +75,10 @@ public interface InnerTableScan extends TableScan {
return this;
}
+ /**
+ * Push down a pre-built {@link RowRangeIndex}. This default ignores the index and returns this
+ * scan unchanged; scans that filter by row id override it.
+ */
+ default InnerTableScan withRowRangeIndex(RowRangeIndex rowRangeIndex) {
+ return this;
+ }
+
default InnerTableScan withBucket(int bucket) {
return this;
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilder.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilder.java
index a4889a47a9..8c3cf6f681 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilder.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilder.java
@@ -28,6 +28,7 @@ import org.apache.paimon.predicate.VectorSearch;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.Filter;
import org.apache.paimon.utils.Range;
+import org.apache.paimon.utils.RowRangeIndex;
import java.io.Serializable;
import java.util.List;
@@ -159,6 +160,14 @@ public interface ReadBuilder extends Serializable {
*/
ReadBuilder withRowRanges(List<Range> rowRanges);
+ /**
+ * Specify the row range index to be read. This is usually used to read specific rows in
+ * data-evolution table.
+ *
+ * <p>This is the pre-built counterpart of {@link #withRowRanges(List)}. In {@code
+ * ReadBuilderImpl} both methods set the same state, so the later call wins, and the given index
+ * is used as-is rather than rebuilt from a list. Format tables reject it with {@link
+ * UnsupportedOperationException}.
+ *
+ * @param rowRangeIndex the indexed row id ranges to be read
+ */
+ ReadBuilder withRowRangeIndex(RowRangeIndex rowRangeIndex);
+
/**
* Push vector search to the reader.
*
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java
index f4f529dc4c..f31c750ec1 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java
@@ -28,6 +28,7 @@ import org.apache.paimon.table.InnerTable;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.Filter;
import org.apache.paimon.utils.Range;
+import org.apache.paimon.utils.RowRangeIndex;
import javax.annotation.Nullable;
@@ -62,7 +63,7 @@ public class ReadBuilderImpl implements ReadBuilder {
private Filter<Integer> bucketFilter;
private @Nullable RowType readType;
- private @Nullable List<Range> rowRanges;
+ private @Nullable RowRangeIndex rowRangeIndex;
private @Nullable VectorSearch vectorSearch;
private boolean dropStats = false;
@@ -146,7 +147,17 @@ public class ReadBuilderImpl implements ReadBuilder {
@Override
public ReadBuilder withRowRanges(List<Range> indices) {
- this.rowRanges = indices;
+ if (indices == null) {
+ this.rowRangeIndex = null;
+ return this;
+ }
+ this.rowRangeIndex = RowRangeIndex.create(indices);
+ return this;
+ }
+
+ @Override
+ public ReadBuilder withRowRangeIndex(RowRangeIndex rowRangeIndex) {
+ this.rowRangeIndex = rowRangeIndex;
return this;
}
@@ -198,7 +209,7 @@ public class ReadBuilderImpl implements ReadBuilder {
scan.withFilter(filter)
.withReadType(readType)
.withPartitionFilter(partitionFilter)
- .withRowRanges(rowRanges)
+ .withRowRangeIndex(rowRangeIndex)
.withVectorSearch(vectorSearch);
checkState(
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
index 4352b40037..f837daaaec 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
@@ -40,6 +40,7 @@ import org.apache.paimon.utils.ChangelogManager;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.Filter;
import org.apache.paimon.utils.Range;
+import org.apache.paimon.utils.RowRangeIndex;
import org.apache.paimon.utils.SnapshotManager;
import javax.annotation.Nullable;
@@ -114,6 +115,8 @@ public interface SnapshotReader {
SnapshotReader withRowRanges(List<Range> rowRanges);
+ /**
+ * Like {@link #withRowRanges(List)}, but with the ranges given as an already built {@link
+ * RowRangeIndex}.
+ */
+ SnapshotReader withRowRangeIndex(RowRangeIndex rowRangeIndex);
+
SnapshotReader withReadType(RowType readType);
SnapshotReader withLimit(int limit);
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
index c839a5dbd1..a6710ff008 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
@@ -60,6 +60,7 @@ import org.apache.paimon.utils.Filter;
import org.apache.paimon.utils.LazyField;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.Range;
+import org.apache.paimon.utils.RowRangeIndex;
import org.apache.paimon.utils.SnapshotManager;
import javax.annotation.Nullable;
@@ -307,8 +308,14 @@ public class SnapshotReaderImpl implements SnapshotReader {
}
@Override
- public SnapshotReader withRowRanges(List<Range> rowRanges) {
- scan.withRowRanges(rowRanges);
+ public SnapshotReader withRowRanges(List<Range> sortedPushdownRowRanges) {
+ scan.withRowRanges(sortedPushdownRowRanges);
+ return this;
+ }
+
+ @Override
+ public SnapshotReader withRowRangeIndex(RowRangeIndex rowRangeIndex) {
+ scan.withRowRangeIndex(rowRangeIndex);
return this;
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
index d613d9455f..980183d801 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
@@ -67,6 +67,7 @@ import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.Filter;
import org.apache.paimon.utils.ProjectedRow;
import org.apache.paimon.utils.Range;
+import org.apache.paimon.utils.RowRangeIndex;
import org.apache.paimon.utils.SimpleFileReader;
import org.apache.paimon.utils.SnapshotManager;
import org.apache.paimon.utils.TagManager;
@@ -458,6 +459,12 @@ public class AuditLogTable implements DataTable,
ReadonlyTable {
return this;
}
+ /** Forwards the row range index to {@code wrapped} and returns this reader. */
+ @Override
+ public SnapshotReader withRowRangeIndex(RowRangeIndex rowRangeIndex) {
+ wrapped.withRowRangeIndex(rowRangeIndex);
+ return this;
+ }
+
@Override
public SnapshotReader withReadType(RowType readType) {
wrapped.withReadType(readType);
diff --git
a/paimon-core/src/test/java/org/apache/paimon/globalindex/DataEvolutionBatchScanTest.java
b/paimon-core/src/test/java/org/apache/paimon/globalindex/DataEvolutionBatchScanTest.java
new file mode 100644
index 0000000000..1bd15f7197
--- /dev/null
+++
b/paimon-core/src/test/java/org/apache/paimon/globalindex/DataEvolutionBatchScanTest.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.globalindex;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.Split;
+import org.apache.paimon.utils.Range;
+import org.apache.paimon.utils.RowRangeIndex;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+
+import static org.apache.paimon.stats.SimpleStats.EMPTY_STATS;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link DataEvolutionBatchScan}. */
+public class DataEvolutionBatchScanTest {
+
+    /**
+     * Builds random single-file splits separated by gaps and indexes a few single-row ranges per
+     * split. Checks that every split receives exactly the indexed ranges inside its row id span.
+     */
+    @Test
+    public void testWrapToIndexSplitsRandomly() {
+        // Randomized but reproducible: the seed is part of every assertion description.
+        long seed = System.nanoTime();
+        Random random = new Random(seed);
+        for (int round = 0; round < 2000; round++) {
+            int splitNum = 1 + random.nextInt(20);
+            List<Split> splits = new ArrayList<>(splitNum);
+            List<Range> splitRanges = new ArrayList<>(splitNum);
+
+            long cursor = random.nextInt(10);
+            for (int i = 0; i < splitNum; i++) {
+                long start = cursor + random.nextInt(4);
+                long rowCount = 30 + random.nextInt(31);
+                long end = start + rowCount - 1;
+
+                DataSplit split =
+                        DataSplit.builder()
+                                .withSnapshot(1L)
+                                .withPartition(BinaryRow.EMPTY_ROW)
+                                .withBucket(i)
+                                .withBucketPath("bucket-" + i)
+                                .withDataFiles(
+                                        Collections.singletonList(
+                                                newAppendFile(
+                                                        start,
+                                                        rowCount,
+                                                        "round-" + round + "-split-" + i)))
+                                .build();
+                splits.add(split);
+                splitRanges.add(new Range(start, end));
+                // Leave at least one unused row id so neighbouring splits are never adjacent.
+                cursor = end + 2 + random.nextInt(4);
+            }
+
+            // Per split: its first row, every second row starting at from + 2, and its last row.
+            List<Range> candidateRanges = new ArrayList<>();
+            for (Range splitRange : splitRanges) {
+                int fragmentNum = 5 + random.nextInt(6);
+                candidateRanges.add(new Range(splitRange.from, splitRange.from));
+                for (int i = 0; i < fragmentNum - 2; i++) {
+                    long rowId = splitRange.from + 2L * (i + 1);
+                    candidateRanges.add(new Range(rowId, rowId));
+                }
+                candidateRanges.add(new Range(splitRange.to, splitRange.to));
+            }
+
+            List<Range> rowRanges = Range.sortAndMergeOverlap(candidateRanges, true);
+            List<Split> indexedSplits =
+                    DataEvolutionBatchScan.wrapToIndexSplits(
+                                    splits, RowRangeIndex.create(rowRanges), null)
+                            .splits();
+
+            assertThat(indexedSplits)
+                    .as("seed=%d, round=%d", seed, round)
+                    .hasSize(splits.size());
+            for (int i = 0; i < indexedSplits.size(); i++) {
+                String context = String.format("seed=%d, round=%d, split=%d", seed, round, i);
+                DataSplit split = (DataSplit) splits.get(i);
+                IndexedSplit indexedSplit = (IndexedSplit) indexedSplits.get(i);
+
+                List<DataFileMeta> files = split.dataFiles();
+                long min = files.get(0).nonNullFirstRowId();
+                long max =
+                        files.get(files.size() - 1).nonNullFirstRowId()
+                                + files.get(files.size() - 1).rowCount()
+                                - 1;
+                List<Range> expected = expectedRanges(min, max, rowRanges);
+
+                assertThat(expected).as(context).isNotEmpty();
+                assertThat(expected.size()).as(context).isBetween(5, 10);
+                assertThat(indexedSplit.dataSplit()).as(context).isEqualTo(split);
+                assertThat(indexedSplit.rowRanges())
+                        .as(context)
+                        .containsExactlyElementsOf(expected);
+            }
+        }
+    }
+
+    /** Reference implementation: linear scan clipping each sorted range to {@code [min, max]}. */
+    private static List<Range> expectedRanges(long min, long max, List<Range> rowRanges) {
+        List<Range> expected = new ArrayList<>();
+        for (Range range : rowRanges) {
+            if (range.to < min) {
+                continue;
+            }
+            if (range.from > max) {
+                break;
+            }
+            expected.add(new Range(Math.max(min, range.from), Math.min(max, range.to)));
+        }
+        return expected;
+    }
+
+    /** Creates an append file with the given first row id and row count; other fields are fixed. */
+    private static DataFileMeta newAppendFile(long firstRowId, long rowCount, String name) {
+        return DataFileMeta.forAppend(
+                name,
+                1024L,
+                rowCount,
+                EMPTY_STATS,
+                0L,
+                firstRowId + rowCount - 1,
+                1L,
+                Collections.emptyList(),
+                null,
+                null,
+                null,
+                null,
+                firstRowId,
+                null);
+    }
+}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/operation/DataEvolutionFileStoreScanTest.java
b/paimon-core/src/test/java/org/apache/paimon/operation/DataEvolutionFileStoreScanTest.java
index c8fc9cf46d..d23f749931 100644
---
a/paimon-core/src/test/java/org/apache/paimon/operation/DataEvolutionFileStoreScanTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/operation/DataEvolutionFileStoreScanTest.java
@@ -36,6 +36,8 @@ import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.stats.SimpleStats;
import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.utils.Range;
+import org.apache.paimon.utils.RowRangeIndex;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -258,6 +260,20 @@ public class DataEvolutionFileStoreScanTest {
assertThat(nullCounts.isNullAt(2)).isTrue();
}
+    @Test
+    public void testIntersectsRowRanges() {
+        // Unsorted and overlapping input; create() sorts and merges it.
+        List<Range> rowRanges =
+                Arrays.asList(
+                        new Range(20, 30), new Range(0, 10), new Range(5, 15), new Range(35, 40));
+        RowRangeIndex index = RowRangeIndex.create(rowRanges);
+
+        assertThat(index.intersects(14, 14)).isTrue();
+        assertThat(index.intersects(16, 19)).isFalse();
+        assertThat(index.intersects(31, 34)).isFalse();
+        assertThat(index.intersects(29, 31)).isTrue();
+        assertThat(index.intersects(100, 200)).isFalse();
+
+        // Bounds are inclusive on both sides.
+        assertThat(index.intersects(15, 15)).isTrue();
+        assertThat(index.intersects(19, 20)).isTrue();
+        assertThat(index.intersects(40, 40)).isTrue();
+        assertThat(index.intersects(41, 41)).isFalse();
+
+        // Clipped intersection spanning several indexed ranges, and a query inside a gap.
+        assertThat(index.intersectedRanges(12, 37))
+                .containsExactly(new Range(12, 15), new Range(20, 30), new Range(35, 37));
+        assertThat(index.intersectedRanges(16, 19)).isEmpty();
+    }
+
private Schema createSchema(String... fieldNames) {
Schema.Builder builder = Schema.newBuilder();
for (int i = 0; i < fieldNames.length; i++) {