This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 6646071840 [core] Support TopN pushdown in deletion-vector mode (#6097)
6646071840 is described below

commit 6646071840b53c822a88ef680ad5c0722f6f15b3
Author: Tan-JiaLiang <tanjialiang1...@gmail.com>
AuthorDate: Wed Aug 20 12:27:34 2025 +0800

    [core] Support TopN pushdown in deletion-vector mode (#6097)
---
 .../paimon/fileindex/bitmap/BitmapIndexResult.java |  17 ++++
 .../org/apache/paimon/utils/RoaringBitmap32.java   |   4 +
 .../org/apache/paimon/AppendOnlyFileStore.java     |   3 +-
 .../java/org/apache/paimon/KeyValueFileStore.java  |   3 +-
 .../org/apache/paimon/io/FileIndexEvaluator.java   | 100 +++++++++++++--------
 .../apache/paimon/operation/RawFileSplitRead.java  |  29 ++----
 .../paimon/table/source/KeyValueTableRead.java     |  12 +++
 .../paimon/table/AppendOnlySimpleTableTest.java    |  33 -------
 .../paimon/table/PrimaryKeySimpleTableTest.java    | 100 +++++++++++++++++++++
 .../flink/source/TestChangelogDataReadWrite.java   |   1 -
 .../apache/paimon/spark/PaimonScanBuilder.scala    |  13 +--
 .../paimon/spark/sql/PaimonPushDownTestBase.scala  |  85 ++++++++++++++++++
 12 files changed, 291 insertions(+), 109 deletions(-)

diff --git 
a/paimon-common/src/main/java/org/apache/paimon/fileindex/bitmap/BitmapIndexResult.java
 
b/paimon-common/src/main/java/org/apache/paimon/fileindex/bitmap/BitmapIndexResult.java
index a1fc6ca551..96e6fe8284 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/fileindex/bitmap/BitmapIndexResult.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/fileindex/bitmap/BitmapIndexResult.java
@@ -22,6 +22,7 @@ import org.apache.paimon.fileindex.FileIndexResult;
 import org.apache.paimon.utils.LazyField;
 import org.apache.paimon.utils.RoaringBitmap32;
 
+import java.util.Objects;
 import java.util.function.Supplier;
 
 /** bitmap file index result. */
@@ -53,4 +54,20 @@ public class BitmapIndexResult extends 
LazyField<RoaringBitmap32> implements Fil
         }
         return FileIndexResult.super.or(fileIndexResult);
     }
+
+    public FileIndexResult andNot(RoaringBitmap32 deletion) {
+        return new BitmapIndexResult(() -> RoaringBitmap32.andNot(get(), 
deletion));
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        BitmapIndexResult that = (BitmapIndexResult) o;
+        return Objects.equals(this.get(), that.get());
+    }
 }
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/utils/RoaringBitmap32.java 
b/paimon-common/src/main/java/org/apache/paimon/utils/RoaringBitmap32.java
index 8c3b2802ac..b37eb837cf 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/RoaringBitmap32.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/RoaringBitmap32.java
@@ -175,6 +175,10 @@ public class RoaringBitmap32 {
         return roaringBitmap32;
     }
 
+    public static RoaringBitmap32 bitmapOfRange(long min, long max) {
+        return new RoaringBitmap32(RoaringBitmap.bitmapOfRange(min, max));
+    }
+
     public static RoaringBitmap32 and(final RoaringBitmap32 x1, final 
RoaringBitmap32 x2) {
         return new RoaringBitmap32(RoaringBitmap.and(x1.roaringBitmap, 
x2.roaringBitmap));
     }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java 
