This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 7f34bd3c8a [Core] support limit pushdown with pk table (#6914)
7f34bd3c8a is described below
commit 7f34bd3c8aa9f41b8286a496101311ce250a53f0
Author: wangwj <[email protected]>
AuthorDate: Fri Jan 16 14:11:22 2026 +0800
[Core] support limit pushdown with pk table (#6914)
---
.../paimon/operation/AbstractFileStoreScan.java | 12 +-
.../paimon/operation/AppendOnlyFileStoreScan.java | 18 +-
.../paimon/operation/KeyValueFileStoreScan.java | 187 ++++++++++++--
.../paimon/table/source/DataTableBatchScan.java | 11 +
.../table/source/snapshot/SnapshotReaderImpl.java | 4 +-
.../operation/KeyValueFileStoreScanTest.java | 286 ++++++++++++++++++++-
.../flink/PrimaryKeyFileStoreTableITCase.java | 125 +++++++++
7 files changed, 611 insertions(+), 32 deletions(-)
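For context, a minimal Flink SQL sketch (based on the new PrimaryKeyFileStoreTableITCase tests below, not an exhaustive description) of the query shape this change targets: a batch LIMIT query on a primary-key table. With the DEDUPLICATE or FIRST_ROW merge engine and deletion vectors disabled, the scan can stop collecting manifest entries once the accumulated row count covers the limit; for PARTIAL_UPDATE/AGGREGATE merge engines or deletion-vector tables the pushdown is skipped and all files are planned as before.

    -- Primary-key table with the default (DEDUPLICATE) merge engine.
    CREATE TABLE T (
        id INT,
        name STRING,
        PRIMARY KEY (id) NOT ENFORCED
    );

    INSERT INTO T VALUES (1, 'a'), (2, 'b'), (3, 'c');
    INSERT INTO T VALUES (4, 'd'), (5, 'e'), (6, 'f');

    -- In batch mode, planning may now include only enough non-overlapping files
    -- to satisfy the limit instead of every file in the table.
    SELECT * FROM T LIMIT 5;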
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
index 284a2ef195..fdefcd0d35 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
@@ -46,6 +46,9 @@ import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.Range;
import org.apache.paimon.utils.SnapshotManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import javax.annotation.Nullable;
import java.util.ArrayList;
@@ -68,6 +71,8 @@ import static org.apache.paimon.utils.ThreadPoolUtils.randomlyOnlyExecute;
/** Default implementation of {@link FileStoreScan}. */
public abstract class AbstractFileStoreScan implements FileStoreScan {
+ private static final Logger LOG = LoggerFactory.getLogger(AbstractFileStoreScan.class);
+
private final ManifestsReader manifestsReader;
private final SnapshotManager snapshotManager;
private final ManifestFile.Factory manifestFileFactory;
@@ -277,9 +282,6 @@ public abstract class AbstractFileStoreScan implements FileStoreScan {
List<ManifestFileMeta> manifests = manifestsResult.filteredManifests;
Iterator<ManifestEntry> iterator = readManifestEntries(manifests, false);
- if (supportsLimitPushManifestEntries()) {
- iterator = limitPushManifestEntries(iterator);
- }
List<ManifestEntry> files = ListUtils.toList(iterator);
if (postFilterManifestEntriesEnabled()) {
@@ -289,6 +291,10 @@ public abstract class AbstractFileStoreScan implements FileStoreScan {
List<ManifestEntry> result = files;
long scanDuration = (System.nanoTime() - started) / 1_000_000;
+ LOG.info(
+ "File store scan plan completed in {} ms. Files size : {}",
+ scanDuration,
+ result.size());
if (scanMetrics != null) {
long allDataFiles =
manifestsResult.allManifests.stream()
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java
index 1892706169..bf7504a5fb 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java
@@ -33,7 +33,9 @@ import org.apache.paimon.utils.SnapshotManager;
import javax.annotation.Nullable;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Iterator;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -92,9 +94,21 @@ public class AppendOnlyFileStoreScan extends AbstractFileStoreScan {
}
@Override
- protected Iterator<ManifestEntry> limitPushManifestEntries(Iterator<ManifestEntry> entries) {
+ protected boolean postFilterManifestEntriesEnabled() {
+ return supportsLimitPushManifestEntries();
+ }
+
+ @Override
+ protected List<ManifestEntry> postFilterManifestEntries(List<ManifestEntry> entries) {
checkArgument(limit != null && limit > 0 && !deletionVectorsEnabled);
- return new LimitAwareManifestEntryIterator(entries, limit);
+ // Use LimitAwareManifestEntryIterator for limit pushdown
+ Iterator<ManifestEntry> iterator =
+ new LimitAwareManifestEntryIterator(entries.iterator(), limit);
+ List<ManifestEntry> result = new ArrayList<>();
+ while (iterator.hasNext()) {
+ result.add(iterator.next());
+ }
+ return result;
}
/** Note: Keep this thread-safe. */
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java
index ef6fd1e52b..2ac05b7385 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java
@@ -21,6 +21,7 @@ package org.apache.paimon.operation;
import org.apache.paimon.CoreOptions.ChangelogProducer;
import org.apache.paimon.CoreOptions.MergeEngine;
import org.apache.paimon.KeyValueFileStore;
+import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.fileindex.FileIndexPredicate;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.manifest.FilteredManifestEntry;
@@ -37,16 +38,19 @@ import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.SnapshotManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import javax.annotation.Nullable;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Function;
import java.util.stream.Collectors;
import static org.apache.paimon.CoreOptions.MergeEngine.AGGREGATE;
@@ -55,6 +59,8 @@ import static org.apache.paimon.CoreOptions.MergeEngine.PARTIAL_UPDATE;
/** {@link FileStoreScan} for {@link KeyValueFileStore}. */
public class KeyValueFileStoreScan extends AbstractFileStoreScan {
+ private static final Logger LOG = LoggerFactory.getLogger(KeyValueFileStoreScan.class);
+
private final SimpleStatsEvolutions fieldKeyStatsConverters;
private final SimpleStatsEvolutions fieldValueStatsConverters;
private final BucketSelectConverter bucketSelectConverter;
@@ -203,29 +209,162 @@ public class KeyValueFileStoreScan extends AbstractFileStoreScan {
}
}
+ /**
+ * Check if limit pushdown is supported for PK tables.
+ *
+ * <p>Not supported when merge engine is PARTIAL_UPDATE/AGGREGATE (need merge) or deletion
+ * vectors are enabled (can't count deleted rows). For DEDUPLICATE/FIRST_ROW, per-bucket checks
+ * (no overlapping, no delete rows) are done in applyLimitPushdownForBucket.
+ */
+ @Override
+ public boolean supportsLimitPushManifestEntries() {
+ if (mergeEngine == PARTIAL_UPDATE || mergeEngine == AGGREGATE) {
+ return false;
+ }
+
+ return limit != null && limit > 0 && !deletionVectorsEnabled;
+ }
+
+ /**
+ * Apply limit pushdown for a single bucket. Returns files to include, or null if unsafe.
+ *
+ * <p>Returns null if files overlap (LSM level 0 or different levels) or have delete rows. For
+ * non-overlapping files with no delete rows, accumulates row counts until limit is reached.
+ *
+ * @param bucketEntries files in the same bucket
+ * @param limit the limit to apply
+ * @return files to include, or null if we can't safely push down limit
+ */
+ @Nullable
+ private List<ManifestEntry> applyLimitPushdownForBucket(
+ List<ManifestEntry> bucketEntries, long limit) {
+ // Check if this bucket has overlapping files (LSM property)
+ boolean hasOverlapping = !noOverlapping(bucketEntries);
+
+ if (hasOverlapping) {
+ // For buckets with overlapping, we can't safely push down limit because files
+ // need to be merged and we can't accurately calculate the merged row count.
+ return null;
+ }
+
+ // For buckets without overlapping and with merge engines that don't require
+ // merge (DEDUPLICATE or FIRST_ROW), we can safely accumulate row count
+ // and stop when limit is reached, but only if files have no delete rows.
+ List<ManifestEntry> result = new ArrayList<>();
+ long accumulatedRowCount = 0;
+
+ for (ManifestEntry entry : bucketEntries) {
+ long fileRowCount = entry.file().rowCount();
+ // Check if file has delete rows - if so, we can't accurately calculate
+ // the merged row count, so we need to stop limit pushdown
+ boolean hasDeleteRows =
+ entry.file().deleteRowCount().map(count -> count > 0L).orElse(false);
+
+ if (hasDeleteRows) {
+ // If file has delete rows, we can't accurately calculate merged row count
+ // without reading the actual data. Can't safely push down limit.
+ return null;
+ }
+
+ // File has no delete rows, no overlapping, and merge engine doesn't require merge.
+ // Safe to count rows.
+ result.add(entry);
+ accumulatedRowCount += fileRowCount;
+ if (accumulatedRowCount >= limit) {
+ break;
+ }
+ }
+
+ return result;
+ }
+
@Override
protected boolean postFilterManifestEntriesEnabled() {
- return valueFilter != null && scanMode == ScanMode.ALL;
+ return (valueFilter != null && scanMode == ScanMode.ALL)
+ || supportsLimitPushManifestEntries();
}
@Override
protected List<ManifestEntry> postFilterManifestEntries(List<ManifestEntry> files) {
- // We group files by bucket here, and filter them by the whole bucket filter.
- // Why do this: because in primary key table, we can't just filter the value
- // by the stat in files (see `PrimaryKeyFileStoreTable.nonPartitionFilterConsumer`),
- // but we can do this by filter the whole bucket files
- return files.stream()
- .collect(
- Collectors.groupingBy(
- // we use LinkedHashMap to avoid disorder
- file -> Pair.of(file.partition(), file.bucket()),
- LinkedHashMap::new,
- Collectors.toList()))
- .values()
- .stream()
- .map(this::doFilterWholeBucketByStats)
- .flatMap(Collection::stream)
- .collect(Collectors.toList());
+ long startTime = System.nanoTime();
+ Map<Pair<BinaryRow, Integer>, List<ManifestEntry>> buckets = groupByBucket(files);
+
+ // Apply filter if valueFilter is enabled, otherwise use identity function
+ Function<List<ManifestEntry>, List<ManifestEntry>> bucketProcessor =
+ (valueFilter != null && scanMode == ScanMode.ALL)
+ ? this::doFilterWholeBucketByStats
+ : Function.identity();
+
+ // Apply filter (if enabled) and limit pushdown (if enabled)
+ boolean limitEnabled = supportsLimitPushManifestEntries();
+ List<ManifestEntry> result =
+ applyLimitPushdownToBuckets(buckets, bucketProcessor, limitEnabled);
+
+ if (limitEnabled) {
+ long duration = (System.nanoTime() - startTime) / 1_000_000;
+ LOG.info(
+ "Limit pushdown for PK table completed in {} ms. Limit:
{}, InputFiles: {}, OutputFiles: {}, "
+ + "MergeEngine: {}, ScanMode: {},
DeletionVectorsEnabled: {}",
+ duration,
+ limit,
+ files.size(),
+ result.size(),
+ mergeEngine,
+ scanMode,
+ deletionVectorsEnabled);
+ }
+
+ return result;
+ }
+
+ /**
+ * Apply limit pushdown to buckets with an optional bucket processor (e.g., filtering).
+ *
+ * <p>This method processes buckets in order, applying the bucket processor first, then applying
+ * limit pushdown if enabled. It stops early when the limit is reached.
+ *
+ * @param buckets buckets grouped by (partition, bucket)
+ * @param bucketProcessor processor to apply to each bucket before limit pushdown
+ * @return processed entries (filtered and limited if limit is enabled)
+ */
+ private List<ManifestEntry> applyLimitPushdownToBuckets(
+ Map<Pair<BinaryRow, Integer>, List<ManifestEntry>> buckets,
+ Function<List<ManifestEntry>, List<ManifestEntry>> bucketProcessor,
+ boolean limitEnabled) {
+ List<ManifestEntry> result = new ArrayList<>();
+ long accumulatedRowCount = 0;
+
+ for (List<ManifestEntry> bucketEntries : buckets.values()) {
+ // Apply bucket processor (e.g., filtering)
+ List<ManifestEntry> processed = bucketProcessor.apply(bucketEntries);
+
+ if (limitEnabled) {
+ // Apply limit pushdown if enabled
+ if (accumulatedRowCount >= limit) {
+ // Already reached limit, stop processing remaining buckets
+ break;
+ }
+
+ long remainingLimit = limit - accumulatedRowCount;
+ List<ManifestEntry> processedBucket =
+ applyLimitPushdownForBucket(processed, remainingLimit);
+ if (processedBucket == null) {
+ // Can't safely push down limit for this bucket, include all processed entries
+ result.addAll(processed);
+ } else {
+ result.addAll(processedBucket);
+ for (ManifestEntry entry : processedBucket) {
+ long fileRowCount = entry.file().rowCount();
+ accumulatedRowCount += fileRowCount;
+ }
+ }
+ } else {
+ // No limit pushdown, just add processed entries
+ result.addAll(processed);
+ }
+ }
+
+ return result;
}
private List<ManifestEntry> doFilterWholeBucketByStats(List<ManifestEntry> entries) {
@@ -316,4 +455,16 @@ public class KeyValueFileStoreScan extends AbstractFileStoreScan {
return true;
}
+
+ /** Group manifest entries by (partition, bucket) while preserving order. */
+ private Map<Pair<BinaryRow, Integer>, List<ManifestEntry>> groupByBucket(
+ List<ManifestEntry> entries) {
+ return entries.stream()
+ .collect(
+ Collectors.groupingBy(
+ // we use LinkedHashMap to avoid disorder
+ file -> Pair.of(file.partition(), file.bucket()),
+ LinkedHashMap::new,
+ Collectors.toList()));
+ }
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java b/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java
index 67c8acbf62..012160f5bd 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java
@@ -31,6 +31,9 @@ import org.apache.paimon.table.source.snapshot.StartingScanner;
import org.apache.paimon.table.source.snapshot.StartingScanner.ScannedResult;
import org.apache.paimon.types.DataType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
@@ -40,6 +43,8 @@ import static org.apache.paimon.table.source.PushDownUtils.minmaxAvailable;
/** {@link TableScan} implementation for batch planning. */
public class DataTableBatchScan extends AbstractDataTableScan {
+ private static final Logger LOG = LoggerFactory.getLogger(DataTableBatchScan.class);
+
private StartingScanner startingScanner;
private boolean hasNext;
@@ -132,6 +137,7 @@ public class DataTableBatchScan extends AbstractDataTableScan {
long scannedRowCount = 0;
SnapshotReader.Plan plan = ((ScannedResult) result).plan();
List<DataSplit> splits = plan.dataSplits();
+ LOG.info("Applying limit pushdown. Original splits count: {}",
splits.size());
if (splits.isEmpty()) {
return Optional.of(result);
}
@@ -145,6 +151,11 @@ public class DataTableBatchScan extends AbstractDataTableScan {
if (scannedRowCount >= pushDownLimit) {
SnapshotReader.Plan newPlan =
new PlanImpl(plan.watermark(), plan.snapshotId(), limitedSplits);
+ LOG.info(
+ "Limit pushdown applied successfully. Original splits: {}, Limited splits: {}, Pushdown limit: {}",
+ splits.size(),
+ limitedSplits.size(),
+ pushDownLimit);
return Optional.of(new ScannedResult(newPlan));
}
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
index e036710e8e..3021eeda3e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
@@ -409,9 +409,11 @@ public class SnapshotReaderImpl implements SnapshotReader {
isStreaming
? splitGenerator.splitForStreaming(bucketFiles)
: splitGenerator.splitForBatch(bucketFiles);
+
+ // Calculate bucketPath once per bucket to avoid repeated computation
+ String bucketPath = pathFactory.bucketPath(partition, bucket).toString();
for (SplitGenerator.SplitGroup splitGroup : splitGroups) {
List<DataFileMeta> dataFiles = splitGroup.files;
- String bucketPath = pathFactory.bucketPath(partition, bucket).toString();
builder.withDataFiles(dataFiles)
.rawConvertible(splitGroup.rawConvertible)
.withBucketPath(bucketPath);
diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreScanTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreScanTest.java
index 4f3d5c1c24..f975cf570d 100644
--- a/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreScanTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreScanTest.java
@@ -18,6 +18,7 @@
package org.apache.paimon.operation;
+import org.apache.paimon.CoreOptions;
import org.apache.paimon.KeyValue;
import org.apache.paimon.Snapshot;
import org.apache.paimon.TestFileStore;
@@ -30,9 +31,10 @@ import org.apache.paimon.mergetree.compact.DeduplicateMergeFunction;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.schema.SchemaUtils;
+import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.types.IntType;
import org.apache.paimon.types.RowType;
-import org.apache.paimon.utils.SnapshotManager;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -59,7 +61,6 @@ public class KeyValueFileStoreScanTest {
private TestKeyValueGenerator gen;
@TempDir java.nio.file.Path tempDir;
private TestFileStore store;
- private SnapshotManager snapshotManager;
@BeforeEach
public void beforeEach() throws Exception {
@@ -76,7 +77,6 @@ public class KeyValueFileStoreScanTest {
DeduplicateMergeFunction.factory(),
null)
.build();
- snapshotManager = store.snapshotManager();
SchemaManager schemaManager =
new SchemaManager(LocalFileIO.create(), new Path(tempDir.toUri()));
@@ -271,6 +271,276 @@ public class KeyValueFileStoreScanTest {
}
}
+ @Test
+ public void testLimitPushdownWithoutValueFilter() throws Exception {
+ // Write multiple files to test limit pushdown
+ List<KeyValue> data1 = generateData(50);
+ writeData(data1);
+ List<KeyValue> data2 = generateData(50);
+ writeData(data2);
+ List<KeyValue> data3 = generateData(50);
+ Snapshot snapshot = writeData(data3);
+
+ // Without limit, should read all files
+ KeyValueFileStoreScan scanWithoutLimit = store.newScan();
+ scanWithoutLimit.withSnapshot(snapshot.id());
+ List<ManifestEntry> filesWithoutLimit = scanWithoutLimit.plan().files();
+ int totalFiles = filesWithoutLimit.size();
+ assertThat(totalFiles).isGreaterThan(0);
+
+ // With limit, should read fewer files (limit pushdown should work)
+ KeyValueFileStoreScan scanWithLimit = store.newScan();
+ scanWithLimit.withSnapshot(snapshot.id()).withLimit(10);
+ List<ManifestEntry> filesWithLimit = scanWithLimit.plan().files();
+ // Limit pushdown should reduce the number of files read
+ assertThat(filesWithLimit.size()).isLessThanOrEqualTo(totalFiles);
+ assertThat(filesWithLimit.size()).isGreaterThan(0);
+ }
+
+ @Test
+ public void testLimitPushdownWithValueFilter() throws Exception {
+ // Write data with different item values
+ List<KeyValue> data1 = generateData(50, 0, 100L);
+ writeData(data1);
+ List<KeyValue> data2 = generateData(50, 0, 200L);
+ writeData(data2);
+ List<KeyValue> data3 = generateData(50, 0, 300L);
+ Snapshot snapshot = writeData(data3);
+
+ // Without valueFilter, limit pushdown should work
+ KeyValueFileStoreScan scanWithoutFilter = store.newScan();
+ scanWithoutFilter.withSnapshot(snapshot.id()).withLimit(10);
+ List<ManifestEntry> filesWithoutFilter = scanWithoutFilter.plan().files();
+ int totalFilesWithoutFilter = filesWithoutFilter.size();
+ assertThat(totalFilesWithoutFilter).isGreaterThan(0);
+
+ // With valueFilter, limit pushdown should still work.
+ KeyValueFileStoreScan scanWithFilter = store.newScan();
+ scanWithFilter.withSnapshot(snapshot.id());
+ scanWithFilter.withValueFilter(
+ new PredicateBuilder(TestKeyValueGenerator.DEFAULT_ROW_TYPE)
+ .between(4, 100L, 200L));
+ scanWithFilter.withLimit(10);
+ List<ManifestEntry> filesWithFilter = scanWithFilter.plan().files();
+
+ // Limit pushdown should work with valueFilter
+ // The number of files should be less than or equal to the total files after filtering
+ assertThat(filesWithFilter.size()).isGreaterThan(0);
+ assertThat(filesWithFilter.size()).isLessThanOrEqualTo(totalFilesWithoutFilter);
+ }
+
+ @Test
+ public void testLimitPushdownWithKeyFilter() throws Exception {
+ // Write data with different shop IDs
+ List<KeyValue> data = generateData(200);
+ Snapshot snapshot = writeData(data);
+
+ // With keyFilter, limit pushdown should still work (keyFilter doesn't affect limit
+ // pushdown)
+ KeyValueFileStoreScan scan = store.newScan();
+ scan.withSnapshot(snapshot.id());
+ scan.withKeyFilter(
+ new PredicateBuilder(RowType.of(new IntType(false)))
+ .equal(0, data.get(0).key().getInt(0)));
+ scan.withLimit(5);
+ List<ManifestEntry> files = scan.plan().files();
+ assertThat(files.size()).isGreaterThan(0);
+ }
+
+ @Test
+ public void testLimitPushdownMultipleBuckets() throws Exception {
+ // Write data to multiple buckets to test limit pushdown across buckets
+ List<KeyValue> data1 = generateData(30);
+ writeData(data1);
+ List<KeyValue> data2 = generateData(30);
+ writeData(data2);
+ List<KeyValue> data3 = generateData(30);
+ Snapshot snapshot = writeData(data3);
+
+ // Without limit, should read all files
+ KeyValueFileStoreScan scanWithoutLimit = store.newScan();
+ scanWithoutLimit.withSnapshot(snapshot.id());
+ List<ManifestEntry> filesWithoutLimit = scanWithoutLimit.plan().files();
+ int totalFiles = filesWithoutLimit.size();
+ assertThat(totalFiles).isGreaterThan(0);
+
+ // With limit, should read fewer files (limit pushdown should work across buckets)
+ KeyValueFileStoreScan scanWithLimit = store.newScan();
+ scanWithLimit.withSnapshot(snapshot.id()).withLimit(20);
+ List<ManifestEntry> filesWithLimit = scanWithLimit.plan().files();
+ // Limit pushdown should reduce the number of files read
+ assertThat(filesWithLimit.size()).isLessThanOrEqualTo(totalFiles);
+ assertThat(filesWithLimit.size()).isGreaterThan(0);
+ }
+
+ @Test
+ public void testLimitPushdownWithSmallLimit() throws Exception {
+ // Test limit pushdown with a very small limit
+ List<KeyValue> data1 = generateData(100);
+ writeData(data1);
+ List<KeyValue> data2 = generateData(100);
+ writeData(data2);
+ Snapshot snapshot = writeData(data2);
+
+ KeyValueFileStoreScan scan = store.newScan();
+ scan.withSnapshot(snapshot.id()).withLimit(1);
+ List<ManifestEntry> files = scan.plan().files();
+ // Should read at least one file, but fewer than all files
+ assertThat(files.size()).isGreaterThan(0);
+ }
+
+ @Test
+ public void testLimitPushdownWithLargeLimit() throws Exception {
+ // Test limit pushdown with a large limit (larger than total rows)
+ List<KeyValue> data1 = generateData(50);
+ writeData(data1);
+ List<KeyValue> data2 = generateData(50);
+ Snapshot snapshot = writeData(data2);
+
+ KeyValueFileStoreScan scanWithoutLimit = store.newScan();
+ scanWithoutLimit.withSnapshot(snapshot.id());
+ List<ManifestEntry> filesWithoutLimit = scanWithoutLimit.plan().files();
+ int totalFiles = filesWithoutLimit.size();
+
+ KeyValueFileStoreScan scanWithLimit = store.newScan();
+ scanWithLimit.withSnapshot(snapshot.id()).withLimit(10000);
+ List<ManifestEntry> filesWithLimit = scanWithLimit.plan().files();
+ // With a large limit, should read all files
+ assertThat(filesWithLimit.size()).isEqualTo(totalFiles);
+ }
+
+ @Test
+ public void testLimitPushdownWithPartialUpdateMergeEngine() throws Exception {
+ // Test that limit pushdown is disabled for PARTIAL_UPDATE merge engine
+ // Create a store with PARTIAL_UPDATE merge engine by setting it in schema options
+ SchemaManager schemaManager =
+ new SchemaManager(LocalFileIO.create(), new Path(tempDir.toUri()));
+ Schema schema =
+ new Schema(
+ TestKeyValueGenerator.DEFAULT_ROW_TYPE.getFields(),
+ TestKeyValueGenerator.DEFAULT_PART_TYPE.getFieldNames(),
+ TestKeyValueGenerator.getPrimaryKeys(
+ TestKeyValueGenerator.GeneratorMode.MULTI_PARTITIONED),
+ Collections.singletonMap(CoreOptions.MERGE_ENGINE.key(), "partial-update"),
+ null);
+ TableSchema tableSchema = SchemaUtils.forceCommit(schemaManager, schema);
+
+ TestFileStore storePartialUpdate =
+ new TestFileStore.Builder(
+ "avro",
+ tempDir.toString(),
+ NUM_BUCKETS,
+ TestKeyValueGenerator.DEFAULT_PART_TYPE,
+ TestKeyValueGenerator.KEY_TYPE,
+ TestKeyValueGenerator.DEFAULT_ROW_TYPE,
+ TestKeyValueGenerator.TestKeyValueFieldsExtractor.EXTRACTOR,
+ DeduplicateMergeFunction.factory(),
+ tableSchema)
+ .build();
+
+ List<KeyValue> data1 = generateData(50);
+ writeData(data1, storePartialUpdate);
+ List<KeyValue> data2 = generateData(50);
+ Snapshot snapshot = writeData(data2, storePartialUpdate);
+
+ KeyValueFileStoreScan scan = storePartialUpdate.newScan();
+ scan.withSnapshot(snapshot.id()).withLimit(10);
+ // supportsLimitPushManifestEntries should return false for PARTIAL_UPDATE
+ assertThat(scan.supportsLimitPushManifestEntries()).isFalse();
+
+ // Should read all files since limit pushdown is disabled
+ KeyValueFileStoreScan scanWithoutLimit = storePartialUpdate.newScan();
+ scanWithoutLimit.withSnapshot(snapshot.id());
+ List<ManifestEntry> filesWithoutLimit = scanWithoutLimit.plan().files();
+ int totalFiles = filesWithoutLimit.size();
+
+ List<ManifestEntry> filesWithLimit = scan.plan().files();
+ assertThat(filesWithLimit.size()).isEqualTo(totalFiles);
+ }
+
+ @Test
+ public void testLimitPushdownWithAggregateMergeEngine() throws Exception {
+ // Test that limit pushdown is disabled for AGGREGATE merge engine
+ SchemaManager schemaManager =
+ new SchemaManager(LocalFileIO.create(), new Path(tempDir.toUri()));
+ Schema schema =
+ new Schema(
+ TestKeyValueGenerator.DEFAULT_ROW_TYPE.getFields(),
+ TestKeyValueGenerator.DEFAULT_PART_TYPE.getFieldNames(),
+ TestKeyValueGenerator.getPrimaryKeys(
+ TestKeyValueGenerator.GeneratorMode.MULTI_PARTITIONED),
+ Collections.singletonMap(CoreOptions.MERGE_ENGINE.key(), "aggregation"),
+ null);
+ TableSchema tableSchema = SchemaUtils.forceCommit(schemaManager, schema);
+
+ TestFileStore storeAggregate =
+ new TestFileStore.Builder(
+ "avro",
+ tempDir.toString(),
+ NUM_BUCKETS,
+ TestKeyValueGenerator.DEFAULT_PART_TYPE,
+ TestKeyValueGenerator.KEY_TYPE,
+ TestKeyValueGenerator.DEFAULT_ROW_TYPE,
+ TestKeyValueGenerator.TestKeyValueFieldsExtractor.EXTRACTOR,
+ DeduplicateMergeFunction.factory(),
+ tableSchema)
+ .build();
+
+ List<KeyValue> data1 = generateData(50);
+ writeData(data1, storeAggregate);
+ List<KeyValue> data2 = generateData(50);
+ Snapshot snapshot = writeData(data2, storeAggregate);
+
+ KeyValueFileStoreScan scan = storeAggregate.newScan();
+ scan.withSnapshot(snapshot.id()).withLimit(10);
+ // supportsLimitPushManifestEntries should return false for AGGREGATE
+ assertThat(scan.supportsLimitPushManifestEntries()).isFalse();
+
+ // Should read all files since limit pushdown is disabled
+ KeyValueFileStoreScan scanWithoutLimit = storeAggregate.newScan();
+ scanWithoutLimit.withSnapshot(snapshot.id());
+ List<ManifestEntry> filesWithoutLimit = scanWithoutLimit.plan().files();
+ int totalFiles = filesWithoutLimit.size();
+
+ List<ManifestEntry> filesWithLimit = scan.plan().files();
+ assertThat(filesWithLimit.size()).isEqualTo(totalFiles);
+ }
+
+ @Test
+ public void testLimitPushdownWithDeletionVectors() throws Exception {
+ // Test that limit pushdown is disabled when deletion vectors are enabled
+ SchemaManager schemaManager =
+ new SchemaManager(LocalFileIO.create(), new Path(tempDir.toUri()));
+ Schema schema =
+ new Schema(
+ TestKeyValueGenerator.DEFAULT_ROW_TYPE.getFields(),
+ TestKeyValueGenerator.DEFAULT_PART_TYPE.getFieldNames(),
+ TestKeyValueGenerator.getPrimaryKeys(
+ TestKeyValueGenerator.GeneratorMode.MULTI_PARTITIONED),
+ Collections.singletonMap(
+ CoreOptions.DELETION_VECTORS_ENABLED.key(), "true"),
+ null);
+ TableSchema tableSchema = SchemaUtils.forceCommit(schemaManager, schema);
+
+ TestFileStore storeWithDV =
+ new TestFileStore.Builder(
+ "avro",
+ tempDir.toString(),
+ NUM_BUCKETS,
+ TestKeyValueGenerator.DEFAULT_PART_TYPE,
+ TestKeyValueGenerator.KEY_TYPE,
+ TestKeyValueGenerator.DEFAULT_ROW_TYPE,
+ TestKeyValueGenerator.TestKeyValueFieldsExtractor.EXTRACTOR,
+ DeduplicateMergeFunction.factory(),
+ tableSchema)
+ .build();
+
+ KeyValueFileStoreScan scan = storeWithDV.newScan();
+ scan.withLimit(10);
+ // supportsLimitPushManifestEntries should return false when deletion vectors are enabled
+ assertThat(scan.supportsLimitPushManifestEntries()).isFalse();
+ }
+
private void runTestExactMatch(
FileStoreScan scan, Long expectedSnapshotId, Map<BinaryRow, BinaryRow> expected)
throws Exception {
@@ -307,10 +577,6 @@ public class KeyValueFileStoreScanTest {
return data;
}
- private List<KeyValue> generateData(int numRecords, int hr) {
- return generateData(numRecords, hr, null);
- }
-
private List<KeyValue> generateData(int numRecords, int hr, Long itemId) {
List<KeyValue> data = new ArrayList<>();
for (int i = 0; i < numRecords; i++) {
@@ -320,7 +586,11 @@ public class KeyValueFileStoreScanTest {
}
private Snapshot writeData(List<KeyValue> kvs) throws Exception {
- List<Snapshot> snapshots = store.commitData(kvs, gen::getPartition, this::getBucket);
+ return writeData(kvs, store);
+ }
+
+ private Snapshot writeData(List<KeyValue> kvs, TestFileStore testStore) throws Exception {
+ List<Snapshot> snapshots = testStore.commitData(kvs, gen::getPartition, this::getBucket);
return snapshots.get(snapshots.size() - 1);
}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java
index a7fb814f4a..d98f277a55 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java
@@ -1598,4 +1598,129 @@ public class PrimaryKeyFileStoreTableITCase extends AbstractTestBase {
}
}
}
+
+ @Test
+ public void testLimitPushdownWithTimeFilter() throws Exception {
+ // This test verifies that limit pushdown works correctly when a valueFilter is applied
+ TableEnvironment tEnv = tableEnvironmentBuilder().batchMode().build();
+ tEnv.executeSql(createCatalogSql("testCatalog", path + "/warehouse"));
+ tEnv.executeSql("USE CATALOG testCatalog");
+ tEnv.executeSql(
+ "CREATE TABLE T ("
+ + "id INT, "
+ + "name STRING, "
+ + "ts TIMESTAMP(3), "
+ + "PRIMARY KEY (id) NOT ENFORCED"
+ + ")");
+
+ // Insert data with different timestamps
+ tEnv.executeSql(
+ "INSERT INTO T VALUES "
+ + "(1, 'a', TIMESTAMP '2024-01-01 10:00:00'), "
+ + "(2, 'b', TIMESTAMP '2024-01-01 11:00:00'), "
+ + "(3, 'c', TIMESTAMP '2024-01-01 12:00:00'), "
+ + "(4, 'd', TIMESTAMP '2024-01-01 13:00:00'), "
+ + "(5, 'e', TIMESTAMP '2024-01-01 14:00:00')")
+ .await();
+
+ // Without filter, limit pushdown should work
+ try (CloseableIterator<Row> iter = tEnv.executeSql("SELECT * FROM T
LIMIT 3").collect()) {
+ List<Row> allRows = new ArrayList<>();
+ iter.forEachRemaining(allRows::add);
+ assertThat(allRows.size()).isEqualTo(3);
+ }
+
+ // Test limit pushdown with time filter (4 rows match, LIMIT 3)
+ try (CloseableIterator<Row> iter =
+ tEnv.executeSql(
+ "SELECT * FROM T WHERE ts >= TIMESTAMP
'2024-01-01 11:00:00' LIMIT 3")
+ .collect()) {
+ List<Row> filteredRows = new ArrayList<>();
+ iter.forEachRemaining(filteredRows::add);
+ assertThat(filteredRows.size()).isGreaterThanOrEqualTo(3);
+ assertThat(filteredRows.size()).isLessThanOrEqualTo(4);
+ for (Row row : filteredRows) {
+ java.time.LocalDateTime ts = (java.time.LocalDateTime) row.getField(2);
+ java.time.LocalDateTime filterTime =
+ java.time.LocalDateTime.parse("2024-01-01T11:00:00");
+ assertThat(ts).isAfterOrEqualTo(filterTime);
+ }
+ }
+
+ // Test with more restrictive filter (3 rows match, LIMIT 2)
+ try (CloseableIterator<Row> iter =
+ tEnv.executeSql(
+ "SELECT * FROM T WHERE ts >= TIMESTAMP
'2024-01-01 12:00:00' LIMIT 2")
+ .collect()) {
+ List<Row> filteredRows2 = new ArrayList<>();
+ iter.forEachRemaining(filteredRows2::add);
+ assertThat(filteredRows2.size()).isGreaterThanOrEqualTo(2);
+ assertThat(filteredRows2.size()).isLessThanOrEqualTo(3);
+ for (Row row : filteredRows2) {
+ java.time.LocalDateTime ts = (java.time.LocalDateTime) row.getField(2);
+ java.time.LocalDateTime filterTime =
+ java.time.LocalDateTime.parse("2024-01-01T12:00:00");
+ assertThat(ts).isAfterOrEqualTo(filterTime);
+ }
+ }
+ }
+
+ @Test
+ public void testLimitPushdownBasic() throws Exception {
+ // Test basic limit pushdown
+ TableEnvironment tEnv = tableEnvironmentBuilder().batchMode().build();
+ tEnv.executeSql(createCatalogSql("testCatalog", path + "/warehouse"));
+ tEnv.executeSql("USE CATALOG testCatalog");
+ tEnv.executeSql(
+ "CREATE TABLE T ("
+ + "id INT, "
+ + "name STRING, "
+ + "PRIMARY KEY (id) NOT ENFORCED"
+ + ")");
+
+ tEnv.executeSql("INSERT INTO T VALUES (1, 'a'), (2, 'b'), (3,
'c')").await();
+ tEnv.executeSql("INSERT INTO T VALUES (4, 'd'), (5, 'e'), (6,
'f')").await();
+ tEnv.executeSql("INSERT INTO T VALUES (7, 'g'), (8, 'h'), (9,
'i')").await();
+
+ try (CloseableIterator<Row> iter = tEnv.executeSql("SELECT * FROM T
LIMIT 5").collect()) {
+ List<Row> rows = new ArrayList<>();
+ iter.forEachRemaining(rows::add);
+
+ assertThat(rows.size()).isEqualTo(5);
+ }
+ }
+
+ @Test
+ public void testLimitPushdownWithDeletionVector() throws Exception {
+ // Test limit pushdown is disabled when deletion vector is enabled
+ TableEnvironment tEnv = tableEnvironmentBuilder().batchMode().build();
+ tEnv.executeSql(createCatalogSql("testCatalog", path + "/warehouse"));
+ tEnv.executeSql("USE CATALOG testCatalog");
+ tEnv.executeSql(
+ "CREATE TABLE T ("
+ + "id INT, "
+ + "name STRING, "
+ + "PRIMARY KEY (id) NOT ENFORCED"
+ + ") WITH ("
+ + "'deletion-vectors.enabled' = 'true'"
+ + ")");
+
+ tEnv.executeSql("INSERT INTO T VALUES (1, 'a'), (2, 'b'), (3,
'c')").await();
+ tEnv.executeSql("INSERT INTO T VALUES (4, 'd'), (5, 'e'), (6,
'f')").await();
+
+ tEnv.executeSql("DELETE FROM T WHERE id = 2").await();
+
+ // Limit pushdown should be disabled when deletion vector is enabled
+ // because we can't accurately calculate row count after applying deletion vectors
+ try (CloseableIterator<Row> iter = tEnv.executeSql("SELECT * FROM T LIMIT 3").collect()) {
+ List<Row> rows = new ArrayList<>();
+ iter.forEachRemaining(rows::add);
+
+ assertThat(rows.size()).isEqualTo(3);
+
+ for (Row row : rows) {
+ assertThat(row.getField(0)).isNotEqualTo(2);
+ }
+ }
+ }
}