This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 956551c641 [core] Data evolution table limit push down (#7298)
956551c641 is described below
commit 956551c641ebff8a6bc04e2a2a141a147e289c51
Author: YeJunHao <[email protected]>
AuthorDate: Thu Feb 26 13:48:12 2026 +0800
[core] Data evolution table limit push down (#7298)
---
.../operation/DataEvolutionFileStoreScan.java | 48 ++++++++++++
.../paimon/table/DataEvolutionTableTest.java | 88 ++++++++++++++++++++++
2 files changed, 136 insertions(+)
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionFileStoreScan.java
index eb6a8360f2..396020debb 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionFileStoreScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionFileStoreScan.java
@@ -25,6 +25,7 @@ import org.apache.paimon.data.InternalRow;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.manifest.ManifestFile;
+import org.apache.paimon.manifest.ManifestFileMeta;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.reader.DataEvolutionArray;
import org.apache.paimon.reader.DataEvolutionRow;
@@ -41,10 +42,14 @@ import org.apache.paimon.utils.SnapshotManager;
import javax.annotation.Nullable;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
+import java.util.Iterator;
import java.util.List;
+import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;
@@ -123,6 +128,49 @@ public class DataEvolutionFileStoreScan extends AppendOnlyFileStoreScan {
return this;
}
+ @Override
+ public Iterator<ManifestEntry> readManifestEntries(
+ List<ManifestFileMeta> manifestFileMetas, boolean useSequential) {
+ if (inputFilter == null
+ && limit != null
+ && limit > 0
+ && manifestFileMetas.stream()
+                        .allMatch(meta -> meta.minRowId() != null && meta.maxRowId() != null)) {
+ List<ManifestEntry> filtered = new ArrayList<>();
+ RangeHelper<ManifestFileMeta> rangeHelper =
+                    new RangeHelper<>(meta -> new Range(meta.minRowId(), meta.maxRowId()));
+ Queue<List<ManifestFileMeta>> queue =
+                    new ArrayDeque<>(rangeHelper.mergeOverlappingRanges(manifestFileMetas));
+
+ long accumulatedRowCount = 0;
+ while (!queue.isEmpty()) {
+ List<ManifestFileMeta> groupMetas = queue.poll();
+ List<ManifestEntry> entries = new ArrayList<>();
+                super.readManifestEntries(groupMetas, useSequential).forEachRemaining(entries::add);
+ RangeHelper<ManifestEntry> rangeHelper2 =
+ new RangeHelper<>(e -> e.file().nonNullRowIdRange());
+ List<List<ManifestEntry>> splitByRowId =
+ rangeHelper2.mergeOverlappingRanges(entries);
+
+ for (List<ManifestEntry> group : splitByRowId) {
+ filtered.addAll(group);
+ long groupRowCount =
+ group.stream()
+ .mapToLong(e -> e.file().rowCount())
+ .reduce(Long::max)
+ .orElse(0L);
+ accumulatedRowCount += groupRowCount;
+ if (accumulatedRowCount >= limit) {
+ return filtered.iterator();
+ }
+ }
+ }
+ return filtered.iterator();
+ }
+
+ return super.readManifestEntries(manifestFileMetas, useSequential);
+ }
+
@Override
protected boolean postFilterManifestEntriesEnabled() {
return inputFilter != null;
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/DataEvolutionTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/DataEvolutionTableTest.java
index 5f49e54aeb..5ca56eed31 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/DataEvolutionTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/DataEvolutionTableTest.java
@@ -457,6 +457,94 @@ public class DataEvolutionTableTest extends DataEvolutionTestBase {
assertThat(readBuilder.newScan().plan().splits().isEmpty()).isTrue();
}
+ @Test
+ public void testLimitPushDownWithoutFilter() throws Exception {
+ createTableDefault();
+ Schema schema = schemaDefault();
+ BatchWriteBuilder builder = getTableDefault().newBatchWriteBuilder();
+        RowType writeType = schema.rowType().project(Arrays.asList("f0", "f1"));
+
+        // Write three commits through normal path and verify limit pushdown can reduce scanned
+        // files.
+ for (int i = 0; i < 30; i++) {
+            try (BatchTableWrite write = builder.newWrite().withWriteType(writeType)) {
+                write.write(GenericRow.of(i * 2 + 1, BinaryString.fromString("v" + (i * 2 + 1))));
+                write.write(GenericRow.of(i * 2 + 2, BinaryString.fromString("v" + (i * 2 + 2))));
+
+ BatchTableCommit commit = builder.newCommit();
+ commit.commit(write.prepareCommit());
+ }
+ }
+
+ for (int i = 0; i < 2; i++) {
+            try (BatchTableWrite write =
+                    builder.newWrite().withWriteType(writeType.project("f1"))) {
+                write.write(GenericRow.of(BinaryString.fromString("v" + (i * 2 + 1))));
+                write.write(GenericRow.of(BinaryString.fromString("v" + (i * 2 + 2))));
+
+ BatchTableCommit commit = builder.newCommit();
+ List<CommitMessage> commitMessages = write.prepareCommit();
+ setFirstRowId(commitMessages, 0L);
+ commit.commit(commitMessages);
+ }
+ }
+
+ ReadBuilder readBuilder = getTableDefault().newReadBuilder();
+ TableScan.Plan fullPlan = readBuilder.newScan().plan();
+ TableScan.Plan limitPlan = readBuilder.withLimit(3).newScan().plan();
+
+ int fullFiles =
+ fullPlan.splits().stream()
+ .map(split -> (DataSplit) split)
+ .mapToInt(split -> split.dataFiles().size())
+ .sum();
+ int limitedFiles =
+ limitPlan.splits().stream()
+ .map(split -> (DataSplit) split)
+ .mapToInt(split -> split.dataFiles().size())
+ .sum();
+
+ assertThat(fullFiles).isEqualTo(32);
+ assertThat(limitedFiles).isEqualTo(4);
+ assertThat(limitedFiles).isLessThan(fullFiles);
+ }
+
+ @Test
+    public void testLimitPushDownWithFilterShouldNotEarlyStop() throws Exception {
+ createTableDefault();
+ Schema schema = schemaDefault();
+ BatchWriteBuilder builder = getTableDefault().newBatchWriteBuilder();
+        RowType writeType = schema.rowType().project(Arrays.asList("f0", "f1"));
+
+        // Three manifests with different values on f1, only the last one matches filter.
+ for (int i = 0; i < 3; i++) {
+ String value = i == 0 ? "a" : (i == 1 ? "b" : "c");
+            try (BatchTableWrite write = builder.newWrite().withWriteType(writeType)) {
+                write.write(GenericRow.of(i * 2 + 1, BinaryString.fromString(value)));
+                write.write(GenericRow.of(i * 2 + 2, BinaryString.fromString(value)));
+
+ BatchTableCommit commit = builder.newCommit();
+ commit.commit(write.prepareCommit());
+ }
+ }
+
+ ReadBuilder readBuilder = getTableDefault().newReadBuilder();
+        PredicateBuilder predicateBuilder = new PredicateBuilder(schema.rowType());
+        Predicate predicate = predicateBuilder.equal(1, BinaryString.fromString("c"));
+
+        TableScan.Plan plan = readBuilder.withFilter(predicate).withLimit(1).newScan().plan();
+ assertThat(plan.splits().isEmpty()).isFalse();
+
+        RecordReader<InternalRow> reader = readBuilder.newRead().createReader(plan);
+ AtomicInteger rowCount = new AtomicInteger(0);
+ reader.forEachRemaining(
+ row -> {
+ assertThat(row.getString(1).toString()).isEqualTo("c");
+ rowCount.incrementAndGet();
+ });
+ assertThat(rowCount.get()).isGreaterThan(0);
+ }
+
@Test
public void testWithRowIdsFilterManifestEntries() throws Exception {
innerTestWithRowIds(true);