This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 2597463dbf [core] Support limit pushdown to the DataFile (#6105)
2597463dbf is described below

commit 2597463dbf010bb140daf0ecb5e332c89245c1ce
Author: Tan-JiaLiang <tanjialiang1...@gmail.com>
AuthorDate: Wed Aug 20 17:44:09 2025 +0800

    [core] Support limit pushdown to the DataFile (#6105)
---
 .../bitmap/RangeBitmapIndexPushDownBenchmark.java  | 36 ++++++++++++++
 .../paimon/fileindex/bitmap/BitmapIndexResult.java |  4 ++
 .../org/apache/paimon/io/FileIndexEvaluator.java   | 21 ++++++---
 .../paimon/io/KeyValueFileReaderFactory.java       |  2 +-
 .../paimon/operation/DataEvolutionSplitRead.java   |  1 +
 .../apache/paimon/operation/RawFileSplitRead.java  | 11 ++++-
 .../org/apache/paimon/operation/SplitRead.java     |  4 ++
 .../paimon/table/source/AppendTableRead.java       |  9 ++++
 .../apache/paimon/table/source/InnerTableRead.java |  4 ++
 .../paimon/table/source/KeyValueTableRead.java     | 11 +++++
 .../paimon/table/source/ReadBuilderImpl.java       |  3 ++
 .../apache/paimon/utils/FormatReaderMapping.java   | 18 +++++--
 .../paimon/table/AppendOnlySimpleTableTest.java    | 54 +++++++++++++++++++++
 .../paimon/table/PrimaryKeySimpleTableTest.java    | 55 ++++++++++++++++++++++
 .../paimon/utils/FormatReaderMappingTest.java      |  1 +
 .../paimon/spark/sql/PaimonPushDownTestBase.scala  |  1 +
 16 files changed, 223 insertions(+), 12 deletions(-)

diff --git 
a/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/bitmap/RangeBitmapIndexPushDownBenchmark.java
 
b/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/bitmap/RangeBitmapIndexPushDownBenchmark.java
index fa0600911a..77c88afe56 100644
--- 
a/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/bitmap/RangeBitmapIndexPushDownBenchmark.java
+++ 
b/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/bitmap/RangeBitmapIndexPushDownBenchmark.java
@@ -55,6 +55,7 @@ import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Random;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -95,6 +96,41 @@ public class RangeBitmapIndexPushDownBenchmark {
         }
     }
 
+    @Test
+    public void testLimitPushDown() throws Exception {
+        Random random = new Random();
+        for (int bound : BOUNDS) {
+            Table table = prepareData(bound, parquet(), "parquet_" + bound);
+            Benchmark benchmark =
+                    new Benchmark("limit", ROW_COUNT)
+                            .setNumWarmupIters(1)
+                            .setOutputPerIteration(false);
+            int limit = random.nextInt(Math.min(bound, 1000));
+            benchmark.addCase(
+                    bound + "-" + limit,
+                    1,
+                    () -> {
+                        List<Split> splits =
+                                table.newReadBuilder().withLimit(limit).newScan().plan().splits();
+                        AtomicLong readCount = new AtomicLong(0);
+                        try {
+                            for (Split split : splits) {
+                                RecordReader<InternalRow> reader =
+                                        table.newReadBuilder()
+                                                .withLimit(limit)
+                                                .newRead()
+                                                .createReader(split);
+                        reader.forEachRemaining(row -> readCount.incrementAndGet());
+                                reader.close();
+                            }
+                        } catch (Exception e) {
+                            throw new RuntimeException(e);
+                        }
+                    });
+            benchmark.run();
+        }
+    }
+
     private Options parquet() {
         Options options = new Options();
         options.set(CoreOptions.FILE_FORMAT, CoreOptions.FILE_FORMAT_PARQUET);
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/fileindex/bitmap/BitmapIndexResult.java
 
b/paimon-common/src/main/java/org/apache/paimon/fileindex/bitmap/BitmapIndexResult.java
index 96e6fe8284..fd4991bdf7 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/fileindex/bitmap/BitmapIndexResult.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/fileindex/bitmap/BitmapIndexResult.java
@@ -59,6 +59,10 @@ public class BitmapIndexResult extends 
LazyField<RoaringBitmap32> implements Fil
         return new BitmapIndexResult(() -> RoaringBitmap32.andNot(get(), deletion));
     }
 
+    public FileIndexResult limit(int limit) {
+        return new BitmapIndexResult(() -> get().limit(limit));
+    }
+
     @Override
     public boolean equals(Object o) {
         if (this == o) {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/io/FileIndexEvaluator.java 
b/paimon-core/src/main/java/org/apache/paimon/io/FileIndexEvaluator.java
index 457212ddcb..1b4f3d1776 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/FileIndexEvaluator.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/FileIndexEvaluator.java
@@ -47,26 +47,33 @@ public class FileIndexEvaluator {
             TableSchema dataSchema,
             List<Predicate> dataFilter,
             @Nullable TopN topN,
+            @Nullable Integer limit,
             DataFilePathFactory dataFilePathFactory,
             DataFileMeta file,
             @Nullable DeletionVector deletionVector)
             throws IOException {
-        if (isNullOrEmpty(dataFilter) && topN == null) {
+        if (isNullOrEmpty(dataFilter) && topN == null && limit == null) {
             return FileIndexResult.REMAIN;
         }
 
+        FileIndexResult selection =
+                new BitmapIndexResult(() -> RoaringBitmap32.bitmapOfRange(0, file.rowCount()));
+        if (deletionVector instanceof BitmapDeletionVector) {
+            RoaringBitmap32 deletion = ((BitmapDeletionVector) deletionVector).get();
+            selection = ((BitmapIndexResult) selection).andNot(deletion);
+        }
+
+        // for now, limit cannot work with other predicates.
+        if (isNullOrEmpty(dataFilter) && topN == null && limit != null) {
+            return ((BitmapIndexResult) selection).limit(limit);
+        }
+
         try (FileIndexPredicate predicate =
                 createFileIndexPredicate(fileIO, dataSchema, 
dataFilePathFactory, file)) {
             if (predicate == null) {
                 return FileIndexResult.REMAIN;
             }
 
-            FileIndexResult selection =
-                    new BitmapIndexResult(() -> RoaringBitmap32.bitmapOfRange(0, file.rowCount()));
-            if (deletionVector instanceof BitmapDeletionVector) {
-                RoaringBitmap32 deletion = ((BitmapDeletionVector) deletionVector).get();
-                selection = ((BitmapIndexResult) selection).andNot(deletion);
-            }
             FileIndexResult result;
             if (!isNullOrEmpty(dataFilter)) {
                 Predicate filter = PredicateBuilder.and(dataFilter.toArray(new Predicate[0]));
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java 
b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java
index 376c45b5dc..0b28bc0b4e 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java
@@ -272,7 +272,7 @@ public class KeyValueFileReaderFactory implements 
FileReaderFactory<KeyValue> {
                     finalReadKeyType,
                     readValueType,
                     new FormatReaderMapping.Builder(
-                            formatDiscover, readTableFields, fieldsExtractor, 
filters, null),
+                            formatDiscover, readTableFields, fieldsExtractor, 
filters, null, null),
                     pathFactory.createDataFilePathFactory(partition, bucket),
                     options.fileReaderAsyncThreshold().getBytes(),
                     partition,
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java
index acd0b39871..46bfb1a68b 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java
@@ -126,6 +126,7 @@ public class DataEvolutionSplitRead implements 
SplitRead<InternalRow> {
                         readRowType.getFields(),
                         schema -> rowTypeWithRowLineage(schema.logicalRowType(), true).getFields(),
                         null,
+                        null,
                         null);
 
         List<List<DataFileMeta>> splitByRowId = 
DataEvolutionSplitGenerator.split(files);
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java 
b/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java
index 1c1249978a..610a0c0a2e 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java
@@ -83,6 +83,7 @@ public class RawFileSplitRead implements 
SplitRead<InternalRow> {
     private RowType readRowType;
     @Nullable private List<Predicate> filters;
     @Nullable private TopN topN;
+    @Nullable private Integer limit;
 
     public RawFileSplitRead(
             FileIO fileIO,
@@ -134,6 +135,12 @@ public class RawFileSplitRead implements 
SplitRead<InternalRow> {
         return this;
     }
 
+    @Override
+    public SplitRead<InternalRow> withLimit(@Nullable Integer limit) {
+        this.limit = limit;
+        return this;
+    }
+
     @Override
     public RecordReader<InternalRow> createReader(DataSplit split) throws 
IOException {
         if (!split.beforeFiles().isEmpty()) {
@@ -172,7 +179,8 @@ public class RawFileSplitRead implements 
SplitRead<InternalRow> {
                             return schema.fields();
                         },
                         filters,
-                        topN);
+                        topN,
+                        limit);
 
         for (DataFileMeta file : files) {
             suppliers.add(
@@ -230,6 +238,7 @@ public class RawFileSplitRead implements 
SplitRead<InternalRow> {
                             formatReaderMapping.getDataSchema(),
                             formatReaderMapping.getDataFilters(),
                             formatReaderMapping.getTopN(),
+                            formatReaderMapping.getLimit(),
                             dataFilePathFactory,
                             file,
                             deletionVector);
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/SplitRead.java 
b/paimon-core/src/main/java/org/apache/paimon/operation/SplitRead.java
index e4a8ab5f3b..d646375ef6 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/SplitRead.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/SplitRead.java
@@ -49,6 +49,10 @@ public interface SplitRead<T> {
         return this;
     }
 
+    default SplitRead<T> withLimit(@Nullable Integer limit) {
+        return this;
+    }
+
     /** Create a {@link RecordReader} from split. */
     RecordReader<T> createReader(DataSplit split) throws IOException;
 
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/AppendTableRead.java 
b/paimon-core/src/main/java/org/apache/paimon/table/source/AppendTableRead.java
index cdb66483f5..8feb177db5 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/AppendTableRead.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/AppendTableRead.java
@@ -47,6 +47,7 @@ public final class AppendTableRead extends 
AbstractDataTableRead {
     @Nullable private RowType readType = null;
     private Predicate predicate = null;
     private TopN topN = null;
+    private Integer limit = null;
 
     public AppendTableRead(
             List<Function<SplitReadConfig, SplitReadProvider>> 
providerFactories,
@@ -74,6 +75,7 @@ public final class AppendTableRead extends 
AbstractDataTableRead {
         }
         read.withFilter(predicate);
         read.withTopN(topN);
+        read.withLimit(limit);
     }
 
     @Override
@@ -96,6 +98,13 @@ public final class AppendTableRead extends 
AbstractDataTableRead {
         return this;
     }
 
+    @Override
+    public InnerTableRead withLimit(int limit) {
+        initialized().forEach(r -> r.withLimit(limit));
+        this.limit = limit;
+        return this;
+    }
+
     @Override
     public RecordReader<InternalRow> reader(Split split) throws IOException {
         DataSplit dataSplit = (DataSplit) split;
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableRead.java 
b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableRead.java
index 1e34e911ae..b4da78ef6f 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableRead.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableRead.java
@@ -55,6 +55,10 @@ public interface InnerTableRead extends TableRead {
         return this;
     }
 
+    default InnerTableRead withLimit(int limit) {
+        return this;
+    }
+
     default InnerTableRead forceKeepDelete() {
         return this;
     }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueTableRead.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueTableRead.java
index 5df41399cb..ed67e73380 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueTableRead.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueTableRead.java
@@ -56,6 +56,7 @@ public final class KeyValueTableRead extends 
AbstractDataTableRead {
     private Predicate predicate = null;
     private IOManager ioManager = null;
     @Nullable private TopN topN = null;
+    @Nullable private Integer limit = null;
 
     public KeyValueTableRead(
             Supplier<MergeFileSplitRead> mergeReadSupplier,
@@ -91,6 +92,9 @@ public final class KeyValueTableRead extends 
AbstractDataTableRead {
         if (topN != null) {
             read = read.withTopN(topN);
         }
+        if (limit != null) {
+            read = read.withLimit(limit);
+        }
         read.withFilter(predicate).withIOManager(ioManager);
     }
 
@@ -121,6 +125,13 @@ public final class KeyValueTableRead extends 
AbstractDataTableRead {
         return this;
     }
 
+    @Override
+    public InnerTableRead withLimit(int limit) {
+        initialized().forEach(r -> r.withLimit(limit));
+        this.limit = limit;
+        return this;
+    }
+
     @Override
     public TableRead withIOManager(IOManager ioManager) {
         initialized().forEach(r -> r.withIOManager(ioManager));
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java 
b/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java
index 5d529aa41d..e57a7b6382 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java
@@ -214,6 +214,9 @@ public class ReadBuilderImpl implements ReadBuilder {
         if (topN != null) {
             read.withTopN(topN);
         }
+        if (limit != null) {
+            read.withLimit(limit);
+        }
         return read;
     }
 
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/utils/FormatReaderMapping.java 
b/paimon-core/src/main/java/org/apache/paimon/utils/FormatReaderMapping.java
index 4f867b5ac1..590abe1f7c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/FormatReaderMapping.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/FormatReaderMapping.java
@@ -64,6 +64,7 @@ public class FormatReaderMapping {
     private final List<Predicate> dataFilters;
     private final Map<String, Integer> systemFields;
     @Nullable private final TopN topN;
+    @Nullable private final Integer limit;
 
     public FormatReaderMapping(
             @Nullable int[] indexMapping,
@@ -74,7 +75,8 @@ public class FormatReaderMapping {
             TableSchema dataSchema,
             List<Predicate> dataFilters,
             Map<String, Integer> systemFields,
-            @Nullable TopN topN) {
+            @Nullable TopN topN,
+            @Nullable Integer limit) {
         this.indexMapping = combine(indexMapping, trimmedKeyMapping);
         this.castMapping = castMapping;
         this.readerFactory = readerFactory;
@@ -83,6 +85,7 @@ public class FormatReaderMapping {
         this.dataFilters = dataFilters;
         this.systemFields = systemFields;
         this.topN = topN;
+        this.limit = limit;
     }
 
     private int[] combine(@Nullable int[] indexMapping, @Nullable int[] 
trimmedKeyMapping) {
@@ -141,6 +144,11 @@ public class FormatReaderMapping {
         return topN;
     }
 
+    @Nullable
+    public Integer getLimit() {
+        return limit;
+    }
+
     /** Builder for {@link FormatReaderMapping}. */
     public static class Builder {
 
@@ -149,18 +157,21 @@ public class FormatReaderMapping {
         private final Function<TableSchema, List<DataField>> fieldsExtractor;
         @Nullable private final List<Predicate> filters;
         @Nullable private final TopN topN;
+        @Nullable private final Integer limit;
 
         public Builder(
                 FileFormatDiscover formatDiscover,
                 List<DataField> readFields,
                 Function<TableSchema, List<DataField>> fieldsExtractor,
                 @Nullable List<Predicate> filters,
-                @Nullable TopN topN) {
+                @Nullable TopN topN,
+                @Nullable Integer limit) {
             this.formatDiscover = formatDiscover;
             this.readFields = readFields;
             this.fieldsExtractor = fieldsExtractor;
             this.filters = filters;
             this.topN = topN;
+            this.limit = limit;
         }
 
         /**
@@ -223,7 +234,8 @@ public class FormatReaderMapping {
                     dataSchema,
                     readFilters,
                     systemFields,
-                    evolutionTopN(tableSchema, dataSchema));
+                    evolutionTopN(tableSchema, dataSchema),
+                    limit);
         }
 
         @Nullable
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlySimpleTableTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlySimpleTableTest.java
index 024e6320b8..f8056cc492 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlySimpleTableTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlySimpleTableTest.java
@@ -36,6 +36,7 @@ import org.apache.paimon.fs.Path;
 import org.apache.paimon.fs.local.LocalFileIO;
 import org.apache.paimon.io.BundleRecords;
 import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.options.MemorySize;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.predicate.Equal;
 import org.apache.paimon.predicate.FieldRef;
@@ -68,6 +69,7 @@ import org.apache.paimon.utils.RoaringBitmap32;
 
 import org.apache.paimon.shade.org.apache.parquet.hadoop.ParquetOutputFormat;
 
+import org.apache.commons.math3.random.RandomDataGenerator;
 import org.assertj.core.api.Assertions;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
@@ -96,6 +98,7 @@ import static org.apache.paimon.CoreOptions.FILE_FORMAT;
 import static org.apache.paimon.CoreOptions.FILE_FORMAT_PARQUET;
 import static org.apache.paimon.CoreOptions.FILE_INDEX_IN_MANIFEST_THRESHOLD;
 import static org.apache.paimon.CoreOptions.METADATA_STATS_MODE;
+import static org.apache.paimon.CoreOptions.SOURCE_SPLIT_TARGET_SIZE;
 import static org.apache.paimon.CoreOptions.WRITE_ONLY;
 import static org.apache.paimon.io.DataFileTestUtils.row;
 import static org.assertj.core.api.Assertions.assertThat;
@@ -946,6 +949,57 @@ public class AppendOnlySimpleTableTest extends 
SimpleTableTestBase {
         }
     }
 
+    @Test
+    public void testLimitPushDown() throws Exception {
+        RowType rowType = RowType.builder().field("id", 
DataTypes.INT()).build();
+        Consumer<Options> configure =
+                options -> {
+                    options.set(FILE_FORMAT, FILE_FORMAT_PARQUET);
+                    options.set(WRITE_ONLY, true);
+                    options.set(SOURCE_SPLIT_TARGET_SIZE, 
MemorySize.ofBytes(1));
+                    options.set(ParquetOutputFormat.BLOCK_SIZE, "1048576");
+                    options.set(ParquetOutputFormat.MIN_ROW_COUNT_FOR_PAGE_SIZE_CHECK, "100");
+                    options.set(ParquetOutputFormat.PAGE_ROW_COUNT_LIMIT, "300");
+                };
+        // in unaware-bucket mode, we split files into splits all the time
+        FileStoreTable table = createUnawareBucketFileStoreTable(rowType, 
configure);
+
+        int rowCount = 10000;
+        StreamTableWrite write = table.newWrite(commitUser);
+        StreamTableCommit commit = table.newCommit(commitUser);
+        for (int i = 0; i < rowCount; i++) {
+            write.write(GenericRow.of(i));
+        }
+        commit.commit(0, write.prepareCommit(true, 0));
+
+        for (int i = 0; i < rowCount; i++) {
+            write.write(GenericRow.of(i));
+        }
+        commit.commit(1, write.prepareCommit(true, 1));
+
+        for (int i = 0; i < rowCount; i++) {
+            write.write(GenericRow.of(i));
+        }
+        commit.commit(2, write.prepareCommit(true, 2));
+
+        write.close();
+        commit.close();
+
+        // test limit push down
+        {
+            int limit = new RandomDataGenerator().nextInt(1, 1000);
+            TableScan.Plan plan = table.newScan().withLimit(limit).plan();
+            assertThat(plan.splits()).hasSize(1);
+
+            RecordReader<InternalRow> reader =
+                    table.newRead().withLimit(limit).createReader(plan.splits());
+            AtomicInteger cnt = new AtomicInteger(0);
+            reader.forEachRemaining(row -> cnt.incrementAndGet());
+            assertThat(cnt.get()).isEqualTo(limit);
+            reader.close();
+        }
+    }
+
     @Test
     public void testWithShardAppendTable() throws Exception {
         FileStoreTable table = createFileStoreTable(conf -> conf.set(BUCKET, 
-1));
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java
index 1e4b8b185b..d4f1484230 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java
@@ -86,6 +86,7 @@ import org.apache.paimon.utils.RoaringBitmap32;
 
 import org.apache.paimon.shade.org.apache.parquet.hadoop.ParquetOutputFormat;
 
+import org.apache.commons.math3.random.RandomDataGenerator;
 import org.assertj.core.api.Assertions;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
@@ -1303,6 +1304,60 @@ public class PrimaryKeySimpleTableTest extends 
SimpleTableTestBase {
         }
     }
 
+    @Test
+    public void testLimitPushDownInDeletionVectorMode() throws Exception {
+        FileStoreTable table =
+                createFileStoreTable(
+                        conf -> {
+                            conf.set(BUCKET, 2);
+                            conf.set(FILE_FORMAT, FILE_FORMAT_PARQUET);
+                            conf.set(DELETION_VECTORS_ENABLED, true);
+                            conf.set(SOURCE_SPLIT_TARGET_SIZE, 
MemorySize.ofBytes(1));
+                            conf.set(ParquetOutputFormat.BLOCK_SIZE, "524288");
+                            conf.set(ParquetOutputFormat.MIN_ROW_COUNT_FOR_PAGE_SIZE_CHECK, "100");
+                            conf.set(ParquetOutputFormat.PAGE_ROW_COUNT_LIMIT, "300");
+                        });
+
+        int rowCount = 10000;
+        StreamTableWrite write =
+                table.newWrite(commitUser).withIOManager(new IOManagerImpl(tempDir.toString()));
+        StreamTableCommit commit = table.newCommit(commitUser);
+
+        // append
+        for (int i = 0; i < rowCount; i++) {
+            write.write(rowDataWithKind(RowKind.INSERT, 1, i, (long) i));
+        }
+        commit.commit(0, write.prepareCommit(true, 0));
+
+        // delete [0, 2000]
+        for (int i = 0; i < 2000; i++) {
+            write.write(rowDataWithKind(RowKind.DELETE, 1, i, (long) i));
+        }
+        commit.commit(1, write.prepareCommit(true, 1));
+
+        // delete (rowCount - 2000, rowCount)
+        for (int i = rowCount - 2000; i < rowCount; i++) {
+            write.write(rowDataWithKind(RowKind.DELETE, 1, i, (long) i));
+        }
+        commit.commit(2, write.prepareCommit(true, 2));
+        write.close();
+        commit.close();
+
+        // test limit push down
+        {
+            int limit = new RandomDataGenerator().nextInt(1, 1000);
+            TableScan.Plan plan = table.newScan().withLimit(limit).plan();
+            assertThat(plan.splits()).hasSize(1);
+
+            RecordReader<InternalRow> reader =
+                    table.newRead().withLimit(limit).createReader(plan.splits());
+            AtomicInteger cnt = new AtomicInteger(0);
+            reader.forEachRemaining(row -> cnt.incrementAndGet());
+            assertThat(cnt.get()).isEqualTo(limit);
+            reader.close();
+        }
+    }
+
     @Test
     public void testWithShardFirstRow() throws Exception {
         FileStoreTable table =
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/utils/FormatReaderMappingTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/utils/FormatReaderMappingTest.java
index af7fba5401..7b2ad9898d 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/utils/FormatReaderMappingTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/utils/FormatReaderMappingTest.java
@@ -136,6 +136,7 @@ public class FormatReaderMappingTest {
                         null,
                         null,
                         Collections.emptyMap(),
+                        null,
                         null);
 
         Assertions.assertThat(formatReaderMapping.getIndexMapping())
diff --git 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonPushDownTestBase.scala
 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonPushDownTestBase.scala
index 9541db60e8..1edc64e55b 100644
--- 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonPushDownTestBase.scala
+++ 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonPushDownTestBase.scala
@@ -224,6 +224,7 @@ abstract class PaimonPushDownTestBase extends 
PaimonSparkTestBase {
 
               sql("INSERT INTO T SELECT id FROM range (1, 50000)")
               sql("DELETE FROM T WHERE id % 13 = 0")
+              Assertions.assertEquals(100, spark.sql("SELECT * FROM T LIMIT 100").count())
 
               val withoutLimit = 
getScanBuilder().build().asInstanceOf[PaimonScan].getOriginSplits
               assert(withoutLimit.length == 10)

Reply via email to