b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
index d7fa9eb66b..0d9ad3b121 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
@@ -83,8 +83,7 @@ public class AppendOnlyFileStore extends 
AbstractFileStore<InternalRow> {
                 FileFormatDiscover.of(options),
                 pathFactory(),
                 options.fileIndexReadEnabled(),
-                options.rowTrackingEnabled(),
-                options.deletionVectorsEnabled());
+                options.rowTrackingEnabled());
     }
 
     public DataEvolutionSplitRead newDataEvolutionRead() {
diff --git a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java 
b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
index 48d0bf872b..f7a743ace1 100644
--- a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
@@ -132,8 +132,7 @@ public class KeyValueFileStore extends 
AbstractFileStore<KeyValue> {
                 FileFormatDiscover.of(options),
                 pathFactory(),
                 options.fileIndexReadEnabled(),
-                false,
-                options.deletionVectorsEnabled());
+                false);
     }
 
     public KeyValueFileReaderFactory.Builder newReaderFactoryBuilder() {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/io/FileIndexEvaluator.java 
b/paimon-core/src/main/java/org/apache/paimon/io/FileIndexEvaluator.java
index c71af3f2bc..bf04b453af 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/FileIndexEvaluator.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/FileIndexEvaluator.java
@@ -18,19 +18,24 @@
 
 package org.apache.paimon.io;
 
+import org.apache.paimon.deletionvectors.BitmapDeletionVector;
+import org.apache.paimon.deletionvectors.DeletionVector;
 import org.apache.paimon.fileindex.FileIndexPredicate;
 import org.apache.paimon.fileindex.FileIndexResult;
+import org.apache.paimon.fileindex.bitmap.BitmapIndexResult;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.predicate.PredicateBuilder;
 import org.apache.paimon.predicate.TopN;
 import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.utils.ListUtils;
+import org.apache.paimon.utils.RoaringBitmap32;
 
 import javax.annotation.Nullable;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.Objects;
 import java.util.stream.Collectors;
 
 /** Evaluate file index result. */
@@ -42,55 +47,76 @@ public class FileIndexEvaluator {
             List<Predicate> dataFilter,
             @Nullable TopN topN,
             DataFilePathFactory dataFilePathFactory,
-            DataFileMeta file)
+            DataFileMeta file,
+            @Nullable DeletionVector deletionVector)
             throws IOException {
-        FileIndexResult result = FileIndexResult.REMAIN;
         if (ListUtils.isNullOrEmpty(dataFilter) && topN == null) {
-            return result;
+            return FileIndexResult.REMAIN;
+        }
+
+        FileIndexResult selection =
+                new BitmapIndexResult(() -> RoaringBitmap32.bitmapOfRange(0, 
file.rowCount()));
+        if (deletionVector instanceof BitmapDeletionVector) {
+            RoaringBitmap32 deletion = ((BitmapDeletionVector) 
deletionVector).get();
+            selection = ((BitmapIndexResult) selection).andNot(deletion);
         }
 
-        FileIndexPredicate predicate = null;
-        try {
-            byte[] embeddedIndex = file.embeddedIndex();
-            if (embeddedIndex != null) {
-                predicate = new FileIndexPredicate(embeddedIndex, 
dataSchema.logicalRowType());
-            } else {
-                List<String> indexFiles =
-                        file.extraFiles().stream()
-                                .filter(
-                                        name ->
-                                                name.endsWith(
-                                                        
DataFilePathFactory.INDEX_PATH_SUFFIX))
-                                .collect(Collectors.toList());
-                if (indexFiles.isEmpty()) {
-                    return result;
+        try (FileIndexPredicate predicate =
+                createFileIndexPredicate(fileIO, dataSchema, 
dataFilePathFactory, file)) {
+            FileIndexResult result = FileIndexResult.REMAIN;
+            if (predicate != null) {
+                if (!ListUtils.isNullOrEmpty(dataFilter)) {
+                    Predicate filter = 
PredicateBuilder.and(dataFilter.toArray(new Predicate[0]));
+                    result = predicate.evaluate(filter);
+                    result.and(selection);
+                } else if (topN != null) {
+                    result = predicate.evaluateTopN(topN, selection);
                 }
-                if (indexFiles.size() > 1) {
-                    throw new RuntimeException(
-                            "Found more than one index file for one data file: 
"
-                                    + String.join(" and ", indexFiles));
+
+                // if all position selected, or if only and not the deletion
+                // the effect will not obvious, just return REMAIN.
+                if (Objects.equals(result, selection)) {
+                    result = FileIndexResult.REMAIN;
                 }
-                predicate =
-                        new FileIndexPredicate(
-                                
dataFilePathFactory.toAlignedPath(indexFiles.get(0), file),
-                                fileIO,
-                                dataSchema.logicalRowType());
             }
 
-            // evaluate
-            if (!ListUtils.isNullOrEmpty(dataFilter)) {
-                result =
-                        predicate.evaluate(
-                                PredicateBuilder.and(dataFilter.toArray(new 
Predicate[0])));
-            } else if (topN != null) {
-                result = predicate.evaluateTopN(topN, result);
+            if (!result.remain()) {
+                result = FileIndexResult.SKIP;
             }
 
             return result;
-        } finally {
-            if (predicate != null) {
-                predicate.close();
+        }
+    }
+
+    private static FileIndexPredicate createFileIndexPredicate(
+            FileIO fileIO,
+            TableSchema dataSchema,
+            DataFilePathFactory dataFilePathFactory,
+            DataFileMeta file)
+            throws IOException {
+        FileIndexPredicate predicate;
+        byte[] embeddedIndex = file.embeddedIndex();
+        if (embeddedIndex != null) {
+            predicate = new FileIndexPredicate(embeddedIndex, 
dataSchema.logicalRowType());
+        } else {
+            List<String> indexFiles =
+                    file.extraFiles().stream()
+                            .filter(name -> 
name.endsWith(DataFilePathFactory.INDEX_PATH_SUFFIX))
+                            .collect(Collectors.toList());
+            if (indexFiles.isEmpty()) {
+                return null;
+            }
+            if (indexFiles.size() > 1) {
+                throw new RuntimeException(
+                        "Found more than one index file for one data file: "
+                                + String.join(" and ", indexFiles));
             }
+            predicate =
+                    new FileIndexPredicate(
+                            
dataFilePathFactory.toAlignedPath(indexFiles.get(0), file),
+                            fileIO,
+                            dataSchema.logicalRowType());
         }
+        return predicate;
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java 
b/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java
index 27211a26bd..1c1249978a 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java
@@ -21,7 +21,6 @@ package org.apache.paimon.operation;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.deletionvectors.ApplyDeletionVectorReader;
-import org.apache.paimon.deletionvectors.BitmapDeletionVector;
 import org.apache.paimon.deletionvectors.DeletionVector;
 import org.apache.paimon.disk.IOManager;
 import org.apache.paimon.fileindex.FileIndexResult;
@@ -77,7 +76,6 @@ public class RawFileSplitRead implements 
SplitRead<InternalRow> {
     private final TableSchema schema;
     private final FileFormatDiscover formatDiscover;
     private final FileStorePathFactory pathFactory;
-    private final boolean deletionVectorsEnabled;
     private final Map<FormatKey, FormatReaderMapping> formatReaderMappings;
     private final boolean fileIndexReadEnabled;
     private final boolean rowTrackingEnabled;
@@ -94,14 +92,12 @@ public class RawFileSplitRead implements 
SplitRead<InternalRow> {
             FileFormatDiscover formatDiscover,
             FileStorePathFactory pathFactory,
             boolean fileIndexReadEnabled,
-            boolean rowTrackingEnabled,
-            boolean deletionVectorsEnabled) {
+            boolean rowTrackingEnabled) {
         this.fileIO = fileIO;
         this.schemaManager = schemaManager;
         this.schema = schema;
         this.formatDiscover = formatDiscover;
         this.pathFactory = pathFactory;
-        this.deletionVectorsEnabled = deletionVectorsEnabled;
         this.formatReaderMappings = new HashMap<>();
         this.fileIndexReadEnabled = fileIndexReadEnabled;
         this.rowTrackingEnabled = rowTrackingEnabled;
@@ -134,9 +130,7 @@ public class RawFileSplitRead implements 
SplitRead<InternalRow> {
 
     @Override
     public SplitRead<InternalRow> withTopN(@Nullable TopN topN) {
-        if (!deletionVectorsEnabled) {
-            this.topN = topN;
-        }
+        this.topN = topN;
         return this;
     }
 
@@ -228,6 +222,7 @@ public class RawFileSplitRead implements 
SplitRead<InternalRow> {
             IOExceptionSupplier<DeletionVector> dvFactory)
             throws IOException {
         FileIndexResult fileIndexResult = null;
+        DeletionVector deletionVector = dvFactory == null ? null : 
dvFactory.get();
         if (fileIndexReadEnabled) {
             fileIndexResult =
                     FileIndexEvaluator.evaluate(
@@ -236,7 +231,8 @@ public class RawFileSplitRead implements 
SplitRead<InternalRow> {
                             formatReaderMapping.getDataFilters(),
                             formatReaderMapping.getTopN(),
                             dataFilePathFactory,
-                            file);
+                            file,
+                            deletionVector);
             if (!fileIndexResult.remain()) {
                 return new EmptyFileRecordReader<>();
             }
@@ -247,21 +243,6 @@ public class RawFileSplitRead implements 
SplitRead<InternalRow> {
             selection = ((BitmapIndexResult) fileIndexResult).get();
         }
 
-        RoaringBitmap32 deletion = null;
-        DeletionVector deletionVector = dvFactory == null ? null : 
dvFactory.get();
-        if (deletionVector instanceof BitmapDeletionVector) {
-            deletion = ((BitmapDeletionVector) deletionVector).get();
-        }
-
-        if (selection != null) {
-            if (deletion != null) {
-                selection = RoaringBitmap32.andNot(selection, deletion);
-            }
-            if (selection.isEmpty()) {
-                return new EmptyFileRecordReader<>();
-            }
-        }
-
         FormatReaderContext formatReaderContext =
                 new FormatReaderContext(
                         fileIO, dataFilePathFactory.toPath(file), 
file.fileSize(), selection);
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueTableRead.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueTableRead.java
index c4bede61d8..5df41399cb 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueTableRead.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueTableRead.java
@@ -26,6 +26,7 @@ import org.apache.paimon.operation.MergeFileSplitRead;
 import org.apache.paimon.operation.RawFileSplitRead;
 import org.apache.paimon.operation.SplitRead;
 import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.TopN;
 import org.apache.paimon.reader.RecordReader;
 import org.apache.paimon.schema.TableSchema;
 import 
org.apache.paimon.table.source.splitread.IncrementalChangelogReadProvider;
@@ -54,6 +55,7 @@ public final class KeyValueTableRead extends 
AbstractDataTableRead {
     private boolean forceKeepDelete = false;
     private Predicate predicate = null;
     private IOManager ioManager = null;
+    @Nullable private TopN topN = null;
 
     public KeyValueTableRead(
             Supplier<MergeFileSplitRead> mergeReadSupplier,
@@ -86,6 +88,9 @@ public final class KeyValueTableRead extends 
AbstractDataTableRead {
         if (readType != null) {
             read = read.withReadType(readType);
         }
+        if (topN != null) {
+            read = read.withTopN(topN);
+        }
         read.withFilter(predicate).withIOManager(ioManager);
     }
 
@@ -109,6 +114,13 @@ public final class KeyValueTableRead extends 
AbstractDataTableRead {
         return this;
     }
 
+    @Override
+    public InnerTableRead withTopN(TopN topN) {
+        initialized().forEach(r -> r.withTopN(topN));
+        this.topN = topN;
+        return this;
+    }
+
     @Override
     public TableRead withIOManager(IOManager ioManager) {
         initialized().forEach(r -> r.withIOManager(ioManager));
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlySimpleTableTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlySimpleTableTest.java
index c262f892a8..024e6320b8 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlySimpleTableTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlySimpleTableTest.java
@@ -92,7 +92,6 @@ import java.util.stream.Collectors;
 import static org.apache.paimon.CoreOptions.BUCKET;
 import static org.apache.paimon.CoreOptions.BUCKET_KEY;
 import static org.apache.paimon.CoreOptions.DATA_FILE_PATH_DIRECTORY;
-import static org.apache.paimon.CoreOptions.DELETION_VECTORS_ENABLED;
 import static org.apache.paimon.CoreOptions.FILE_FORMAT;
 import static org.apache.paimon.CoreOptions.FILE_FORMAT_PARQUET;
 import static org.apache.paimon.CoreOptions.FILE_INDEX_IN_MANIFEST_THRESHOLD;
@@ -945,38 +944,6 @@ public class AppendOnlySimpleTableTest extends 
SimpleTableTestBase {
             assertThat(cnt.get()).isEqualTo(rowCount);
             reader.close();
         }
-
-        // test should not push topN with dv modes
-        {
-            table.schemaManager()
-                    .commitChanges(SchemaChange.updateColumnType("price", 
DataTypes.INT()));
-            rowType =
-                    RowType.builder()
-                            .field("id", DataTypes.STRING())
-                            .field("event", DataTypes.STRING())
-                            .field("price", DataTypes.INT())
-                            .build();
-            Consumer<Options> newConfigure =
-                    options -> {
-                        configure.accept(options);
-                        options.set(DELETION_VECTORS_ENABLED, true);
-                    };
-            table = createUnawareBucketFileStoreTable(rowType, newConfigure);
-            DataField field = rowType.getField("price");
-            SortValue sort =
-                    new SortValue(
-                            new FieldRef(field.id(), field.name(), 
field.type()),
-                            SortValue.SortDirection.DESCENDING,
-                            SortValue.NullOrdering.NULLS_LAST);
-            TopN topN = new TopN(Collections.singletonList(sort), k);
-            TableScan.Plan plan = table.newScan().plan();
-            RecordReader<InternalRow> reader =
-                    table.newRead().withTopN(topN).createReader(plan.splits());
-            AtomicInteger cnt = new AtomicInteger(0);
-            reader.forEachRemaining(row -> cnt.incrementAndGet());
-            assertThat(cnt.get()).isEqualTo(rowCount);
-            reader.close();
-        }
     }
 
     @Test
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java
index c02d617dbe..1e4b8b185b 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java
@@ -41,8 +41,11 @@ import org.apache.paimon.options.MemorySize;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.postpone.PostponeBucketFileStoreWrite;
 import org.apache.paimon.postpone.PostponeBucketWriter;
+import org.apache.paimon.predicate.FieldRef;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.predicate.PredicateBuilder;
+import org.apache.paimon.predicate.SortValue;
+import org.apache.paimon.predicate.TopN;
 import org.apache.paimon.reader.RecordReader;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.schema.SchemaManager;
@@ -72,12 +75,14 @@ import 
org.apache.paimon.table.source.snapshot.SnapshotReader;
 import org.apache.paimon.table.system.AuditLogTable;
 import org.apache.paimon.table.system.FileMonitorTable;
 import org.apache.paimon.table.system.ReadOptimizedTable;
+import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.RowKind;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.ChangelogManager;
 import org.apache.paimon.utils.Pair;
+import org.apache.paimon.utils.RoaringBitmap32;
 
 import org.apache.paimon.shade.org.apache.parquet.hadoop.ParquetOutputFormat;
 
@@ -1203,6 +1208,101 @@ public class PrimaryKeySimpleTableTest extends 
SimpleTableTestBase {
         }
     }
 
+    @Test
+    public void testTopNPushDownInDeletionVectorMode() throws Exception {
+        String indexColumnName = "b";
+        FileStoreTable table =
+                createFileStoreTable(
+                        conf -> {
+                            conf.set(BUCKET, 1);
+                            conf.set(FILE_FORMAT, FILE_FORMAT_PARQUET);
+                            conf.set(DELETION_VECTORS_ENABLED, true);
+                            conf.set(ParquetOutputFormat.BLOCK_SIZE, "524288");
+                            
conf.set(ParquetOutputFormat.MIN_ROW_COUNT_FOR_PAGE_SIZE_CHECK, "100");
+                            conf.set(ParquetOutputFormat.PAGE_ROW_COUNT_LIMIT, 
"300");
+                            conf.set("file-index.range-bitmap.columns", 
indexColumnName);
+                        });
+
+        int rowCount = 1000000;
+        StreamTableWrite write =
+                table.newWrite(commitUser).withIOManager(new 
IOManagerImpl(tempDir.toString()));
+        StreamTableCommit commit = table.newCommit(commitUser);
+
+        // append
+        for (int i = 0; i < rowCount; i++) {
+            write.write(rowDataWithKind(RowKind.INSERT, 1, i, (long) i));
+        }
+        commit.commit(0, write.prepareCommit(true, 0));
+
+        // delete [0, 20000]
+        int min = 20000;
+        for (int i = 0; i < 20000; i++) {
+            write.write(rowDataWithKind(RowKind.DELETE, 1, i, (long) i));
+        }
+        commit.commit(1, write.prepareCommit(true, 1));
+
+        // delete (rowCount - 20000, rowCount)
+        int max = rowCount - 20000;
+        for (int i = rowCount - 20000; i < rowCount; i++) {
+            write.write(rowDataWithKind(RowKind.DELETE, 1, i, (long) i));
+        }
+        commit.commit(2, write.prepareCommit(true, 2));
+        write.close();
+        commit.close();
+
+        // test bottom k
+        {
+            int k = new Random().nextInt(100);
+            RoaringBitmap32 bitmap = RoaringBitmap32.bitmapOfRange(min, min + 
k);
+            DataField field = 
table.schema().nameToFieldMap().get(indexColumnName);
+            SortValue sort =
+                    new SortValue(
+                            new FieldRef(field.id(), field.name(), 
field.type()),
+                            SortValue.SortDirection.ASCENDING,
+                            SortValue.NullOrdering.NULLS_LAST);
+            TopN topN = new TopN(Collections.singletonList(sort), k);
+            TableScan.Plan plan = table.newScan().plan();
+            RecordReader<InternalRow> reader =
+                    table.newRead().withTopN(topN).createReader(plan.splits());
+            AtomicInteger cnt = new AtomicInteger(0);
+            RoaringBitmap32 actual = new RoaringBitmap32();
+            reader.forEachRemaining(
+                    row -> {
+                        cnt.incrementAndGet();
+                        actual.add((int) row.getLong(2));
+                    });
+            assertThat(cnt.get()).isEqualTo(k);
+            assertThat(actual).isEqualTo(bitmap);
+            reader.close();
+        }
+
+        // test top k
+        {
+            int k = new Random().nextInt(100);
+            RoaringBitmap32 bitmap = RoaringBitmap32.bitmapOfRange(max - k, 
max);
+            DataField field = 
table.schema().nameToFieldMap().get(indexColumnName);
+            SortValue sort =
+                    new SortValue(
+                            new FieldRef(field.id(), field.name(), 
field.type()),
+                            SortValue.SortDirection.DESCENDING,
+                            SortValue.NullOrdering.NULLS_LAST);
+            TopN topN = new TopN(Collections.singletonList(sort), k);
+            TableScan.Plan plan = table.newScan().plan();
+            RecordReader<InternalRow> reader =
+                    table.newRead().withTopN(topN).createReader(plan.splits());
+            AtomicInteger cnt = new AtomicInteger(0);
+            RoaringBitmap32 actual = new RoaringBitmap32();
+            reader.forEachRemaining(
+                    row -> {
+                        cnt.incrementAndGet();
+                        actual.add((int) row.getLong(2));
+                    });
+            assertThat(cnt.get()).isEqualTo(k);
+            assertThat(actual).isEqualTo(bitmap);
+            reader.close();
+        }
+    }
+
     @Test
     public void testWithShardFirstRow() throws Exception {
         FileStoreTable table =
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java
index cb6ef1c40b..9847276fe8 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java
@@ -147,7 +147,6 @@ public class TestChangelogDataReadWrite {
                         FileFormatDiscover.of(options),
                         pathFactory,
                         options.fileIndexReadEnabled(),
-                        false,
                         false);
         return new KeyValueTableRead(() -> read, () -> rawFileRead, null);
     }
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScanBuilder.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScanBuilder.scala
index 729613f596..aa50f76043 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScanBuilder.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScanBuilder.scala
@@ -18,7 +18,6 @@
 
 package org.apache.paimon.spark
 
-import org.apache.paimon.CoreOptions
 import org.apache.paimon.predicate._
 import org.apache.paimon.predicate.SortValue.{NullOrdering, SortDirection}
 import org.apache.paimon.spark.aggregate.{AggregatePushDownUtils, 
LocalAggregator}
@@ -102,12 +101,7 @@ class PaimonScanBuilder(table: InnerTable)
       return false
     }
 
-    if (!table.isInstanceOf[AppendOnlyFileStoreTable]) {
-      return false
-    }
-
-    val coreOptions = CoreOptions.fromMap(table.options())
-    if (coreOptions.deletionVectorsEnabled()) {
+    if (!table.isInstanceOf[FileStoreTable]) {
       return false
     }
 
@@ -115,7 +109,6 @@ class PaimonScanBuilder(table: InnerTable)
       return false
     }
 
-    val order = orders(0)
     val fieldName = orders.head.expression() match {
       case nr: NamedReference => nr.fieldNames.mkString(".")
       case _ => return false
@@ -129,13 +122,13 @@ class PaimonScanBuilder(table: InnerTable)
     val field = rowType.getField(fieldName)
     val ref = new FieldRef(field.id(), field.name(), field.`type`())
 
-    val nullOrdering = order.nullOrdering() match {
+    val nullOrdering = orders.head.nullOrdering() match {
       case expressions.NullOrdering.NULLS_LAST => NullOrdering.NULLS_LAST
       case expressions.NullOrdering.NULLS_FIRST => NullOrdering.NULLS_FIRST
       case _ => return false
     }
 
-    val direction = order.direction() match {
+    val direction = orders.head.direction() match {
       case expressions.SortDirection.DESCENDING => SortDirection.DESCENDING
       case expressions.SortDirection.ASCENDING => SortDirection.ASCENDING
       case _ => return false
diff --git 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonPushDownTestBase.scala
 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonPushDownTestBase.scala
index 35190bac51..9541db60e8 100644
--- 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonPushDownTestBase.scala
+++ 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonPushDownTestBase.scala
@@ -343,6 +343,91 @@ abstract class PaimonPushDownTestBase extends 
PaimonSparkTestBase {
     Assertions.assertTrue(qe1.optimizedPlan.containsPattern(LIMIT))
   }
 
+  test("Paimon pushDown: topN for primary-key tables with deletion vector") {
+    assume(gteqSpark3_3)
+    withTable("dv_test") {
+      spark.sql("""
+                  |CREATE TABLE dv_test (id INT, c1 INT, c2 STRING) 
TBLPROPERTIES (
+                  |'primary-key'='id',
+                  |'deletion-vectors.enabled' = 'true',
+                  |'file-index.range-bitmap.columns'='c1'
+                  |)
+                  |""".stripMargin)
+
+      spark.sql(
+        "insert into table dv_test values(1, 1, 'a'),(2, 2,'b'),(3, 3, 
'c'),(4, 4, 'd'),(5, 5, 'e'),(6, NULL, 'f')")
+      checkAnswer(
+        spark.sql("SELECT * FROM dv_test ORDER BY c1 ASC NULLS FIRST LIMIT 3"),
+        Row(6, null, "f") :: Row(1, 1, "a") :: Row(2, 2, "b") :: Nil)
+      checkAnswer(
+        spark.sql("SELECT * FROM dv_test ORDER BY c1 ASC NULLS LAST LIMIT 3"),
+        Row(1, 1, "a") :: Row(2, 2, "b") :: Row(3, 3, "c") :: Nil)
+      checkAnswer(
+        spark.sql("SELECT * FROM dv_test ORDER BY c1 DESC NULLS FIRST LIMIT 
3"),
+        Row(6, null, "f") :: Row(5, 5, "e") :: Row(4, 4, "d") :: Nil)
+      checkAnswer(
+        spark.sql("SELECT * FROM dv_test ORDER BY c1 DESC NULLS LAST LIMIT 3"),
+        Row(5, 5, "e") :: Row(4, 4, "d") :: Row(3, 3, "c") :: Nil)
+
+      spark.sql("delete from dv_test where id IN (1, 5)")
+
+      checkAnswer(
+        spark.sql("SELECT * FROM dv_test ORDER BY c1 ASC NULLS FIRST LIMIT 3"),
+        Row(6, null, "f") :: Row(2, 2, "b") :: Row(3, 3, "c") :: Nil)
+      checkAnswer(
+        spark.sql("SELECT * FROM dv_test ORDER BY c1 ASC NULLS LAST LIMIT 3"),
+        Row(2, 2, "b") :: Row(3, 3, "c") :: Row(4, 4, "d") :: Nil)
+      checkAnswer(
+        spark.sql("SELECT * FROM dv_test ORDER BY c1 DESC NULLS FIRST LIMIT 
3"),
+        Row(6, null, "f") :: Row(4, 4, "d") :: Row(3, 3, "c") :: Nil)
+      checkAnswer(
+        spark.sql("SELECT * FROM dv_test ORDER BY c1 DESC NULLS LAST LIMIT 3"),
+        Row(4, 4, "d") :: Row(3, 3, "c") :: Row(2, 2, "b") :: Nil)
+    }
+  }
+
+  test("Paimon pushDown: topN for append-only tables with deletion vector") {
+    assume(gteqSpark3_3)
+    withTable("dv_test") {
+      spark.sql("""
+                  |CREATE TABLE dv_test (c1 INT, c2 STRING) TBLPROPERTIES (
+                  |'deletion-vectors.enabled' = 'true',
+                  |'file-index.range-bitmap.columns'='c1'
+                  |)
+                  |""".stripMargin)
+
+      spark.sql(
+        "insert into table dv_test values(1, 'a'),(2, 'b'),(3, 'c'),(4, 
'd'),(5, 'e'),(NULL, 'f')")
+      checkAnswer(
+        spark.sql("SELECT * FROM dv_test ORDER BY c1 ASC NULLS FIRST LIMIT 3"),
+        Row(null, "f") :: Row(1, "a") :: Row(2, "b") :: Nil)
+      checkAnswer(
+        spark.sql("SELECT * FROM dv_test ORDER BY c1 ASC NULLS LAST LIMIT 3"),
+        Row(1, "a") :: Row(2, "b") :: Row(3, "c") :: Nil)
+      checkAnswer(
+        spark.sql("SELECT * FROM dv_test ORDER BY c1 DESC NULLS FIRST LIMIT 
3"),
+        Row(null, "f") :: Row(5, "e") :: Row(4, "d") :: Nil)
+      checkAnswer(
+        spark.sql("SELECT * FROM dv_test ORDER BY c1 DESC NULLS LAST LIMIT 3"),
+        Row(5, "e") :: Row(4, "d") :: Row(3, "c") :: Nil)
+
+      spark.sql("delete from dv_test where c1 IN (1, 5)")
+
+      checkAnswer(
+        spark.sql("SELECT * FROM dv_test ORDER BY c1 ASC NULLS FIRST LIMIT 3"),
+        Row(null, "f") :: Row(2, "b") :: Row(3, "c") :: Nil)
+      checkAnswer(
+        spark.sql("SELECT * FROM dv_test ORDER BY c1 ASC NULLS LAST LIMIT 3"),
+        Row(2, "b") :: Row(3, "c") :: Row(4, "d") :: Nil)
+      checkAnswer(
+        spark.sql("SELECT * FROM dv_test ORDER BY c1 DESC NULLS FIRST LIMIT 
3"),
+        Row(null, "f") :: Row(4, "d") :: Row(3, "c") :: Nil)
+      checkAnswer(
+        spark.sql("SELECT * FROM dv_test ORDER BY c1 DESC NULLS LAST LIMIT 3"),
+        Row(4, "d") :: Row(3, "c") :: Row(2, "b") :: Nil)
+    }
+  }
+
   test(s"Paimon pushdown: parquet in-filter") {
     withTable("T") {
       spark.sql(s"""

Reply via email to