This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new e751b6c8c1 [core] Optimize data-evolution scan with index (#7305)
e751b6c8c1 is described below

commit e751b6c8c1eb945704cbb163ef987e6dfb3f8d10
Author: YeJunHao <[email protected]>
AuthorDate: Thu Feb 26 22:09:10 2026 +0800

    [core] Optimize data-evolution scan with index (#7305)
---
 .../org/apache/paimon/utils/RowRangeIndex.java     | 105 +++++++++++++++
 .../paimon/globalindex/DataEvolutionBatchScan.java |  98 +++++++++-----
 .../paimon/operation/AbstractFileStoreScan.java    |  24 +++-
 .../operation/DataEvolutionFileStoreScan.java      |  13 +-
 .../org/apache/paimon/operation/FileStoreScan.java |   3 +
 .../apache/paimon/operation/ManifestsReader.java   |  19 +--
 .../paimon/table/format/FormatReadBuilder.java     |   6 +
 .../paimon/table/source/AbstractDataTableScan.java |  11 +-
 .../apache/paimon/table/source/InnerTableScan.java |   5 +
 .../apache/paimon/table/source/ReadBuilder.java    |   9 ++
 .../paimon/table/source/ReadBuilderImpl.java       |  17 ++-
 .../table/source/snapshot/SnapshotReader.java      |   3 +
 .../table/source/snapshot/SnapshotReaderImpl.java  |  11 +-
 .../apache/paimon/table/system/AuditLogTable.java  |   7 +
 .../globalindex/DataEvolutionBatchScanTest.java    | 142 +++++++++++++++++++++
 .../operation/DataEvolutionFileStoreScanTest.java  |  16 +++
 16 files changed, 422 insertions(+), 67 deletions(-)

diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/RowRangeIndex.java b/paimon-common/src/main/java/org/apache/paimon/utils/RowRangeIndex.java
new file mode 100644
index 0000000000..a068d04f72
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/RowRangeIndex.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.utils;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+
+/** Index for row ranges. */
+public class RowRangeIndex {
+
+    private final List<Range> ranges;
+    private final long[] starts;
+    private final long[] ends;
+
+    private RowRangeIndex(List<Range> ranges) {
+        this.ranges = ranges;
+        this.starts = new long[ranges.size()];
+        this.ends = new long[ranges.size()];
+        for (int i = 0; i < ranges.size(); i++) {
+            Range range = ranges.get(i);
+            starts[i] = range.from;
+            ends[i] = range.to;
+        }
+    }
+
+    public static RowRangeIndex create(List<Range> ranges) {
+        checkArgument(ranges != null, "Ranges cannot be null");
+        return new RowRangeIndex(Range.sortAndMergeOverlap(ranges, true));
+    }
+
+    public List<Range> ranges() {
+        return Collections.unmodifiableList(ranges);
+    }
+
+    public boolean intersects(long start, long end) {
+        int candidate = lowerBound(ends, start);
+        return candidate < starts.length && starts[candidate] <= end;
+    }
+
+    public List<Range> intersectedRanges(long start, long end) {
+        int left = lowerBound(ends, start);
+        if (left >= ranges.size()) {
+            return Collections.emptyList();
+        }
+
+        int right = lowerBound(ends, end);
+        if (right >= ranges.size()) {
+            right = ranges.size() - 1;
+        }
+
+        if (starts[left] > end) {
+            return Collections.emptyList();
+        }
+
+        List<Range> expected = new ArrayList<>();
+        Range from = ranges.get(left);
+        expected.add(new Range(Math.max(start, from.from), Math.min(end, from.to)));
+
+        int length = right - left - 1;
+        if (length > 0) {
+            expected.addAll(ranges.subList(left + 1, left + 1 + length));
+        }
+
+        if (right != left) {
+            Range to = ranges.get(right);
+            if (to.from <= end) {
+                expected.add(new Range(Math.max(start, to.from), Math.min(end, to.to)));
+            }
+        }
+        return expected;
+    }
+
+    private static int lowerBound(long[] sorted, long target) {
+        int left = 0;
+        int right = sorted.length;
+        while (left < right) {
+            int mid = left + (right - left) / 2;
+            if (sorted[mid] < target) {
+                left = mid + 1;
+            } else {
+                right = mid;
+            }
+        }
+        return left;
+    }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/globalindex/DataEvolutionBatchScan.java b/paimon-core/src/main/java/org/apache/paimon/globalindex/DataEvolutionBatchScan.java
index 157502de6c..708d7d874c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/globalindex/DataEvolutionBatchScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/globalindex/DataEvolutionBatchScan.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.globalindex;
 
 import org.apache.paimon.Snapshot;
+import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.manifest.PartitionEntry;
@@ -40,17 +41,21 @@ import org.apache.paimon.table.source.snapshot.TimeTravelUtil;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.Filter;
 import org.apache.paimon.utils.Range;
+import org.apache.paimon.utils.RowRangeIndex;
 
 import javax.annotation.Nullable;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
+import java.util.function.Function;
 
 import static org.apache.paimon.globalindex.GlobalIndexScanBuilder.parallelScan;
 import static org.apache.paimon.table.SpecialFields.ROW_ID;
+import static org.apache.paimon.utils.ManifestReadThreadPool.randomlyExecuteSequentialReturn;
 
 /** Scan for data evolution table. */
 public class DataEvolutionBatchScan implements DataTableScan {
@@ -60,7 +65,7 @@ public class DataEvolutionBatchScan implements DataTableScan {
 
     private Predicate filter;
     private VectorSearch vectorSearch;
-    private List<Range> pushedRowRanges;
+    private RowRangeIndex pushedRowRangeIndex;
     private GlobalIndexResult globalIndexResult;
 
     public DataEvolutionBatchScan(FileStoreTable wrapped, DataTableBatchScan batchScan) {
@@ -197,7 +202,20 @@ public class DataEvolutionBatchScan implements DataTableScan {
             return this;
         }
 
-        this.pushedRowRanges = rowRanges;
+        this.pushedRowRangeIndex = RowRangeIndex.create(rowRanges);
+        if (globalIndexResult != null) {
+            throw new IllegalStateException("Cannot push row ranges after global index eval.");
+        }
+        return this;
+    }
+
+    @Override
+    public InnerTableScan withRowRangeIndex(RowRangeIndex rowRangeIndex) {
+        if (rowRangeIndex == null) {
+            return this;
+        }
+
+        this.pushedRowRangeIndex = rowRangeIndex;
         if (globalIndexResult != null) {
             throw new IllegalStateException("Cannot push row ranges after global index eval.");
         }
@@ -207,8 +225,9 @@ public class DataEvolutionBatchScan implements DataTableScan {
     // To enable other system computing index result by their own.
     public InnerTableScan withGlobalIndexResult(GlobalIndexResult globalIndexResult) {
         this.globalIndexResult = globalIndexResult;
-        if (pushedRowRanges != null) {
-            throw new IllegalStateException("");
+        if (pushedRowRangeIndex != null) {
+            throw new IllegalStateException(
+                    "Can't set global index result after pushing down row ranges.");
         }
         return this;
     }
@@ -220,26 +239,26 @@ public class DataEvolutionBatchScan implements DataTableScan {
 
     @Override
     public Plan plan() {
-        List<Range> rowRanges = this.pushedRowRanges;
+        RowRangeIndex rowRangeIndex = this.pushedRowRangeIndex;
         ScoreGetter scoreGetter = null;
 
-        if (rowRanges == null) {
+        if (rowRangeIndex == null) {
             Optional<GlobalIndexResult> indexResult = evalGlobalIndex();
             if (indexResult.isPresent()) {
                 GlobalIndexResult result = indexResult.get();
-                rowRanges = result.results().toRangeList();
+                rowRangeIndex = RowRangeIndex.create(result.results().toRangeList());
                 if (result instanceof ScoredGlobalIndexResult) {
                     scoreGetter = ((ScoredGlobalIndexResult) result).scoreGetter();
                 }
             }
         }
 
-        if (rowRanges == null) {
+        if (rowRangeIndex == null) {
             return batchScan.plan();
         }
 
-        List<Split> splits = batchScan.withRowRanges(rowRanges).plan().splits();
-        return wrapToIndexSplits(splits, rowRanges, scoreGetter);
+        List<Split> splits = batchScan.withRowRangeIndex(rowRangeIndex).plan().splits();
+        return wrapToIndexSplits(splits, rowRangeIndex, scoreGetter);
     }
 
     private Optional<GlobalIndexResult> evalGlobalIndex() {
@@ -285,35 +304,48 @@ public class DataEvolutionBatchScan implements DataTableScan {
         return Optional.of(result);
     }
 
-    private static Plan wrapToIndexSplits(
-            List<Split> splits, List<Range> rowRanges, ScoreGetter scoreGetter) {
+    @VisibleForTesting
+    static Plan wrapToIndexSplits(
+            List<Split> splits, RowRangeIndex rowRangeIndex, ScoreGetter scoreGetter) {
         List<Split> indexedSplits = new ArrayList<>();
-        for (Split split : splits) {
-            DataSplit dataSplit = (DataSplit) split;
-            List<Range> fileRanges = new ArrayList<>();
-            for (DataFileMeta file : dataSplit.dataFiles()) {
-                fileRanges.add(file.nonNullRowIdRange());
-            }
-
-            fileRanges = Range.mergeSortedAsPossible(fileRanges);
+        Function<Split, List<IndexedSplit>> process =
+                split ->
+                        Collections.singletonList(
+                                wrap((DataSplit) split, rowRangeIndex, scoreGetter));
+        randomlyExecuteSequentialReturn(process, splits, null).forEachRemaining(indexedSplits::add);
+        return () -> indexedSplits;
+    }
 
-            List<Range> expected = Range.and(fileRanges, rowRanges);
+    private static IndexedSplit wrap(
+            DataSplit dataSplit, final RowRangeIndex rowRangeIndex, ScoreGetter scoreGetter) {
+        List<DataFileMeta> files = dataSplit.dataFiles();
+        long min = files.get(0).nonNullFirstRowId();
+        long max =
+                files.get(files.size() - 1).nonNullFirstRowId()
+                        + files.get(files.size() - 1).rowCount()
+                        - 1;
+
+        List<Range> expected = rowRangeIndex.intersectedRanges(min, max);
+        if (expected.isEmpty()) {
+            throw new IllegalStateException(
+                    String.format(
+                            "This is a bug, there should be intersected ranges for split with min row id %d and max row id %d.",
+                            min, max));
+        }
 
-            float[] scores = null;
-            if (scoreGetter != null) {
-                int size = expected.stream().mapToInt(r -> (int) (r.count())).sum();
-                scores = new float[size];
+        float[] scores = null;
+        if (scoreGetter != null) {
+            int size = expected.stream().mapToInt(r -> (int) (r.count())).sum();
+            scores = new float[size];
 
-                int index = 0;
-                for (Range range : expected) {
-                    for (long i = range.from; i <= range.to; i++) {
-                        scores[index++] = scoreGetter.score(i);
-                    }
+            int index = 0;
+            for (Range range : expected) {
+                for (long i = range.from; i <= range.to; i++) {
+                    scores[index++] = scoreGetter.score(i);
                 }
             }
-
-            indexedSplits.add(new IndexedSplit(dataSplit, expected, scores));
         }
-        return () -> indexedSplits;
+
+        return new IndexedSplit(dataSplit, expected, scores);
     }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
index 033b24c746..68ebacaa80 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
@@ -44,6 +44,7 @@ import org.apache.paimon.utils.Filter;
 import org.apache.paimon.utils.ListUtils;
 import org.apache.paimon.utils.Pair;
 import org.apache.paimon.utils.Range;
+import org.apache.paimon.utils.RowRangeIndex;
 import org.apache.paimon.utils.SnapshotManager;
 
 import org.slf4j.Logger;
@@ -95,7 +96,7 @@ public abstract class AbstractFileStoreScan implements FileStoreScan {
 
     private ScanMetrics scanMetrics = null;
     private boolean dropStats;
-    @Nullable protected List<Range> rowRanges;
+    @Nullable protected RowRangeIndex rowRangeIndex = null;
     @Nullable protected Long limit;
 
     public AbstractFileStoreScan(
@@ -247,8 +248,25 @@ public abstract class AbstractFileStoreScan implements FileStoreScan {
 
     @Override
     public FileStoreScan withRowRanges(List<Range> rowRanges) {
-        this.rowRanges = rowRanges;
-        manifestsReader.withRowRanges(rowRanges);
+        if (rowRanges == null) {
+            this.rowRangeIndex = null;
+            manifestsReader.withRowRangeIndex(null);
+            return this;
+        }
+        this.rowRangeIndex = RowRangeIndex.create(rowRanges);
+        manifestsReader.withRowRangeIndex(this.rowRangeIndex);
+        return this;
+    }
+
+    @Override
+    public FileStoreScan withRowRangeIndex(RowRangeIndex rowRangeIndex) {
+        if (rowRangeIndex == null) {
+            this.rowRangeIndex = null;
+            manifestsReader.withRowRangeIndex(null);
+            return this;
+        }
+        this.rowRangeIndex = rowRangeIndex;
+        manifestsReader.withRowRangeIndex(rowRangeIndex);
         return this;
     }
 
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionFileStoreScan.java
index 5cc31ed5b0..7ac829be01 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionFileStoreScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionFileStoreScan.java
@@ -315,7 +315,7 @@ public class DataEvolutionFileStoreScan extends AppendOnlyFileStoreScan {
         }
 
         // If rowRanges is null, all entries should be kept
-        if (this.rowRanges == null) {
+        if (this.rowRangeIndex == null) {
             return true;
         }
 
@@ -328,16 +328,7 @@ public class DataEvolutionFileStoreScan extends AppendOnlyFileStoreScan {
         // Check if any value in indices is in the range [firstRowId, firstRowId + rowCount - 1]
         long rowCount = file.rowCount();
         long endRowId = firstRowId + rowCount - 1;
-        Range fileRowRange = new Range(firstRowId, endRowId);
-
-        for (Range expected : rowRanges) {
-            if (Range.intersection(fileRowRange, expected) != null) {
-                return true;
-            }
-        }
-
-        // No matching indices found, skip this entry
-        return false;
+        return rowRangeIndex.intersects(firstRowId, endRowId);
     }
 
     /** Statistics for data evolution. */
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java 
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
index 7542ff87d4..1e044f810f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
@@ -34,6 +34,7 @@ import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.BiFilter;
 import org.apache.paimon.utils.Filter;
 import org.apache.paimon.utils.Range;
+import org.apache.paimon.utils.RowRangeIndex;
 
 import javax.annotation.Nullable;
 
@@ -91,6 +92,8 @@ public interface FileStoreScan {
 
     FileStoreScan withRowRanges(List<Range> rowRanges);
 
+    FileStoreScan withRowRangeIndex(RowRangeIndex rowRangeIndex);
+
     FileStoreScan withReadType(RowType readType);
 
     FileStoreScan withLimit(long limit);
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/ManifestsReader.java b/paimon-core/src/main/java/org/apache/paimon/operation/ManifestsReader.java
index 144121ca6e..e5727cc0c8 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/ManifestsReader.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/ManifestsReader.java
@@ -28,7 +28,7 @@ import org.apache.paimon.stats.SimpleStats;
 import org.apache.paimon.table.source.ScanMode;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.BiFilter;
-import org.apache.paimon.utils.Range;
+import org.apache.paimon.utils.RowRangeIndex;
 import org.apache.paimon.utils.SnapshotManager;
 
 import javax.annotation.Nullable;
@@ -55,7 +55,7 @@ public class ManifestsReader {
     @Nullable private Integer specifiedLevel = null;
     @Nullable private PartitionPredicate partitionFilter = null;
     @Nullable private BiFilter<Integer, Integer> levelMinMaxFilter = null;
-    @Nullable protected List<Range> rowRanges;
+    @Nullable protected RowRangeIndex rowRangeIndex = null;
 
     public ManifestsReader(
             RowType partitionType,
@@ -108,8 +108,8 @@ public class ManifestsReader {
         return this;
     }
 
-    public ManifestsReader withRowRanges(List<Range> rowRanges) {
-        this.rowRanges = rowRanges;
+    public ManifestsReader withRowRangeIndex(RowRangeIndex rowRangeIndex) {
+        this.rowRangeIndex = rowRangeIndex;
         return this;
     }
 
@@ -150,7 +150,7 @@ public class ManifestsReader {
     }
 
     private boolean filterManifestByRowRanges(ManifestFileMeta manifest) {
-        if (rowRanges == null) {
+        if (rowRangeIndex == null) {
             return true;
         }
         Long min = manifest.minRowId();
@@ -159,14 +159,7 @@ public class ManifestsReader {
             return true;
         }
 
-        Range manifestRowRange = new Range(min, max);
-
-        for (Range expected : rowRanges) {
-            if (Range.intersection(manifestRowRange, expected) != null) {
-                return true;
-            }
-        }
-        return false;
+        return this.rowRangeIndex.intersects(min, max);
     }
 
     /** Note: Keep this thread-safe. */
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/format/FormatReadBuilder.java b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatReadBuilder.java
index a09ad8f36d..976c95f1f1 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/format/FormatReadBuilder.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatReadBuilder.java
@@ -43,6 +43,7 @@ import org.apache.paimon.utils.FileUtils;
 import org.apache.paimon.utils.Filter;
 import org.apache.paimon.utils.Pair;
 import org.apache.paimon.utils.Range;
+import org.apache.paimon.utils.RowRangeIndex;
 
 import javax.annotation.Nullable;
 
@@ -242,6 +243,11 @@ public class FormatReadBuilder implements ReadBuilder {
         throw new UnsupportedOperationException("Format Table does not support withRowRanges.");
     }
 
+    @Override
+    public ReadBuilder withRowRangeIndex(RowRangeIndex rowRangeIndex) {
+        throw new UnsupportedOperationException("Format Table does not support withRowRangeIndex.");
+    }
+
     @Override
     public ReadBuilder withVectorSearch(VectorSearch vectorSearch) {
         throw new UnsupportedOperationException("Format Table does not support withRowRanges.");
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
index dbc88937a4..4908a85b32 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
@@ -57,6 +57,7 @@ import org.apache.paimon.utils.DateTimeUtils;
 import org.apache.paimon.utils.Filter;
 import org.apache.paimon.utils.Pair;
 import org.apache.paimon.utils.Range;
+import org.apache.paimon.utils.RowRangeIndex;
 import org.apache.paimon.utils.SnapshotManager;
 import org.apache.paimon.utils.TagManager;
 
@@ -191,8 +192,14 @@ abstract class AbstractDataTableScan implements DataTableScan {
     }
 
     @Override
-    public InnerTableScan withRowRanges(List<Range> rowRanges) {
-        snapshotReader.withRowRanges(rowRanges);
+    public InnerTableScan withRowRanges(List<Range> sortedPushdownRowRanges) {
+        snapshotReader.withRowRanges(sortedPushdownRowRanges);
+        return this;
+    }
+
+    @Override
+    public InnerTableScan withRowRangeIndex(RowRangeIndex rowRangeIndex) {
+        snapshotReader.withRowRangeIndex(rowRangeIndex);
         return this;
     }
 
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java
index bc3d0f275f..1f6edd7176 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java
@@ -27,6 +27,7 @@ import org.apache.paimon.predicate.VectorSearch;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.Filter;
 import org.apache.paimon.utils.Range;
+import org.apache.paimon.utils.RowRangeIndex;
 
 import javax.annotation.Nullable;
 
@@ -74,6 +75,10 @@ public interface InnerTableScan extends TableScan {
         return this;
     }
 
+    default InnerTableScan withRowRangeIndex(RowRangeIndex rowRangeIndex) {
+        return this;
+    }
+
     default InnerTableScan withBucket(int bucket) {
         return this;
     }
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilder.java b/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilder.java
index a4889a47a9..8c3cf6f681 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilder.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilder.java
@@ -28,6 +28,7 @@ import org.apache.paimon.predicate.VectorSearch;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.Filter;
 import org.apache.paimon.utils.Range;
+import org.apache.paimon.utils.RowRangeIndex;
 
 import java.io.Serializable;
 import java.util.List;
@@ -159,6 +160,14 @@ public interface ReadBuilder extends Serializable {
      */
     ReadBuilder withRowRanges(List<Range> rowRanges);
 
+    /**
+     * Specify the row range index to be read. This is usually used to read specific rows in
+     * data-evolution table.
+     *
+     * @param rowRangeIndex the indexed row id ranges to be read
+     */
+    ReadBuilder withRowRangeIndex(RowRangeIndex rowRangeIndex);
+
     /**
      * Push vector search to the reader.
      *
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java
index f4f529dc4c..f31c750ec1 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java
@@ -28,6 +28,7 @@ import org.apache.paimon.table.InnerTable;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.Filter;
 import org.apache.paimon.utils.Range;
+import org.apache.paimon.utils.RowRangeIndex;
 
 import javax.annotation.Nullable;
 
@@ -62,7 +63,7 @@ public class ReadBuilderImpl implements ReadBuilder {
     private Filter<Integer> bucketFilter;
 
     private @Nullable RowType readType;
-    private @Nullable List<Range> rowRanges;
+    private @Nullable RowRangeIndex rowRangeIndex;
     private @Nullable VectorSearch vectorSearch;
 
     private boolean dropStats = false;
@@ -146,7 +147,17 @@ public class ReadBuilderImpl implements ReadBuilder {
 
     @Override
     public ReadBuilder withRowRanges(List<Range> indices) {
-        this.rowRanges = indices;
+        if (indices == null) {
+            this.rowRangeIndex = null;
+            return this;
+        }
+        this.rowRangeIndex = RowRangeIndex.create(indices);
+        return this;
+    }
+
+    @Override
+    public ReadBuilder withRowRangeIndex(RowRangeIndex rowRangeIndex) {
+        this.rowRangeIndex = rowRangeIndex;
         return this;
     }
 
@@ -198,7 +209,7 @@ public class ReadBuilderImpl implements ReadBuilder {
         scan.withFilter(filter)
                 .withReadType(readType)
                 .withPartitionFilter(partitionFilter)
-                .withRowRanges(rowRanges)
+                .withRowRangeIndex(rowRangeIndex)
                 .withVectorSearch(vectorSearch);
 
         checkState(
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
index 4352b40037..f837daaaec 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
@@ -40,6 +40,7 @@ import org.apache.paimon.utils.ChangelogManager;
 import org.apache.paimon.utils.FileStorePathFactory;
 import org.apache.paimon.utils.Filter;
 import org.apache.paimon.utils.Range;
+import org.apache.paimon.utils.RowRangeIndex;
 import org.apache.paimon.utils.SnapshotManager;
 
 import javax.annotation.Nullable;
@@ -114,6 +115,8 @@ public interface SnapshotReader {
 
     SnapshotReader withRowRanges(List<Range> rowRanges);
 
+    SnapshotReader withRowRangeIndex(RowRangeIndex rowRangeIndex);
+
     SnapshotReader withReadType(RowType readType);
 
     SnapshotReader withLimit(int limit);
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
index c839a5dbd1..a6710ff008 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
@@ -60,6 +60,7 @@ import org.apache.paimon.utils.Filter;
 import org.apache.paimon.utils.LazyField;
 import org.apache.paimon.utils.Pair;
 import org.apache.paimon.utils.Range;
+import org.apache.paimon.utils.RowRangeIndex;
 import org.apache.paimon.utils.SnapshotManager;
 
 import javax.annotation.Nullable;
@@ -307,8 +308,14 @@ public class SnapshotReaderImpl implements SnapshotReader {
     }
 
     @Override
-    public SnapshotReader withRowRanges(List<Range> rowRanges) {
-        scan.withRowRanges(rowRanges);
+    public SnapshotReader withRowRanges(List<Range> sortedPushdownRowRanges) {
+        scan.withRowRanges(sortedPushdownRowRanges);
+        return this;
+    }
+
+    @Override
+    public SnapshotReader withRowRangeIndex(RowRangeIndex rowRangeIndex) {
+        scan.withRowRangeIndex(rowRangeIndex);
         return this;
     }
 
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
index d613d9455f..980183d801 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
@@ -67,6 +67,7 @@ import org.apache.paimon.utils.FileStorePathFactory;
 import org.apache.paimon.utils.Filter;
 import org.apache.paimon.utils.ProjectedRow;
 import org.apache.paimon.utils.Range;
+import org.apache.paimon.utils.RowRangeIndex;
 import org.apache.paimon.utils.SimpleFileReader;
 import org.apache.paimon.utils.SnapshotManager;
 import org.apache.paimon.utils.TagManager;
@@ -458,6 +459,12 @@ public class AuditLogTable implements DataTable, ReadonlyTable {
             return this;
         }
 
+        @Override
+        public SnapshotReader withRowRangeIndex(RowRangeIndex rowRangeIndex) {
+            wrapped.withRowRangeIndex(rowRangeIndex);
+            return this;
+        }
+
         @Override
         public SnapshotReader withReadType(RowType readType) {
             wrapped.withReadType(readType);
diff --git a/paimon-core/src/test/java/org/apache/paimon/globalindex/DataEvolutionBatchScanTest.java b/paimon-core/src/test/java/org/apache/paimon/globalindex/DataEvolutionBatchScanTest.java
new file mode 100644
index 0000000000..1bd15f7197
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/globalindex/DataEvolutionBatchScanTest.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.globalindex;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.Split;
+import org.apache.paimon.utils.Range;
+import org.apache.paimon.utils.RowRangeIndex;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+
+import static org.apache.paimon.stats.SimpleStats.EMPTY_STATS;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link DataEvolutionBatchScan}. */
+public class DataEvolutionBatchScanTest {
+
+    @Test
+    public void testWrapToIndexSplitsRandomly() {
+        Random random = new Random();
+        for (int round = 0; round < 2000; round++) {
+            int splitNum = 1 + random.nextInt(20);
+            List<Split> splits = new ArrayList<>(splitNum);
+            List<Range> splitRanges = new ArrayList<>(splitNum);
+
+            long cursor = random.nextInt(10);
+            for (int i = 0; i < splitNum; i++) {
+                long start = cursor + random.nextInt(4);
+                long rowCount = 30 + random.nextInt(31);
+                long end = start + rowCount - 1;
+
+                DataSplit split =
+                        DataSplit.builder()
+                                .withSnapshot(1L)
+                                .withPartition(BinaryRow.EMPTY_ROW)
+                                .withBucket(i)
+                                .withBucketPath("bucket-" + i)
+                                .withDataFiles(
+                                        Collections.singletonList(
+                                                newAppendFile(
+                                                        start,
+                                                        rowCount,
+                                                        "round-" + round + 
"-split-" + i)))
+                                .build();
+                splits.add(split);
+                splitRanges.add(new Range(start, end));
+                cursor = end + 2 + random.nextInt(4);
+            }
+
+            List<Range> candidateRanges = new ArrayList<>();
+            for (Range splitRange : splitRanges) {
+                int fragmentNum = 5 + random.nextInt(6);
+                candidateRanges.add(new Range(splitRange.from, 
splitRange.from));
+                for (int i = 0; i < fragmentNum - 2; i++) {
+                    long rowId = splitRange.from + 2L * (i + 1);
+                    candidateRanges.add(new Range(rowId, rowId));
+                }
+                candidateRanges.add(new Range(splitRange.to, splitRange.to));
+            }
+
+            List<Range> rowRanges = Range.sortAndMergeOverlap(candidateRanges, 
true);
+            List<Split> indexedSplits =
+                    DataEvolutionBatchScan.wrapToIndexSplits(
+                                    splits, RowRangeIndex.create(rowRanges), 
null)
+                            .splits();
+
+            assertThat(indexedSplits).hasSize(splits.size());
+            for (int i = 0; i < indexedSplits.size(); i++) {
+                DataSplit split = (DataSplit) splits.get(i);
+                IndexedSplit indexedSplit = (IndexedSplit) 
indexedSplits.get(i);
+
+                List<DataFileMeta> files = split.dataFiles();
+                long min = files.get(0).nonNullFirstRowId();
+                long max =
+                        files.get(files.size() - 1).nonNullFirstRowId()
+                                + files.get(files.size() - 1).rowCount()
+                                - 1;
+                List<Range> expected = expectedRanges(min, max, rowRanges);
+
+                assertThat(expected).isNotEmpty();
+                assertThat(expected.size()).isBetween(5, 10);
+                assertThat(indexedSplit.dataSplit()).isEqualTo(split);
+                
assertThat(indexedSplit.rowRanges()).containsExactlyElementsOf(expected);
+            }
+        }
+    }
+
+    private static List<Range> expectedRanges(long min, long max, List<Range> 
rowRanges) {
+        List<Range> expected = new ArrayList<>();
+        for (Range range : rowRanges) {
+            if (range.to < min) {
+                continue;
+            }
+            if (range.from > max) {
+                break;
+            }
+            expected.add(new Range(Math.max(min, range.from), Math.min(max, 
range.to)));
+        }
+        return expected;
+    }
+
+    private static DataFileMeta newAppendFile(long firstRowId, long rowCount, 
String name) {
+        return DataFileMeta.forAppend(
+                name,
+                1024L,
+                rowCount,
+                EMPTY_STATS,
+                0L,
+                firstRowId + rowCount - 1,
+                1L,
+                Collections.emptyList(),
+                null,
+                null,
+                null,
+                null,
+                firstRowId,
+                null);
+    }
+}
diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/DataEvolutionFileStoreScanTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/DataEvolutionFileStoreScanTest.java
index c8fc9cf46d..d23f749931 100644
--- a/paimon-core/src/test/java/org/apache/paimon/operation/DataEvolutionFileStoreScanTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/operation/DataEvolutionFileStoreScanTest.java
@@ -36,6 +36,8 @@ import org.apache.paimon.schema.Schema;
 import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.stats.SimpleStats;
 import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.utils.Range;
+import org.apache.paimon.utils.RowRangeIndex;
 
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -258,6 +260,20 @@ public class DataEvolutionFileStoreScanTest {
         assertThat(nullCounts.isNullAt(2)).isTrue();
     }
 
+    @Test
+    public void testIntersectsRowRanges() {
+        List<Range> rowRanges =
+                Arrays.asList(
+                        new Range(20, 30), new Range(0, 10), new Range(5, 15), new Range(35, 40));
+        RowRangeIndex index = RowRangeIndex.create(rowRanges);
+
+        assertThat(index.intersects(14, 14)).isTrue();
+        assertThat(index.intersects(16, 19)).isFalse();
+        assertThat(index.intersects(31, 34)).isFalse();
+        assertThat(index.intersects(29, 31)).isTrue();
+        assertThat(index.intersects(100, 200)).isFalse();
+    }
+
     private Schema createSchema(String... fieldNames) {
         Schema.Builder builder = Schema.newBuilder();
         for (int i = 0; i < fieldNames.length; i++) {


Reply via email to