This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 7f34bd3c8a [Core] support limit pushdown with pk table (#6914)
7f34bd3c8a is described below

commit 7f34bd3c8aa9f41b8286a496101311ce250a53f0
Author: wangwj <[email protected]>
AuthorDate: Fri Jan 16 14:11:22 2026 +0800

    [Core] support limit pushdown with pk table (#6914)
---
 .../paimon/operation/AbstractFileStoreScan.java    |  12 +-
 .../paimon/operation/AppendOnlyFileStoreScan.java  |  18 +-
 .../paimon/operation/KeyValueFileStoreScan.java    | 187 ++++++++++++--
 .../paimon/table/source/DataTableBatchScan.java    |  11 +
 .../table/source/snapshot/SnapshotReaderImpl.java  |   4 +-
 .../operation/KeyValueFileStoreScanTest.java       | 286 ++++++++++++++++++++-
 .../flink/PrimaryKeyFileStoreTableITCase.java      | 125 +++++++++
 7 files changed, 611 insertions(+), 32 deletions(-)

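For context, the query shape this commit targets can be pictured with the following minimal Flink batch job, mirroring the ITCase added at the end of this diff. This is an illustrative sketch, not code from the commit: the catalog name, warehouse path, and table are placeholders, and Flink plus the Paimon Flink connector are assumed on the classpath.

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class PkLimitPushdownExample {
        public static void main(String[] args) throws Exception {
            TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
            // Hypothetical warehouse location; any Paimon catalog works here.
            tEnv.executeSql(
                    "CREATE CATALOG paimon WITH ('type' = 'paimon', 'warehouse' = '/tmp/paimon-warehouse')");
            tEnv.executeSql("USE CATALOG paimon");
            tEnv.executeSql(
                    "CREATE TABLE IF NOT EXISTS T (id INT, name STRING, PRIMARY KEY (id) NOT ENFORCED)");
            tEnv.executeSql("INSERT INTO T VALUES (1, 'a'), (2, 'b'), (3, 'c')").await();
            // With this commit, the batch scan can stop planning splits once the accumulated
            // row count of non-overlapping, delete-free files covers the limit.
            tEnv.executeSql("SELECT * FROM T LIMIT 2").print();
        }
    }
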
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
index 284a2ef195..fdefcd0d35 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
@@ -46,6 +46,9 @@ import org.apache.paimon.utils.Pair;
 import org.apache.paimon.utils.Range;
 import org.apache.paimon.utils.SnapshotManager;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import javax.annotation.Nullable;
 
 import java.util.ArrayList;
@@ -68,6 +71,8 @@ import static org.apache.paimon.utils.ThreadPoolUtils.randomlyOnlyExecute;
 /** Default implementation of {@link FileStoreScan}. */
 public abstract class AbstractFileStoreScan implements FileStoreScan {
 
+    private static final Logger LOG = LoggerFactory.getLogger(AbstractFileStoreScan.class);
+
     private final ManifestsReader manifestsReader;
     private final SnapshotManager snapshotManager;
     private final ManifestFile.Factory manifestFileFactory;
@@ -277,9 +282,6 @@ public abstract class AbstractFileStoreScan implements FileStoreScan {
         List<ManifestFileMeta> manifests = manifestsResult.filteredManifests;
 
         Iterator<ManifestEntry> iterator = readManifestEntries(manifests, false);
-        if (supportsLimitPushManifestEntries()) {
-            iterator = limitPushManifestEntries(iterator);
-        }
 
         List<ManifestEntry> files = ListUtils.toList(iterator);
         if (postFilterManifestEntriesEnabled()) {
@@ -289,6 +291,10 @@ public abstract class AbstractFileStoreScan implements FileStoreScan {
         List<ManifestEntry> result = files;
 
         long scanDuration = (System.nanoTime() - started) / 1_000_000;
+        LOG.info(
+                "File store scan plan completed in {} ms. Files size : {}",
+                scanDuration,
+                result.size());
         if (scanMetrics != null) {
             long allDataFiles =
                     manifestsResult.allManifests.stream()
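The hunk above only adds plan-duration logging. A standalone sketch of that timing pattern follows, assuming slf4j-api on the classpath; the class and method names here are illustrative, not Paimon code.

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    import java.util.function.Supplier;

    final class ScanTimingSketch {

        private static final Logger LOG = LoggerFactory.getLogger(ScanTimingSketch.class);

        /** Run the given work, log its wall-clock duration in milliseconds, and return the result. */
        static <T> T timed(String what, Supplier<T> work) {
            long started = System.nanoTime();
            T result = work.get();
            long durationMs = (System.nanoTime() - started) / 1_000_000;
            LOG.info("{} completed in {} ms", what, durationMs);
            return result;
        }
    }
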
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java
index 1892706169..bf7504a5fb 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java
@@ -33,7 +33,9 @@ import org.apache.paimon.utils.SnapshotManager;
 import javax.annotation.Nullable;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
@@ -92,9 +94,21 @@ public class AppendOnlyFileStoreScan extends AbstractFileStoreScan {
     }
 
     @Override
-    protected Iterator<ManifestEntry> limitPushManifestEntries(Iterator<ManifestEntry> entries) {
+    protected boolean postFilterManifestEntriesEnabled() {
+        return supportsLimitPushManifestEntries();
+    }
+
+    @Override
+    protected List<ManifestEntry> postFilterManifestEntries(List<ManifestEntry> entries) {
         checkArgument(limit != null && limit > 0 && !deletionVectorsEnabled);
-        return new LimitAwareManifestEntryIterator(entries, limit);
+        // Use LimitAwareManifestEntryIterator for limit pushdown
+        Iterator<ManifestEntry> iterator =
+                new LimitAwareManifestEntryIterator(entries.iterator(), limit);
+        List<ManifestEntry> result = new ArrayList<>();
+        while (iterator.hasNext()) {
+            result.add(iterator.next());
+        }
+        return result;
     }
 
     /** Note: Keep this thread-safe. */
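For readers unfamiliar with LimitAwareManifestEntryIterator, the drain-to-a-list behaviour above can be pictured with the self-contained analogue below. Entry and takeUntilLimit are illustrative stand-ins, not Paimon's actual classes.

    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;

    final class LimitAwareSketch {

        /** Minimal stand-in for a manifest entry: only the per-file row count matters here. */
        static final class Entry {
            final long rowCount;

            Entry(long rowCount) {
                this.rowCount = rowCount;
            }
        }

        /** Hand out entries until the row counts of those already returned reach the limit. */
        static List<Entry> takeUntilLimit(Iterator<Entry> entries, long limit) {
            List<Entry> result = new ArrayList<>();
            long accumulated = 0;
            while (entries.hasNext() && accumulated < limit) {
                Entry entry = entries.next();
                result.add(entry);
                accumulated += entry.rowCount;
            }
            return result;
        }
    }
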
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java
index ef6fd1e52b..2ac05b7385 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java
@@ -21,6 +21,7 @@ package org.apache.paimon.operation;
 import org.apache.paimon.CoreOptions.ChangelogProducer;
 import org.apache.paimon.CoreOptions.MergeEngine;
 import org.apache.paimon.KeyValueFileStore;
+import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.fileindex.FileIndexPredicate;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.manifest.FilteredManifestEntry;
@@ -37,16 +38,19 @@ import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.Pair;
 import org.apache.paimon.utils.SnapshotManager;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import javax.annotation.Nullable;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 
 import static org.apache.paimon.CoreOptions.MergeEngine.AGGREGATE;
@@ -55,6 +59,8 @@ import static org.apache.paimon.CoreOptions.MergeEngine.PARTIAL_UPDATE;
 /** {@link FileStoreScan} for {@link KeyValueFileStore}. */
 public class KeyValueFileStoreScan extends AbstractFileStoreScan {
 
+    private static final Logger LOG = LoggerFactory.getLogger(KeyValueFileStoreScan.class);
+
     private final SimpleStatsEvolutions fieldKeyStatsConverters;
     private final SimpleStatsEvolutions fieldValueStatsConverters;
     private final BucketSelectConverter bucketSelectConverter;
@@ -203,29 +209,162 @@ public class KeyValueFileStoreScan extends AbstractFileStoreScan {
         }
     }
 
+    /**
+     * Check if limit pushdown is supported for PK tables.
+     *
+     * <p>Not supported when merge engine is PARTIAL_UPDATE/AGGREGATE (need merge) or deletion
+     * vectors are enabled (can't count deleted rows). For DEDUPLICATE/FIRST_ROW, per-bucket checks
+     * (no overlapping, no delete rows) are done in applyLimitPushdownForBucket.
+     */
+    @Override
+    public boolean supportsLimitPushManifestEntries() {
+        if (mergeEngine == PARTIAL_UPDATE || mergeEngine == AGGREGATE) {
+            return false;
+        }
+
+        return limit != null && limit > 0 && !deletionVectorsEnabled;
+    }
+
+    /**
+     * Apply limit pushdown for a single bucket. Returns files to include, or null if unsafe.
+     *
+     * <p>Returns null if files overlap (LSM level 0 or different levels) or have delete rows. For
+     * non-overlapping files with no delete rows, accumulates row counts until limit is reached.
+     *
+     * @param bucketEntries files in the same bucket
+     * @param limit the limit to apply
+     * @return files to include, or null if we can't safely push down limit
+     */
+    @Nullable
+    private List<ManifestEntry> applyLimitPushdownForBucket(
+            List<ManifestEntry> bucketEntries, long limit) {
+        // Check if this bucket has overlapping files (LSM property)
+        boolean hasOverlapping = !noOverlapping(bucketEntries);
+
+        if (hasOverlapping) {
+            // For buckets with overlapping, we can't safely push down limit because files
+            // need to be merged and we can't accurately calculate the merged row count.
+            return null;
+        }
+
+        // For buckets without overlapping and with merge engines that don't require
+        // merge (DEDUPLICATE or FIRST_ROW), we can safely accumulate row count
+        // and stop when limit is reached, but only if files have no delete rows.
+        List<ManifestEntry> result = new ArrayList<>();
+        long accumulatedRowCount = 0;
+
+        for (ManifestEntry entry : bucketEntries) {
+            long fileRowCount = entry.file().rowCount();
+            // Check if file has delete rows - if so, we can't accurately calculate
+            // the merged row count, so we need to stop limit pushdown
+            boolean hasDeleteRows =
+                    entry.file().deleteRowCount().map(count -> count > 0L).orElse(false);
+
+            if (hasDeleteRows) {
+                // If file has delete rows, we can't accurately calculate merged row count
+                // without reading the actual data. Can't safely push down limit.
+                return null;
+            }
+
+            // File has no delete rows, no overlapping, and merge engine doesn't require merge.
+            // Safe to count rows.
+            result.add(entry);
+            accumulatedRowCount += fileRowCount;
+            if (accumulatedRowCount >= limit) {
+                break;
+            }
+        }
+
+        return result;
+    }
+
     @Override
     protected boolean postFilterManifestEntriesEnabled() {
-        return valueFilter != null && scanMode == ScanMode.ALL;
+        return (valueFilter != null && scanMode == ScanMode.ALL)
+                || supportsLimitPushManifestEntries();
     }
 
     @Override
     protected List<ManifestEntry> postFilterManifestEntries(List<ManifestEntry> files) {
-        // We group files by bucket here, and filter them by the whole bucket filter.
-        // Why do this: because in primary key table, we can't just filter the value
-        // by the stat in files (see `PrimaryKeyFileStoreTable.nonPartitionFilterConsumer`),
-        // but we can do this by filter the whole bucket files
-        return files.stream()
-                .collect(
-                        Collectors.groupingBy(
-                                // we use LinkedHashMap to avoid disorder
-                                file -> Pair.of(file.partition(), file.bucket()),
-                                LinkedHashMap::new,
-                                Collectors.toList()))
-                .values()
-                .stream()
-                .map(this::doFilterWholeBucketByStats)
-                .flatMap(Collection::stream)
-                .collect(Collectors.toList());
+        long startTime = System.nanoTime();
+        Map<Pair<BinaryRow, Integer>, List<ManifestEntry>> buckets = groupByBucket(files);
+
+        // Apply filter if valueFilter is enabled, otherwise use identity function
+        Function<List<ManifestEntry>, List<ManifestEntry>> bucketProcessor =
+                (valueFilter != null && scanMode == ScanMode.ALL)
+                        ? this::doFilterWholeBucketByStats
+                        : Function.identity();
+
+        // Apply filter (if enabled) and limit pushdown (if enabled)
+        boolean limitEnabled = supportsLimitPushManifestEntries();
+        List<ManifestEntry> result =
+                applyLimitPushdownToBuckets(buckets, bucketProcessor, limitEnabled);
+
+        if (limitEnabled) {
+            long duration = (System.nanoTime() - startTime) / 1_000_000;
+            LOG.info(
+                    "Limit pushdown for PK table completed in {} ms. Limit: 
{}, InputFiles: {}, OutputFiles: {}, "
+                            + "MergeEngine: {}, ScanMode: {}, 
DeletionVectorsEnabled: {}",
+                    duration,
+                    limit,
+                    files.size(),
+                    result.size(),
+                    mergeEngine,
+                    scanMode,
+                    deletionVectorsEnabled);
+        }
+
+        return result;
+    }
+
+    /**
+     * Apply limit pushdown to buckets with an optional bucket processor (e.g., filtering).
+     *
+     * <p>This method processes buckets in order, applying the bucket processor first, then applying
+     * limit pushdown if enabled. It stops early when the limit is reached.
+     *
+     * @param buckets buckets grouped by (partition, bucket)
+     * @param bucketProcessor processor to apply to each bucket before limit pushdown
+     * @return processed entries (filtered and limited if limit is enabled)
+     */
+    private List<ManifestEntry> applyLimitPushdownToBuckets(
+            Map<Pair<BinaryRow, Integer>, List<ManifestEntry>> buckets,
+            Function<List<ManifestEntry>, List<ManifestEntry>> bucketProcessor,
+            boolean limitEnabled) {
+        List<ManifestEntry> result = new ArrayList<>();
+        long accumulatedRowCount = 0;
+
+        for (List<ManifestEntry> bucketEntries : buckets.values()) {
+            // Apply bucket processor (e.g., filtering)
+            List<ManifestEntry> processed = bucketProcessor.apply(bucketEntries);
+
+            if (limitEnabled) {
+                // Apply limit pushdown if enabled
+                if (accumulatedRowCount >= limit) {
+                    // Already reached limit, stop processing remaining buckets
+                    break;
+                }
+
+                long remainingLimit = limit - accumulatedRowCount;
+                List<ManifestEntry> processedBucket =
+                        applyLimitPushdownForBucket(processed, remainingLimit);
+                if (processedBucket == null) {
+                    // Can't safely push down limit for this bucket, include all processed entries
+                    result.addAll(processed);
+                } else {
+                    result.addAll(processedBucket);
+                    for (ManifestEntry entry : processedBucket) {
+                        long fileRowCount = entry.file().rowCount();
+                        accumulatedRowCount += fileRowCount;
+                    }
+                }
+            } else {
+                // No limit pushdown, just add processed entries
+                result.addAll(processed);
+            }
+        }
+
+        return result;
     }
 
     private List<ManifestEntry> doFilterWholeBucketByStats(List<ManifestEntry> entries) {
@@ -316,4 +455,16 @@ public class KeyValueFileStoreScan extends AbstractFileStoreScan {
 
         return true;
     }
+
+    /** Group manifest entries by (partition, bucket) while preserving order. */
+    private Map<Pair<BinaryRow, Integer>, List<ManifestEntry>> groupByBucket(
+            List<ManifestEntry> entries) {
+        return entries.stream()
+                .collect(
+                        Collectors.groupingBy(
+                                // we use LinkedHashMap to avoid disorder
+                                file -> Pair.of(file.partition(), file.bucket()),
+                                LinkedHashMap::new,
+                                Collectors.toList()));
+    }
 }
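The per-bucket safety rule introduced above can be pictured with this self-contained sketch, under simplified assumptions: FileMeta stands in for ManifestEntry/DataFileMeta, and the overlap test paraphrases noOverlapping. Pushdown is attempted only when all files sit on the same non-zero LSM level and carry no delete rows; otherwise the whole bucket is kept so merging still sees every file.

    import java.util.ArrayList;
    import java.util.List;

    final class BucketLimitSketch {

        /** Simplified stand-in for a data file's metadata. */
        static final class FileMeta {
            final int level;
            final long rowCount;
            final long deleteRowCount;

            FileMeta(int level, long rowCount, long deleteRowCount) {
                this.level = level;
                this.rowCount = rowCount;
                this.deleteRowCount = deleteRowCount;
            }
        }

        /** Returns the files needed to cover the limit, or null when pushdown is unsafe. */
        static List<FileMeta> limitForBucket(List<FileMeta> bucket, long limit) {
            // Overlap check (paraphrasing noOverlapping): level-0 files, or files spread across
            // different levels, may hold rows for the same primary key, so row counts can't be summed.
            int firstLevel = bucket.isEmpty() ? -1 : bucket.get(0).level;
            for (FileMeta file : bucket) {
                if (file.level == 0 || file.level != firstLevel) {
                    return null;
                }
            }
            List<FileMeta> result = new ArrayList<>();
            long accumulated = 0;
            for (FileMeta file : bucket) {
                if (file.deleteRowCount > 0) {
                    // Delete rows make the merged row count unknown without reading data.
                    return null;
                }
                result.add(file);
                accumulated += file.rowCount;
                if (accumulated >= limit) {
                    break;
                }
            }
            return result;
        }
    }
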
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java b/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java
index 67c8acbf62..012160f5bd 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java
@@ -31,6 +31,9 @@ import org.apache.paimon.table.source.snapshot.StartingScanner;
 import org.apache.paimon.table.source.snapshot.StartingScanner.ScannedResult;
 import org.apache.paimon.types.DataType;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
@@ -40,6 +43,8 @@ import static org.apache.paimon.table.source.PushDownUtils.minmaxAvailable;
 /** {@link TableScan} implementation for batch planning. */
 public class DataTableBatchScan extends AbstractDataTableScan {
 
+    private static final Logger LOG = LoggerFactory.getLogger(DataTableBatchScan.class);
+
     private StartingScanner startingScanner;
     private boolean hasNext;
 
@@ -132,6 +137,7 @@ public class DataTableBatchScan extends AbstractDataTableScan {
         long scannedRowCount = 0;
         SnapshotReader.Plan plan = ((ScannedResult) result).plan();
         List<DataSplit> splits = plan.dataSplits();
+        LOG.info("Applying limit pushdown. Original splits count: {}", 
splits.size());
         if (splits.isEmpty()) {
             return Optional.of(result);
         }
@@ -145,6 +151,11 @@ public class DataTableBatchScan extends AbstractDataTableScan {
                 if (scannedRowCount >= pushDownLimit) {
                     SnapshotReader.Plan newPlan =
                             new PlanImpl(plan.watermark(), plan.snapshotId(), limitedSplits);
+                    LOG.info(
+                            "Limit pushdown applied successfully. Original 
splits: {}, Limited splits: {}, Pushdown limit: {}",
+                            splits.size(),
+                            limitedSplits.size(),
+                            pushDownLimit);
                     return Optional.of(new ScannedResult(newPlan));
                 }
             }
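The split-level trimming in DataTableBatchScan can be pictured with the simplified sketch below; Split is a stand-in for Paimon's DataSplit and its row-count accessor is an assumption, but the accumulation loop matches the one shown in the hunk above.

    import java.util.ArrayList;
    import java.util.List;

    final class SplitLimitSketch {

        /** Simplified stand-in for a planned split with a known row count. */
        static final class Split {
            final long rowCount;

            Split(long rowCount) {
                this.rowCount = rowCount;
            }
        }

        /** Keep splits until their accumulated row count covers the pushed-down limit. */
        static List<Split> limitSplits(List<Split> splits, long pushDownLimit) {
            List<Split> limited = new ArrayList<>();
            long scannedRowCount = 0;
            for (Split split : splits) {
                limited.add(split);
                scannedRowCount += split.rowCount;
                if (scannedRowCount >= pushDownLimit) {
                    // Remaining splits are not needed to produce pushDownLimit rows.
                    break;
                }
            }
            return limited;
        }
    }
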
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
index e036710e8e..3021eeda3e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
@@ -409,9 +409,11 @@ public class SnapshotReaderImpl implements SnapshotReader {
                         isStreaming
                                 ? splitGenerator.splitForStreaming(bucketFiles)
                                 : splitGenerator.splitForBatch(bucketFiles);
+
+                // Calculate bucketPath once per bucket to avoid repeated computation
+                String bucketPath = pathFactory.bucketPath(partition, bucket).toString();
                 for (SplitGenerator.SplitGroup splitGroup : splitGroups) {
                     List<DataFileMeta> dataFiles = splitGroup.files;
-                    String bucketPath = pathFactory.bucketPath(partition, bucket).toString();
                     builder.withDataFiles(dataFiles)
                             .rawConvertible(splitGroup.rawConvertible)
                             .withBucketPath(bucketPath);
diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreScanTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreScanTest.java
index 4f3d5c1c24..f975cf570d 100644
--- a/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreScanTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreScanTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.operation;
 
+import org.apache.paimon.CoreOptions;
 import org.apache.paimon.KeyValue;
 import org.apache.paimon.Snapshot;
 import org.apache.paimon.TestFileStore;
@@ -30,9 +31,10 @@ import org.apache.paimon.mergetree.compact.DeduplicateMergeFunction;
 import org.apache.paimon.predicate.PredicateBuilder;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.schema.SchemaUtils;
+import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.types.IntType;
 import org.apache.paimon.types.RowType;
-import org.apache.paimon.utils.SnapshotManager;
 
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -59,7 +61,6 @@ public class KeyValueFileStoreScanTest {
     private TestKeyValueGenerator gen;
     @TempDir java.nio.file.Path tempDir;
     private TestFileStore store;
-    private SnapshotManager snapshotManager;
 
     @BeforeEach
     public void beforeEach() throws Exception {
@@ -76,7 +77,6 @@ public class KeyValueFileStoreScanTest {
                                 DeduplicateMergeFunction.factory(),
                                 null)
                         .build();
-        snapshotManager = store.snapshotManager();
 
         SchemaManager schemaManager =
                 new SchemaManager(LocalFileIO.create(), new Path(tempDir.toUri()));
@@ -271,6 +271,276 @@ public class KeyValueFileStoreScanTest {
         }
     }
 
+    @Test
+    public void testLimitPushdownWithoutValueFilter() throws Exception {
+        // Write multiple files to test limit pushdown
+        List<KeyValue> data1 = generateData(50);
+        writeData(data1);
+        List<KeyValue> data2 = generateData(50);
+        writeData(data2);
+        List<KeyValue> data3 = generateData(50);
+        Snapshot snapshot = writeData(data3);
+
+        // Without limit, should read all files
+        KeyValueFileStoreScan scanWithoutLimit = store.newScan();
+        scanWithoutLimit.withSnapshot(snapshot.id());
+        List<ManifestEntry> filesWithoutLimit = scanWithoutLimit.plan().files();
+        int totalFiles = filesWithoutLimit.size();
+        assertThat(totalFiles).isGreaterThan(0);
+
+        // With limit, should read fewer files (limit pushdown should work)
+        KeyValueFileStoreScan scanWithLimit = store.newScan();
+        scanWithLimit.withSnapshot(snapshot.id()).withLimit(10);
+        List<ManifestEntry> filesWithLimit = scanWithLimit.plan().files();
+        // Limit pushdown should reduce the number of files read
+        assertThat(filesWithLimit.size()).isLessThanOrEqualTo(totalFiles);
+        assertThat(filesWithLimit.size()).isGreaterThan(0);
+    }
+
+    @Test
+    public void testLimitPushdownWithValueFilter() throws Exception {
+        // Write data with different item values
+        List<KeyValue> data1 = generateData(50, 0, 100L);
+        writeData(data1);
+        List<KeyValue> data2 = generateData(50, 0, 200L);
+        writeData(data2);
+        List<KeyValue> data3 = generateData(50, 0, 300L);
+        Snapshot snapshot = writeData(data3);
+
+        // Without valueFilter, limit pushdown should work
+        KeyValueFileStoreScan scanWithoutFilter = store.newScan();
+        scanWithoutFilter.withSnapshot(snapshot.id()).withLimit(10);
+        List<ManifestEntry> filesWithoutFilter = scanWithoutFilter.plan().files();
+        int totalFilesWithoutFilter = filesWithoutFilter.size();
+        assertThat(totalFilesWithoutFilter).isGreaterThan(0);
+
+        // With valueFilter, limit pushdown should still work.
+        KeyValueFileStoreScan scanWithFilter = store.newScan();
+        scanWithFilter.withSnapshot(snapshot.id());
+        scanWithFilter.withValueFilter(
+                new PredicateBuilder(TestKeyValueGenerator.DEFAULT_ROW_TYPE)
+                        .between(4, 100L, 200L));
+        scanWithFilter.withLimit(10);
+        List<ManifestEntry> filesWithFilter = scanWithFilter.plan().files();
+
+        // Limit pushdown should work with valueFilter
+        // The number of files should be less than or equal to the total files after filtering
+        assertThat(filesWithFilter.size()).isGreaterThan(0);
+        assertThat(filesWithFilter.size()).isLessThanOrEqualTo(totalFilesWithoutFilter);
+    }
+
+    @Test
+    public void testLimitPushdownWithKeyFilter() throws Exception {
+        // Write data with different shop IDs
+        List<KeyValue> data = generateData(200);
+        Snapshot snapshot = writeData(data);
+
+        // With keyFilter, limit pushdown should still work (keyFilter doesn't affect limit
+        // pushdown)
+        KeyValueFileStoreScan scan = store.newScan();
+        scan.withSnapshot(snapshot.id());
+        scan.withKeyFilter(
+                new PredicateBuilder(RowType.of(new IntType(false)))
+                        .equal(0, data.get(0).key().getInt(0)));
+        scan.withLimit(5);
+        List<ManifestEntry> files = scan.plan().files();
+        assertThat(files.size()).isGreaterThan(0);
+    }
+
+    @Test
+    public void testLimitPushdownMultipleBuckets() throws Exception {
+        // Write data to multiple buckets to test limit pushdown across buckets
+        List<KeyValue> data1 = generateData(30);
+        writeData(data1);
+        List<KeyValue> data2 = generateData(30);
+        writeData(data2);
+        List<KeyValue> data3 = generateData(30);
+        Snapshot snapshot = writeData(data3);
+
+        // Without limit, should read all files
+        KeyValueFileStoreScan scanWithoutLimit = store.newScan();
+        scanWithoutLimit.withSnapshot(snapshot.id());
+        List<ManifestEntry> filesWithoutLimit = scanWithoutLimit.plan().files();
+        int totalFiles = filesWithoutLimit.size();
+        assertThat(totalFiles).isGreaterThan(0);
+
+        // With limit, should read fewer files (limit pushdown should work across buckets)
+        KeyValueFileStoreScan scanWithLimit = store.newScan();
+        scanWithLimit.withSnapshot(snapshot.id()).withLimit(20);
+        List<ManifestEntry> filesWithLimit = scanWithLimit.plan().files();
+        // Limit pushdown should reduce the number of files read
+        assertThat(filesWithLimit.size()).isLessThanOrEqualTo(totalFiles);
+        assertThat(filesWithLimit.size()).isGreaterThan(0);
+    }
+
+    @Test
+    public void testLimitPushdownWithSmallLimit() throws Exception {
+        // Test limit pushdown with a very small limit
+        List<KeyValue> data1 = generateData(100);
+        writeData(data1);
+        List<KeyValue> data2 = generateData(100);
+        writeData(data2);
+        Snapshot snapshot = writeData(data2);
+
+        KeyValueFileStoreScan scan = store.newScan();
+        scan.withSnapshot(snapshot.id()).withLimit(1);
+        List<ManifestEntry> files = scan.plan().files();
+        // Should read at least one file, but fewer than all files
+        assertThat(files.size()).isGreaterThan(0);
+    }
+
+    @Test
+    public void testLimitPushdownWithLargeLimit() throws Exception {
+        // Test limit pushdown with a large limit (larger than total rows)
+        List<KeyValue> data1 = generateData(50);
+        writeData(data1);
+        List<KeyValue> data2 = generateData(50);
+        Snapshot snapshot = writeData(data2);
+
+        KeyValueFileStoreScan scanWithoutLimit = store.newScan();
+        scanWithoutLimit.withSnapshot(snapshot.id());
+        List<ManifestEntry> filesWithoutLimit = scanWithoutLimit.plan().files();
+        int totalFiles = filesWithoutLimit.size();
+
+        KeyValueFileStoreScan scanWithLimit = store.newScan();
+        scanWithLimit.withSnapshot(snapshot.id()).withLimit(10000);
+        List<ManifestEntry> filesWithLimit = scanWithLimit.plan().files();
+        // With a large limit, should read all files
+        assertThat(filesWithLimit.size()).isEqualTo(totalFiles);
+    }
+
+    @Test
+    public void testLimitPushdownWithPartialUpdateMergeEngine() throws Exception {
+        // Test that limit pushdown is disabled for PARTIAL_UPDATE merge engine
+        // Create a store with PARTIAL_UPDATE merge engine by setting it in schema options
+        SchemaManager schemaManager =
+                new SchemaManager(LocalFileIO.create(), new Path(tempDir.toUri()));
+        Schema schema =
+                new Schema(
+                        TestKeyValueGenerator.DEFAULT_ROW_TYPE.getFields(),
+                        TestKeyValueGenerator.DEFAULT_PART_TYPE.getFieldNames(),
+                        TestKeyValueGenerator.getPrimaryKeys(
+                                TestKeyValueGenerator.GeneratorMode.MULTI_PARTITIONED),
+                        Collections.singletonMap(CoreOptions.MERGE_ENGINE.key(), "partial-update"),
+                        null);
+        TableSchema tableSchema = SchemaUtils.forceCommit(schemaManager, schema);
+
+        TestFileStore storePartialUpdate =
+                new TestFileStore.Builder(
+                                "avro",
+                                tempDir.toString(),
+                                NUM_BUCKETS,
+                                TestKeyValueGenerator.DEFAULT_PART_TYPE,
+                                TestKeyValueGenerator.KEY_TYPE,
+                                TestKeyValueGenerator.DEFAULT_ROW_TYPE,
+                                TestKeyValueGenerator.TestKeyValueFieldsExtractor.EXTRACTOR,
+                                DeduplicateMergeFunction.factory(),
+                                tableSchema)
+                        .build();
+
+        List<KeyValue> data1 = generateData(50);
+        writeData(data1, storePartialUpdate);
+        List<KeyValue> data2 = generateData(50);
+        Snapshot snapshot = writeData(data2, storePartialUpdate);
+
+        KeyValueFileStoreScan scan = storePartialUpdate.newScan();
+        scan.withSnapshot(snapshot.id()).withLimit(10);
+        // supportsLimitPushManifestEntries should return false for PARTIAL_UPDATE
+        assertThat(scan.supportsLimitPushManifestEntries()).isFalse();
+
+        // Should read all files since limit pushdown is disabled
+        KeyValueFileStoreScan scanWithoutLimit = storePartialUpdate.newScan();
+        scanWithoutLimit.withSnapshot(snapshot.id());
+        List<ManifestEntry> filesWithoutLimit = scanWithoutLimit.plan().files();
+        int totalFiles = filesWithoutLimit.size();
+
+        List<ManifestEntry> filesWithLimit = scan.plan().files();
+        assertThat(filesWithLimit.size()).isEqualTo(totalFiles);
+    }
+
+    @Test
+    public void testLimitPushdownWithAggregateMergeEngine() throws Exception {
+        // Test that limit pushdown is disabled for AGGREGATE merge engine
+        SchemaManager schemaManager =
+                new SchemaManager(LocalFileIO.create(), new Path(tempDir.toUri()));
+        Schema schema =
+                new Schema(
+                        TestKeyValueGenerator.DEFAULT_ROW_TYPE.getFields(),
+                        TestKeyValueGenerator.DEFAULT_PART_TYPE.getFieldNames(),
+                        TestKeyValueGenerator.getPrimaryKeys(
+                                TestKeyValueGenerator.GeneratorMode.MULTI_PARTITIONED),
+                        Collections.singletonMap(CoreOptions.MERGE_ENGINE.key(), "aggregation"),
+                        null);
+        TableSchema tableSchema = SchemaUtils.forceCommit(schemaManager, schema);
+
+        TestFileStore storeAggregate =
+                new TestFileStore.Builder(
+                                "avro",
+                                tempDir.toString(),
+                                NUM_BUCKETS,
+                                TestKeyValueGenerator.DEFAULT_PART_TYPE,
+                                TestKeyValueGenerator.KEY_TYPE,
+                                TestKeyValueGenerator.DEFAULT_ROW_TYPE,
+                                TestKeyValueGenerator.TestKeyValueFieldsExtractor.EXTRACTOR,
+                                DeduplicateMergeFunction.factory(),
+                                tableSchema)
+                        .build();
+
+        List<KeyValue> data1 = generateData(50);
+        writeData(data1, storeAggregate);
+        List<KeyValue> data2 = generateData(50);
+        Snapshot snapshot = writeData(data2, storeAggregate);
+
+        KeyValueFileStoreScan scan = storeAggregate.newScan();
+        scan.withSnapshot(snapshot.id()).withLimit(10);
+        // supportsLimitPushManifestEntries should return false for AGGREGATE
+        assertThat(scan.supportsLimitPushManifestEntries()).isFalse();
+
+        // Should read all files since limit pushdown is disabled
+        KeyValueFileStoreScan scanWithoutLimit = storeAggregate.newScan();
+        scanWithoutLimit.withSnapshot(snapshot.id());
+        List<ManifestEntry> filesWithoutLimit = scanWithoutLimit.plan().files();
+        int totalFiles = filesWithoutLimit.size();
+
+        List<ManifestEntry> filesWithLimit = scan.plan().files();
+        assertThat(filesWithLimit.size()).isEqualTo(totalFiles);
+    }
+
+    @Test
+    public void testLimitPushdownWithDeletionVectors() throws Exception {
+        // Test that limit pushdown is disabled when deletion vectors are enabled
+        SchemaManager schemaManager =
+                new SchemaManager(LocalFileIO.create(), new Path(tempDir.toUri()));
+        Schema schema =
+                new Schema(
+                        TestKeyValueGenerator.DEFAULT_ROW_TYPE.getFields(),
+                        TestKeyValueGenerator.DEFAULT_PART_TYPE.getFieldNames(),
+                        TestKeyValueGenerator.getPrimaryKeys(
+                                TestKeyValueGenerator.GeneratorMode.MULTI_PARTITIONED),
+                        Collections.singletonMap(
+                                CoreOptions.DELETION_VECTORS_ENABLED.key(), "true"),
+                        null);
+        TableSchema tableSchema = SchemaUtils.forceCommit(schemaManager, schema);
+
+        TestFileStore storeWithDV =
+                new TestFileStore.Builder(
+                                "avro",
+                                tempDir.toString(),
+                                NUM_BUCKETS,
+                                TestKeyValueGenerator.DEFAULT_PART_TYPE,
+                                TestKeyValueGenerator.KEY_TYPE,
+                                TestKeyValueGenerator.DEFAULT_ROW_TYPE,
+                                TestKeyValueGenerator.TestKeyValueFieldsExtractor.EXTRACTOR,
+                                DeduplicateMergeFunction.factory(),
+                                tableSchema)
+                        .build();
+
+        KeyValueFileStoreScan scan = storeWithDV.newScan();
+        scan.withLimit(10);
+        // supportsLimitPushManifestEntries should return false when deletion vectors are enabled
+        assertThat(scan.supportsLimitPushManifestEntries()).isFalse();
+    }
+
     private void runTestExactMatch(
             FileStoreScan scan, Long expectedSnapshotId, Map<BinaryRow, BinaryRow> expected)
             throws Exception {
@@ -307,10 +577,6 @@ public class KeyValueFileStoreScanTest {
         return data;
     }
 
-    private List<KeyValue> generateData(int numRecords, int hr) {
-        return generateData(numRecords, hr, null);
-    }
-
     private List<KeyValue> generateData(int numRecords, int hr, Long itemId) {
         List<KeyValue> data = new ArrayList<>();
         for (int i = 0; i < numRecords; i++) {
@@ -320,7 +586,11 @@ public class KeyValueFileStoreScanTest {
     }
 
     private Snapshot writeData(List<KeyValue> kvs) throws Exception {
-        List<Snapshot> snapshots = store.commitData(kvs, gen::getPartition, this::getBucket);
+        return writeData(kvs, store);
+    }
+
+    private Snapshot writeData(List<KeyValue> kvs, TestFileStore testStore) throws Exception {
+        List<Snapshot> snapshots = testStore.commitData(kvs, gen::getPartition, this::getBucket);
         return snapshots.get(snapshots.size() - 1);
     }
 
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java
index a7fb814f4a..d98f277a55 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java
@@ -1598,4 +1598,129 @@ public class PrimaryKeyFileStoreTableITCase extends AbstractTestBase {
             }
         }
     }
+
+    @Test
+    public void testLimitPushdownWithTimeFilter() throws Exception {
+        // This test verifies that limit pushdown works correctly when a valueFilter is applied
+        TableEnvironment tEnv = tableEnvironmentBuilder().batchMode().build();
+        tEnv.executeSql(createCatalogSql("testCatalog", path + "/warehouse"));
+        tEnv.executeSql("USE CATALOG testCatalog");
+        tEnv.executeSql(
+                "CREATE TABLE T ("
+                        + "id INT, "
+                        + "name STRING, "
+                        + "ts TIMESTAMP(3), "
+                        + "PRIMARY KEY (id) NOT ENFORCED"
+                        + ")");
+
+        // Insert data with different timestamps
+        tEnv.executeSql(
+                        "INSERT INTO T VALUES "
+                                + "(1, 'a', TIMESTAMP '2024-01-01 10:00:00'), "
+                                + "(2, 'b', TIMESTAMP '2024-01-01 11:00:00'), "
+                                + "(3, 'c', TIMESTAMP '2024-01-01 12:00:00'), "
+                                + "(4, 'd', TIMESTAMP '2024-01-01 13:00:00'), "
+                                + "(5, 'e', TIMESTAMP '2024-01-01 14:00:00')")
+                .await();
+
+        // Without filter, limit pushdown should work
+        try (CloseableIterator<Row> iter = tEnv.executeSql("SELECT * FROM T 
LIMIT 3").collect()) {
+            List<Row> allRows = new ArrayList<>();
+            iter.forEachRemaining(allRows::add);
+            assertThat(allRows.size()).isEqualTo(3);
+        }
+
+        // Test limit pushdown with time filter (4 rows match, LIMIT 3)
+        try (CloseableIterator<Row> iter =
+                tEnv.executeSql(
+                                "SELECT * FROM T WHERE ts >= TIMESTAMP 
'2024-01-01 11:00:00' LIMIT 3")
+                        .collect()) {
+            List<Row> filteredRows = new ArrayList<>();
+            iter.forEachRemaining(filteredRows::add);
+            assertThat(filteredRows.size()).isGreaterThanOrEqualTo(3);
+            assertThat(filteredRows.size()).isLessThanOrEqualTo(4);
+            for (Row row : filteredRows) {
+                java.time.LocalDateTime ts = (java.time.LocalDateTime) row.getField(2);
+                java.time.LocalDateTime filterTime =
+                        java.time.LocalDateTime.parse("2024-01-01T11:00:00");
+                assertThat(ts).isAfterOrEqualTo(filterTime);
+            }
+        }
+
+        // Test with more restrictive filter (3 rows match, LIMIT 2)
+        try (CloseableIterator<Row> iter =
+                tEnv.executeSql(
+                                "SELECT * FROM T WHERE ts >= TIMESTAMP 
'2024-01-01 12:00:00' LIMIT 2")
+                        .collect()) {
+            List<Row> filteredRows2 = new ArrayList<>();
+            iter.forEachRemaining(filteredRows2::add);
+            assertThat(filteredRows2.size()).isGreaterThanOrEqualTo(2);
+            assertThat(filteredRows2.size()).isLessThanOrEqualTo(3);
+            for (Row row : filteredRows2) {
+                java.time.LocalDateTime ts = (java.time.LocalDateTime) row.getField(2);
+                java.time.LocalDateTime filterTime =
+                        java.time.LocalDateTime.parse("2024-01-01T12:00:00");
+                assertThat(ts).isAfterOrEqualTo(filterTime);
+            }
+        }
+    }
+
+    @Test
+    public void testLimitPushdownBasic() throws Exception {
+        // Test basic limit pushdown
+        TableEnvironment tEnv = tableEnvironmentBuilder().batchMode().build();
+        tEnv.executeSql(createCatalogSql("testCatalog", path + "/warehouse"));
+        tEnv.executeSql("USE CATALOG testCatalog");
+        tEnv.executeSql(
+                "CREATE TABLE T ("
+                        + "id INT, "
+                        + "name STRING, "
+                        + "PRIMARY KEY (id) NOT ENFORCED"
+                        + ")");
+
+        tEnv.executeSql("INSERT INTO T VALUES (1, 'a'), (2, 'b'), (3, 
'c')").await();
+        tEnv.executeSql("INSERT INTO T VALUES (4, 'd'), (5, 'e'), (6, 
'f')").await();
+        tEnv.executeSql("INSERT INTO T VALUES (7, 'g'), (8, 'h'), (9, 
'i')").await();
+
+        try (CloseableIterator<Row> iter = tEnv.executeSql("SELECT * FROM T 
LIMIT 5").collect()) {
+            List<Row> rows = new ArrayList<>();
+            iter.forEachRemaining(rows::add);
+
+            assertThat(rows.size()).isEqualTo(5);
+        }
+    }
+
+    @Test
+    public void testLimitPushdownWithDeletionVector() throws Exception {
+        // Test limit pushdown is disabled when deletion vector is enabled
+        TableEnvironment tEnv = tableEnvironmentBuilder().batchMode().build();
+        tEnv.executeSql(createCatalogSql("testCatalog", path + "/warehouse"));
+        tEnv.executeSql("USE CATALOG testCatalog");
+        tEnv.executeSql(
+                "CREATE TABLE T ("
+                        + "id INT, "
+                        + "name STRING, "
+                        + "PRIMARY KEY (id) NOT ENFORCED"
+                        + ") WITH ("
+                        + "'deletion-vectors.enabled' = 'true'"
+                        + ")");
+
+        tEnv.executeSql("INSERT INTO T VALUES (1, 'a'), (2, 'b'), (3, 
'c')").await();
+        tEnv.executeSql("INSERT INTO T VALUES (4, 'd'), (5, 'e'), (6, 
'f')").await();
+
+        tEnv.executeSql("DELETE FROM T WHERE id = 2").await();
+
+        // Limit pushdown should be disabled when deletion vector is enabled
+        // because we can't accurately calculate row count after applying deletion vectors
+        try (CloseableIterator<Row> iter = tEnv.executeSql("SELECT * FROM T 
LIMIT 3").collect()) {
+            List<Row> rows = new ArrayList<>();
+            iter.forEachRemaining(rows::add);
+
+            assertThat(rows.size()).isEqualTo(3);
+
+            for (Row row : rows) {
+                assertThat(row.getField(0)).isNotEqualTo(2);
+            }
+        }
+    }
 }

