This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 956551c641 [core] Data evolution table limit push down (#7298)
956551c641 is described below

commit 956551c641ebff8a6bc04e2a2a141a147e289c51
Author: YeJunHao <[email protected]>
AuthorDate: Thu Feb 26 13:48:12 2026 +0800

    [core] Data evolution table limit push down (#7298)
---
 .../operation/DataEvolutionFileStoreScan.java      | 48 ++++++++++++
 .../paimon/table/DataEvolutionTableTest.java       | 88 ++++++++++++++++++++++
 2 files changed, 136 insertions(+)
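
In short: when no input filter is set, a positive limit is given, and every manifest file meta carries a row-id range, the new readManifestEntries override in DataEvolutionFileStoreScan groups manifests (and then their entries) by overlapping row-id ranges and stops reading further groups once the accumulated row count reaches the limit; in all other cases it falls back to the parent implementation. The new tests verify that the limited plan scans fewer files than the full plan and that a filter disables the early stop.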

diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionFileStoreScan.java
index eb6a8360f2..396020debb 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionFileStoreScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionFileStoreScan.java
@@ -25,6 +25,7 @@ import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.manifest.ManifestEntry;
 import org.apache.paimon.manifest.ManifestFile;
+import org.apache.paimon.manifest.ManifestFileMeta;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.reader.DataEvolutionArray;
 import org.apache.paimon.reader.DataEvolutionRow;
@@ -41,10 +42,14 @@ import org.apache.paimon.utils.SnapshotManager;
 
 import javax.annotation.Nullable;
 
+import java.util.ArrayDeque;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Comparator;
+import java.util.Iterator;
 import java.util.List;
+import java.util.Queue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.function.Function;
@@ -123,6 +128,49 @@ public class DataEvolutionFileStoreScan extends AppendOnlyFileStoreScan {
         return this;
     }
 
+    @Override
+    public Iterator<ManifestEntry> readManifestEntries(
+            List<ManifestFileMeta> manifestFileMetas, boolean useSequential) {
+        if (inputFilter == null
+                && limit != null
+                && limit > 0
+                && manifestFileMetas.stream()
+                        .allMatch(meta -> meta.minRowId() != null && 
meta.maxRowId() != null)) {
+            List<ManifestEntry> filtered = new ArrayList<>();
+            RangeHelper<ManifestFileMeta> rangeHelper =
+                    new RangeHelper<>(meta -> new Range(meta.minRowId(), meta.maxRowId()));
+            Queue<List<ManifestFileMeta>> queue =
+                    new ArrayDeque<>(rangeHelper.mergeOverlappingRanges(manifestFileMetas));
+
+            long accumulatedRowCount = 0;
+            while (!queue.isEmpty()) {
+                List<ManifestFileMeta> groupMetas = queue.poll();
+                List<ManifestEntry> entries = new ArrayList<>();
+                super.readManifestEntries(groupMetas, useSequential).forEachRemaining(entries::add);
+                RangeHelper<ManifestEntry> rangeHelper2 =
+                        new RangeHelper<>(e -> e.file().nonNullRowIdRange());
+                List<List<ManifestEntry>> splitByRowId =
+                        rangeHelper2.mergeOverlappingRanges(entries);
+
+                for (List<ManifestEntry> group : splitByRowId) {
+                    filtered.addAll(group);
+                    long groupRowCount =
+                            group.stream()
+                                    .mapToLong(e -> e.file().rowCount())
+                                    .reduce(Long::max)
+                                    .orElse(0L);
+                    accumulatedRowCount += groupRowCount;
+                    if (accumulatedRowCount >= limit) {
+                        return filtered.iterator();
+                    }
+                }
+            }
+            return filtered.iterator();
+        }
+
+        return super.readManifestEntries(manifestFileMetas, useSequential);
+    }
+
     @Override
     protected boolean postFilterManifestEntriesEnabled() {
         return inputFilter != null;
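
For readers skimming the hunk above, here is a minimal self-contained sketch of the early-stop idea. It does not use Paimon's real RangeHelper, Range, or ManifestEntry APIs; FileRange, mergeOverlappingGroups, and selectForLimit are hypothetical stand-ins that only illustrate the grouping and accumulation. Files whose row-id ranges overlap presumably describe different columns of the same rows (which is why the code above reduces row counts with Long::max rather than summing), so a group contributes its maximum row count, and whole groups are kept until the accumulated count covers the limit.

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Hypothetical stand-ins, not the Paimon classes from the diff above. */
final class LimitPushDownSketch {

    /** A file-like record with a row-id range and a row count. */
    record FileRange(long minRowId, long maxRowId, long rowCount) {}

    /** Group files whose row-id ranges overlap (assumes non-null ranges). */
    static List<List<FileRange>> mergeOverlappingGroups(List<FileRange> files) {
        List<FileRange> sorted = new ArrayList<>(files);
        sorted.sort(Comparator.comparingLong(FileRange::minRowId));
        List<List<FileRange>> groups = new ArrayList<>();
        List<FileRange> current = new ArrayList<>();
        long currentMax = Long.MIN_VALUE;
        for (FileRange f : sorted) {
            if (current.isEmpty() || f.minRowId() <= currentMax) {
                current.add(f);
                currentMax = Math.max(currentMax, f.maxRowId());
            } else {
                groups.add(current);
                current = new ArrayList<>(List.of(f));
                currentMax = f.maxRowId();
            }
        }
        if (!current.isEmpty()) {
            groups.add(current);
        }
        return groups;
    }

    /** Keep whole groups until their accumulated row count covers the limit. */
    static List<FileRange> selectForLimit(List<FileRange> files, long limit) {
        List<FileRange> selected = new ArrayList<>();
        long accumulated = 0;
        for (List<FileRange> group : mergeOverlappingGroups(files)) {
            selected.addAll(group);
            // Overlapping files cover the same rows (different columns),
            // so the group's distinct row count is the max, not the sum.
            accumulated += group.stream().mapToLong(FileRange::rowCount).max().orElse(0L);
            if (accumulated >= limit) {
                break;
            }
        }
        return selected;
    }

    public static void main(String[] args) {
        List<FileRange> files =
                List.of(
                        new FileRange(0, 9, 10),   // base columns, rows 0..9
                        new FileRange(0, 9, 10),   // evolved columns, same rows 0..9
                        new FileRange(10, 19, 10),
                        new FileRange(20, 29, 10));
        // With limit 5 only the first overlapping group (two files) is kept.
        System.out.println(selectForLimit(files, 5).size()); // prints 2
    }
}
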
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/DataEvolutionTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/DataEvolutionTableTest.java
index 5f49e54aeb..5ca56eed31 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/DataEvolutionTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/DataEvolutionTableTest.java
@@ -457,6 +457,94 @@ public class DataEvolutionTableTest extends DataEvolutionTestBase {
         assertThat(readBuilder.newScan().plan().splits().isEmpty()).isTrue();
     }
 
+    @Test
+    public void testLimitPushDownWithoutFilter() throws Exception {
+        createTableDefault();
+        Schema schema = schemaDefault();
+        BatchWriteBuilder builder = getTableDefault().newBatchWriteBuilder();
+        RowType writeType = schema.rowType().project(Arrays.asList("f0", "f1"));
+
+        // Write three commits through normal path and verify limit pushdown can reduce scanned
+        // files.
+        for (int i = 0; i < 30; i++) {
+            try (BatchTableWrite write = builder.newWrite().withWriteType(writeType)) {
+                write.write(GenericRow.of(i * 2 + 1, BinaryString.fromString("v" + (i * 2 + 1))));
+                write.write(GenericRow.of(i * 2 + 2, BinaryString.fromString("v" + (i * 2 + 2))));
+
+                BatchTableCommit commit = builder.newCommit();
+                commit.commit(write.prepareCommit());
+            }
+        }
+
+        for (int i = 0; i < 2; i++) {
+            try (BatchTableWrite write =
+                    builder.newWrite().withWriteType(writeType.project("f1"))) {
+                write.write(GenericRow.of(BinaryString.fromString("v" + (i * 2 + 1))));
+                write.write(GenericRow.of(BinaryString.fromString("v" + (i * 2 + 2))));
+
+                BatchTableCommit commit = builder.newCommit();
+                List<CommitMessage> commitMessages = write.prepareCommit();
+                setFirstRowId(commitMessages, 0L);
+                commit.commit(commitMessages);
+            }
+        }
+
+        ReadBuilder readBuilder = getTableDefault().newReadBuilder();
+        TableScan.Plan fullPlan = readBuilder.newScan().plan();
+        TableScan.Plan limitPlan = readBuilder.withLimit(3).newScan().plan();
+
+        int fullFiles =
+                fullPlan.splits().stream()
+                        .map(split -> (DataSplit) split)
+                        .mapToInt(split -> split.dataFiles().size())
+                        .sum();
+        int limitedFiles =
+                limitPlan.splits().stream()
+                        .map(split -> (DataSplit) split)
+                        .mapToInt(split -> split.dataFiles().size())
+                        .sum();
+
+        assertThat(fullFiles).isEqualTo(32);
+        assertThat(limitedFiles).isEqualTo(4);
+        assertThat(limitedFiles).isLessThan(fullFiles);
+    }
+
+    @Test
+    public void testLimitPushDownWithFilterShouldNotEarlyStop() throws Exception {
+        createTableDefault();
+        Schema schema = schemaDefault();
+        BatchWriteBuilder builder = getTableDefault().newBatchWriteBuilder();
+        RowType writeType = schema.rowType().project(Arrays.asList("f0", "f1"));
+
+        // Three manifests with different values on f1, only the last one matches filter.
+        for (int i = 0; i < 3; i++) {
+            String value = i == 0 ? "a" : (i == 1 ? "b" : "c");
+            try (BatchTableWrite write = builder.newWrite().withWriteType(writeType)) {
+                write.write(GenericRow.of(i * 2 + 1, BinaryString.fromString(value)));
+                write.write(GenericRow.of(i * 2 + 2, BinaryString.fromString(value)));
+
+                BatchTableCommit commit = builder.newCommit();
+                commit.commit(write.prepareCommit());
+            }
+        }
+
+        ReadBuilder readBuilder = getTableDefault().newReadBuilder();
+        PredicateBuilder predicateBuilder = new PredicateBuilder(schema.rowType());
+        Predicate predicate = predicateBuilder.equal(1, BinaryString.fromString("c"));
+
+        TableScan.Plan plan = readBuilder.withFilter(predicate).withLimit(1).newScan().plan();
+        assertThat(plan.splits().isEmpty()).isFalse();
+
+        RecordReader<InternalRow> reader = readBuilder.newRead().createReader(plan);
+        AtomicInteger rowCount = new AtomicInteger(0);
+        reader.forEachRemaining(
+                row -> {
+                    assertThat(row.getString(1).toString()).isEqualTo("c");
+                    rowCount.incrementAndGet();
+                });
+        assertThat(rowCount.get()).isGreaterThan(0);
+    }
+
     @Test
     public void testWithRowIdsFilterManifestEntries() throws Exception {
         innerTestWithRowIds(true);

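A note on the design visible in the two new tests: with withLimit(3) and no filter, the limited plan touches only 4 of the 32 data files, because whole row-id groups are kept until the accumulated row count covers the limit. The early stop applies only when inputFilter is null, presumably because raw row counts no longer bound the number of matching rows once a predicate is involved; the second test therefore verifies that rows matching the predicate are still returned rather than being dropped by an early stop.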