This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push: new 6646071840 [core] Support TopN pushdown in deletion-vector mode (#6097) 6646071840 is described below commit 6646071840b53c822a88ef680ad5c0722f6f15b3 Author: Tan-JiaLiang <tanjialiang1...@gmail.com> AuthorDate: Wed Aug 20 12:27:34 2025 +0800 [core] Support TopN pushdown in deletion-vector mode (#6097) --- .../paimon/fileindex/bitmap/BitmapIndexResult.java | 17 ++++ .../org/apache/paimon/utils/RoaringBitmap32.java | 4 + .../org/apache/paimon/AppendOnlyFileStore.java | 3 +- .../java/org/apache/paimon/KeyValueFileStore.java | 3 +- .../org/apache/paimon/io/FileIndexEvaluator.java | 100 +++++++++++++-------- .../apache/paimon/operation/RawFileSplitRead.java | 29 ++---- .../paimon/table/source/KeyValueTableRead.java | 12 +++ .../paimon/table/AppendOnlySimpleTableTest.java | 33 ------- .../paimon/table/PrimaryKeySimpleTableTest.java | 100 +++++++++++++++++++++ .../flink/source/TestChangelogDataReadWrite.java | 1 - .../apache/paimon/spark/PaimonScanBuilder.scala | 13 +-- .../paimon/spark/sql/PaimonPushDownTestBase.scala | 85 ++++++++++++++++++ 12 files changed, 291 insertions(+), 109 deletions(-) diff --git a/paimon-common/src/main/java/org/apache/paimon/fileindex/bitmap/BitmapIndexResult.java b/paimon-common/src/main/java/org/apache/paimon/fileindex/bitmap/BitmapIndexResult.java index a1fc6ca551..96e6fe8284 100644 --- a/paimon-common/src/main/java/org/apache/paimon/fileindex/bitmap/BitmapIndexResult.java +++ b/paimon-common/src/main/java/org/apache/paimon/fileindex/bitmap/BitmapIndexResult.java @@ -22,6 +22,7 @@ import org.apache.paimon.fileindex.FileIndexResult; import org.apache.paimon.utils.LazyField; import org.apache.paimon.utils.RoaringBitmap32; +import java.util.Objects; import java.util.function.Supplier; /** bitmap file index result. */ @@ -53,4 +54,20 @@ public class BitmapIndexResult extends LazyField<RoaringBitmap32> implements Fil } return FileIndexResult.super.or(fileIndexResult); } + + public FileIndexResult andNot(RoaringBitmap32 deletion) { + return new BitmapIndexResult(() -> RoaringBitmap32.andNot(get(), deletion)); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + BitmapIndexResult that = (BitmapIndexResult) o; + return Objects.equals(this.get(), that.get()); + } } diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/RoaringBitmap32.java b/paimon-common/src/main/java/org/apache/paimon/utils/RoaringBitmap32.java index 8c3b2802ac..b37eb837cf 100644 --- a/paimon-common/src/main/java/org/apache/paimon/utils/RoaringBitmap32.java +++ b/paimon-common/src/main/java/org/apache/paimon/utils/RoaringBitmap32.java @@ -175,6 +175,10 @@ public class RoaringBitmap32 { return roaringBitmap32; } + public static RoaringBitmap32 bitmapOfRange(long min, long max) { + return new RoaringBitmap32(RoaringBitmap.bitmapOfRange(min, max)); + } + public static RoaringBitmap32 and(final RoaringBitmap32 x1, final RoaringBitmap32 x2) { return new RoaringBitmap32(RoaringBitmap.and(x1.roaringBitmap, x2.roaringBitmap)); } diff --git a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java index d7fa9eb66b..0d9ad3b121 100644 --- a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java +++ b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java @@ -83,8 +83,7 @@ public class AppendOnlyFileStore extends AbstractFileStore<InternalRow> { FileFormatDiscover.of(options), pathFactory(), options.fileIndexReadEnabled(), - options.rowTrackingEnabled(), - options.deletionVectorsEnabled()); + options.rowTrackingEnabled()); } public DataEvolutionSplitRead newDataEvolutionRead() { diff --git a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java index 48d0bf872b..f7a743ace1 100644 --- a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java +++ b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java @@ -132,8 +132,7 @@ public class KeyValueFileStore extends AbstractFileStore<KeyValue> { FileFormatDiscover.of(options), pathFactory(), options.fileIndexReadEnabled(), - false, - options.deletionVectorsEnabled()); + false); } public KeyValueFileReaderFactory.Builder newReaderFactoryBuilder() { diff --git a/paimon-core/src/main/java/org/apache/paimon/io/FileIndexEvaluator.java b/paimon-core/src/main/java/org/apache/paimon/io/FileIndexEvaluator.java index c71af3f2bc..bf04b453af 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/FileIndexEvaluator.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/FileIndexEvaluator.java @@ -18,19 +18,24 @@ package org.apache.paimon.io; +import org.apache.paimon.deletionvectors.BitmapDeletionVector; +import org.apache.paimon.deletionvectors.DeletionVector; import org.apache.paimon.fileindex.FileIndexPredicate; import org.apache.paimon.fileindex.FileIndexResult; +import org.apache.paimon.fileindex.bitmap.BitmapIndexResult; import org.apache.paimon.fs.FileIO; import org.apache.paimon.predicate.Predicate; import org.apache.paimon.predicate.PredicateBuilder; import org.apache.paimon.predicate.TopN; import org.apache.paimon.schema.TableSchema; import org.apache.paimon.utils.ListUtils; +import org.apache.paimon.utils.RoaringBitmap32; import javax.annotation.Nullable; import java.io.IOException; import java.util.List; +import java.util.Objects; import java.util.stream.Collectors; /** Evaluate file index result. */ @@ -42,55 +47,76 @@ public class FileIndexEvaluator { List<Predicate> dataFilter, @Nullable TopN topN, DataFilePathFactory dataFilePathFactory, - DataFileMeta file) + DataFileMeta file, + @Nullable DeletionVector deletionVector) throws IOException { - FileIndexResult result = FileIndexResult.REMAIN; if (ListUtils.isNullOrEmpty(dataFilter) && topN == null) { - return result; + return FileIndexResult.REMAIN; + } + + FileIndexResult selection = + new BitmapIndexResult(() -> RoaringBitmap32.bitmapOfRange(0, file.rowCount())); + if (deletionVector instanceof BitmapDeletionVector) { + RoaringBitmap32 deletion = ((BitmapDeletionVector) deletionVector).get(); + selection = ((BitmapIndexResult) selection).andNot(deletion); } - FileIndexPredicate predicate = null; - try { - byte[] embeddedIndex = file.embeddedIndex(); - if (embeddedIndex != null) { - predicate = new FileIndexPredicate(embeddedIndex, dataSchema.logicalRowType()); - } else { - List<String> indexFiles = - file.extraFiles().stream() - .filter( - name -> - name.endsWith( - DataFilePathFactory.INDEX_PATH_SUFFIX)) - .collect(Collectors.toList()); - if (indexFiles.isEmpty()) { - return result; + try (FileIndexPredicate predicate = + createFileIndexPredicate(fileIO, dataSchema, dataFilePathFactory, file)) { + FileIndexResult result = FileIndexResult.REMAIN; + if (predicate != null) { + if (!ListUtils.isNullOrEmpty(dataFilter)) { + Predicate filter = PredicateBuilder.and(dataFilter.toArray(new Predicate[0])); + result = predicate.evaluate(filter); + result.and(selection); + } else if (topN != null) { + result = predicate.evaluateTopN(topN, selection); } - if (indexFiles.size() > 1) { - throw new RuntimeException( - "Found more than one index file for one data file: " - + String.join(" and ", indexFiles)); + + // if all position selected, or if only and not the deletion + // the effect will not obvious, just return REMAIN. + if (Objects.equals(result, selection)) { + result = FileIndexResult.REMAIN; } - predicate = - new FileIndexPredicate( - dataFilePathFactory.toAlignedPath(indexFiles.get(0), file), - fileIO, - dataSchema.logicalRowType()); } - // evaluate - if (!ListUtils.isNullOrEmpty(dataFilter)) { - result = - predicate.evaluate( - PredicateBuilder.and(dataFilter.toArray(new Predicate[0]))); - } else if (topN != null) { - result = predicate.evaluateTopN(topN, result); + if (!result.remain()) { + result = FileIndexResult.SKIP; } return result; - } finally { - if (predicate != null) { - predicate.close(); + } + } + + private static FileIndexPredicate createFileIndexPredicate( + FileIO fileIO, + TableSchema dataSchema, + DataFilePathFactory dataFilePathFactory, + DataFileMeta file) + throws IOException { + FileIndexPredicate predicate; + byte[] embeddedIndex = file.embeddedIndex(); + if (embeddedIndex != null) { + predicate = new FileIndexPredicate(embeddedIndex, dataSchema.logicalRowType()); + } else { + List<String> indexFiles = + file.extraFiles().stream() + .filter(name -> name.endsWith(DataFilePathFactory.INDEX_PATH_SUFFIX)) + .collect(Collectors.toList()); + if (indexFiles.isEmpty()) { + return null; + } + if (indexFiles.size() > 1) { + throw new RuntimeException( + "Found more than one index file for one data file: " + + String.join(" and ", indexFiles)); } + predicate = + new FileIndexPredicate( + dataFilePathFactory.toAlignedPath(indexFiles.get(0), file), + fileIO, + dataSchema.logicalRowType()); } + return predicate; } } diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java b/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java index 27211a26bd..1c1249978a 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java @@ -21,7 +21,6 @@ package org.apache.paimon.operation; import org.apache.paimon.data.BinaryRow; import org.apache.paimon.data.InternalRow; import org.apache.paimon.deletionvectors.ApplyDeletionVectorReader; -import org.apache.paimon.deletionvectors.BitmapDeletionVector; import org.apache.paimon.deletionvectors.DeletionVector; import org.apache.paimon.disk.IOManager; import org.apache.paimon.fileindex.FileIndexResult; @@ -77,7 +76,6 @@ public class RawFileSplitRead implements SplitRead<InternalRow> { private final TableSchema schema; private final FileFormatDiscover formatDiscover; private final FileStorePathFactory pathFactory; - private final boolean deletionVectorsEnabled; private final Map<FormatKey, FormatReaderMapping> formatReaderMappings; private final boolean fileIndexReadEnabled; private final boolean rowTrackingEnabled; @@ -94,14 +92,12 @@ public class RawFileSplitRead implements SplitRead<InternalRow> { FileFormatDiscover formatDiscover, FileStorePathFactory pathFactory, boolean fileIndexReadEnabled, - boolean rowTrackingEnabled, - boolean deletionVectorsEnabled) { + boolean rowTrackingEnabled) { this.fileIO = fileIO; this.schemaManager = schemaManager; this.schema = schema; this.formatDiscover = formatDiscover; this.pathFactory = pathFactory; - this.deletionVectorsEnabled = deletionVectorsEnabled; this.formatReaderMappings = new HashMap<>(); this.fileIndexReadEnabled = fileIndexReadEnabled; this.rowTrackingEnabled = rowTrackingEnabled; @@ -134,9 +130,7 @@ public class RawFileSplitRead implements SplitRead<InternalRow> { @Override public SplitRead<InternalRow> withTopN(@Nullable TopN topN) { - if (!deletionVectorsEnabled) { - this.topN = topN; - } + this.topN = topN; return this; } @@ -228,6 +222,7 @@ public class RawFileSplitRead implements SplitRead<InternalRow> { IOExceptionSupplier<DeletionVector> dvFactory) throws IOException { FileIndexResult fileIndexResult = null; + DeletionVector deletionVector = dvFactory == null ? null : dvFactory.get(); if (fileIndexReadEnabled) { fileIndexResult = FileIndexEvaluator.evaluate( @@ -236,7 +231,8 @@ public class RawFileSplitRead implements SplitRead<InternalRow> { formatReaderMapping.getDataFilters(), formatReaderMapping.getTopN(), dataFilePathFactory, - file); + file, + deletionVector); if (!fileIndexResult.remain()) { return new EmptyFileRecordReader<>(); } @@ -247,21 +243,6 @@ public class RawFileSplitRead implements SplitRead<InternalRow> { selection = ((BitmapIndexResult) fileIndexResult).get(); } - RoaringBitmap32 deletion = null; - DeletionVector deletionVector = dvFactory == null ? null : dvFactory.get(); - if (deletionVector instanceof BitmapDeletionVector) { - deletion = ((BitmapDeletionVector) deletionVector).get(); - } - - if (selection != null) { - if (deletion != null) { - selection = RoaringBitmap32.andNot(selection, deletion); - } - if (selection.isEmpty()) { - return new EmptyFileRecordReader<>(); - } - } - FormatReaderContext formatReaderContext = new FormatReaderContext( fileIO, dataFilePathFactory.toPath(file), file.fileSize(), selection); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueTableRead.java b/paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueTableRead.java index c4bede61d8..5df41399cb 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueTableRead.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueTableRead.java @@ -26,6 +26,7 @@ import org.apache.paimon.operation.MergeFileSplitRead; import org.apache.paimon.operation.RawFileSplitRead; import org.apache.paimon.operation.SplitRead; import org.apache.paimon.predicate.Predicate; +import org.apache.paimon.predicate.TopN; import org.apache.paimon.reader.RecordReader; import org.apache.paimon.schema.TableSchema; import org.apache.paimon.table.source.splitread.IncrementalChangelogReadProvider; @@ -54,6 +55,7 @@ public final class KeyValueTableRead extends AbstractDataTableRead { private boolean forceKeepDelete = false; private Predicate predicate = null; private IOManager ioManager = null; + @Nullable private TopN topN = null; public KeyValueTableRead( Supplier<MergeFileSplitRead> mergeReadSupplier, @@ -86,6 +88,9 @@ public final class KeyValueTableRead extends AbstractDataTableRead { if (readType != null) { read = read.withReadType(readType); } + if (topN != null) { + read = read.withTopN(topN); + } read.withFilter(predicate).withIOManager(ioManager); } @@ -109,6 +114,13 @@ public final class KeyValueTableRead extends AbstractDataTableRead { return this; } + @Override + public InnerTableRead withTopN(TopN topN) { + initialized().forEach(r -> r.withTopN(topN)); + this.topN = topN; + return this; + } + @Override public TableRead withIOManager(IOManager ioManager) { initialized().forEach(r -> r.withIOManager(ioManager)); diff --git a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlySimpleTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlySimpleTableTest.java index c262f892a8..024e6320b8 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlySimpleTableTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlySimpleTableTest.java @@ -92,7 +92,6 @@ import java.util.stream.Collectors; import static org.apache.paimon.CoreOptions.BUCKET; import static org.apache.paimon.CoreOptions.BUCKET_KEY; import static org.apache.paimon.CoreOptions.DATA_FILE_PATH_DIRECTORY; -import static org.apache.paimon.CoreOptions.DELETION_VECTORS_ENABLED; import static org.apache.paimon.CoreOptions.FILE_FORMAT; import static org.apache.paimon.CoreOptions.FILE_FORMAT_PARQUET; import static org.apache.paimon.CoreOptions.FILE_INDEX_IN_MANIFEST_THRESHOLD; @@ -945,38 +944,6 @@ public class AppendOnlySimpleTableTest extends SimpleTableTestBase { assertThat(cnt.get()).isEqualTo(rowCount); reader.close(); } - - // test should not push topN with dv modes - { - table.schemaManager() - .commitChanges(SchemaChange.updateColumnType("price", DataTypes.INT())); - rowType = - RowType.builder() - .field("id", DataTypes.STRING()) - .field("event", DataTypes.STRING()) - .field("price", DataTypes.INT()) - .build(); - Consumer<Options> newConfigure = - options -> { - configure.accept(options); - options.set(DELETION_VECTORS_ENABLED, true); - }; - table = createUnawareBucketFileStoreTable(rowType, newConfigure); - DataField field = rowType.getField("price"); - SortValue sort = - new SortValue( - new FieldRef(field.id(), field.name(), field.type()), - SortValue.SortDirection.DESCENDING, - SortValue.NullOrdering.NULLS_LAST); - TopN topN = new TopN(Collections.singletonList(sort), k); - TableScan.Plan plan = table.newScan().plan(); - RecordReader<InternalRow> reader = - table.newRead().withTopN(topN).createReader(plan.splits()); - AtomicInteger cnt = new AtomicInteger(0); - reader.forEachRemaining(row -> cnt.incrementAndGet()); - assertThat(cnt.get()).isEqualTo(rowCount); - reader.close(); - } } @Test diff --git a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java index c02d617dbe..1e4b8b185b 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java @@ -41,8 +41,11 @@ import org.apache.paimon.options.MemorySize; import org.apache.paimon.options.Options; import org.apache.paimon.postpone.PostponeBucketFileStoreWrite; import org.apache.paimon.postpone.PostponeBucketWriter; +import org.apache.paimon.predicate.FieldRef; import org.apache.paimon.predicate.Predicate; import org.apache.paimon.predicate.PredicateBuilder; +import org.apache.paimon.predicate.SortValue; +import org.apache.paimon.predicate.TopN; import org.apache.paimon.reader.RecordReader; import org.apache.paimon.schema.Schema; import org.apache.paimon.schema.SchemaManager; @@ -72,12 +75,14 @@ import org.apache.paimon.table.source.snapshot.SnapshotReader; import org.apache.paimon.table.system.AuditLogTable; import org.apache.paimon.table.system.FileMonitorTable; import org.apache.paimon.table.system.ReadOptimizedTable; +import org.apache.paimon.types.DataField; import org.apache.paimon.types.DataType; import org.apache.paimon.types.DataTypes; import org.apache.paimon.types.RowKind; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.ChangelogManager; import org.apache.paimon.utils.Pair; +import org.apache.paimon.utils.RoaringBitmap32; import org.apache.paimon.shade.org.apache.parquet.hadoop.ParquetOutputFormat; @@ -1203,6 +1208,101 @@ public class PrimaryKeySimpleTableTest extends SimpleTableTestBase { } } + @Test + public void testTopNPushDownInDeletionVectorMode() throws Exception { + String indexColumnName = "b"; + FileStoreTable table = + createFileStoreTable( + conf -> { + conf.set(BUCKET, 1); + conf.set(FILE_FORMAT, FILE_FORMAT_PARQUET); + conf.set(DELETION_VECTORS_ENABLED, true); + conf.set(ParquetOutputFormat.BLOCK_SIZE, "524288"); + conf.set(ParquetOutputFormat.MIN_ROW_COUNT_FOR_PAGE_SIZE_CHECK, "100"); + conf.set(ParquetOutputFormat.PAGE_ROW_COUNT_LIMIT, "300"); + conf.set("file-index.range-bitmap.columns", indexColumnName); + }); + + int rowCount = 1000000; + StreamTableWrite write = + table.newWrite(commitUser).withIOManager(new IOManagerImpl(tempDir.toString())); + StreamTableCommit commit = table.newCommit(commitUser); + + // append + for (int i = 0; i < rowCount; i++) { + write.write(rowDataWithKind(RowKind.INSERT, 1, i, (long) i)); + } + commit.commit(0, write.prepareCommit(true, 0)); + + // delete [0, 20000] + int min = 20000; + for (int i = 0; i < 20000; i++) { + write.write(rowDataWithKind(RowKind.DELETE, 1, i, (long) i)); + } + commit.commit(1, write.prepareCommit(true, 1)); + + // delete (rowCount - 20000, rowCount) + int max = rowCount - 20000; + for (int i = rowCount - 20000; i < rowCount; i++) { + write.write(rowDataWithKind(RowKind.DELETE, 1, i, (long) i)); + } + commit.commit(2, write.prepareCommit(true, 2)); + write.close(); + commit.close(); + + // test bottom k + { + int k = new Random().nextInt(100); + RoaringBitmap32 bitmap = RoaringBitmap32.bitmapOfRange(min, min + k); + DataField field = table.schema().nameToFieldMap().get(indexColumnName); + SortValue sort = + new SortValue( + new FieldRef(field.id(), field.name(), field.type()), + SortValue.SortDirection.ASCENDING, + SortValue.NullOrdering.NULLS_LAST); + TopN topN = new TopN(Collections.singletonList(sort), k); + TableScan.Plan plan = table.newScan().plan(); + RecordReader<InternalRow> reader = + table.newRead().withTopN(topN).createReader(plan.splits()); + AtomicInteger cnt = new AtomicInteger(0); + RoaringBitmap32 actual = new RoaringBitmap32(); + reader.forEachRemaining( + row -> { + cnt.incrementAndGet(); + actual.add((int) row.getLong(2)); + }); + assertThat(cnt.get()).isEqualTo(k); + assertThat(actual).isEqualTo(bitmap); + reader.close(); + } + + // test top k + { + int k = new Random().nextInt(100); + RoaringBitmap32 bitmap = RoaringBitmap32.bitmapOfRange(max - k, max); + DataField field = table.schema().nameToFieldMap().get(indexColumnName); + SortValue sort = + new SortValue( + new FieldRef(field.id(), field.name(), field.type()), + SortValue.SortDirection.DESCENDING, + SortValue.NullOrdering.NULLS_LAST); + TopN topN = new TopN(Collections.singletonList(sort), k); + TableScan.Plan plan = table.newScan().plan(); + RecordReader<InternalRow> reader = + table.newRead().withTopN(topN).createReader(plan.splits()); + AtomicInteger cnt = new AtomicInteger(0); + RoaringBitmap32 actual = new RoaringBitmap32(); + reader.forEachRemaining( + row -> { + cnt.incrementAndGet(); + actual.add((int) row.getLong(2)); + }); + assertThat(cnt.get()).isEqualTo(k); + assertThat(actual).isEqualTo(bitmap); + reader.close(); + } + } + @Test public void testWithShardFirstRow() throws Exception { FileStoreTable table = diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java index cb6ef1c40b..9847276fe8 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java @@ -147,7 +147,6 @@ public class TestChangelogDataReadWrite { FileFormatDiscover.of(options), pathFactory, options.fileIndexReadEnabled(), - false, false); return new KeyValueTableRead(() -> read, () -> rawFileRead, null); } diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScanBuilder.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScanBuilder.scala index 729613f596..aa50f76043 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScanBuilder.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScanBuilder.scala @@ -18,7 +18,6 @@ package org.apache.paimon.spark -import org.apache.paimon.CoreOptions import org.apache.paimon.predicate._ import org.apache.paimon.predicate.SortValue.{NullOrdering, SortDirection} import org.apache.paimon.spark.aggregate.{AggregatePushDownUtils, LocalAggregator} @@ -102,12 +101,7 @@ class PaimonScanBuilder(table: InnerTable) return false } - if (!table.isInstanceOf[AppendOnlyFileStoreTable]) { - return false - } - - val coreOptions = CoreOptions.fromMap(table.options()) - if (coreOptions.deletionVectorsEnabled()) { + if (!table.isInstanceOf[FileStoreTable]) { return false } @@ -115,7 +109,6 @@ class PaimonScanBuilder(table: InnerTable) return false } - val order = orders(0) val fieldName = orders.head.expression() match { case nr: NamedReference => nr.fieldNames.mkString(".") case _ => return false @@ -129,13 +122,13 @@ class PaimonScanBuilder(table: InnerTable) val field = rowType.getField(fieldName) val ref = new FieldRef(field.id(), field.name(), field.`type`()) - val nullOrdering = order.nullOrdering() match { + val nullOrdering = orders.head.nullOrdering() match { case expressions.NullOrdering.NULLS_LAST => NullOrdering.NULLS_LAST case expressions.NullOrdering.NULLS_FIRST => NullOrdering.NULLS_FIRST case _ => return false } - val direction = order.direction() match { + val direction = orders.head.direction() match { case expressions.SortDirection.DESCENDING => SortDirection.DESCENDING case expressions.SortDirection.ASCENDING => SortDirection.ASCENDING case _ => return false diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonPushDownTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonPushDownTestBase.scala index 35190bac51..9541db60e8 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonPushDownTestBase.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonPushDownTestBase.scala @@ -343,6 +343,91 @@ abstract class PaimonPushDownTestBase extends PaimonSparkTestBase { Assertions.assertTrue(qe1.optimizedPlan.containsPattern(LIMIT)) } + test("Paimon pushDown: topN for primary-key tables with deletion vector") { + assume(gteqSpark3_3) + withTable("dv_test") { + spark.sql(""" + |CREATE TABLE dv_test (id INT, c1 INT, c2 STRING) TBLPROPERTIES ( + |'primary-key'='id', + |'deletion-vectors.enabled' = 'true', + |'file-index.range-bitmap.columns'='c1' + |) + |""".stripMargin) + + spark.sql( + "insert into table dv_test values(1, 1, 'a'),(2, 2,'b'),(3, 3, 'c'),(4, 4, 'd'),(5, 5, 'e'),(6, NULL, 'f')") + checkAnswer( + spark.sql("SELECT * FROM dv_test ORDER BY c1 ASC NULLS FIRST LIMIT 3"), + Row(6, null, "f") :: Row(1, 1, "a") :: Row(2, 2, "b") :: Nil) + checkAnswer( + spark.sql("SELECT * FROM dv_test ORDER BY c1 ASC NULLS LAST LIMIT 3"), + Row(1, 1, "a") :: Row(2, 2, "b") :: Row(3, 3, "c") :: Nil) + checkAnswer( + spark.sql("SELECT * FROM dv_test ORDER BY c1 DESC NULLS FIRST LIMIT 3"), + Row(6, null, "f") :: Row(5, 5, "e") :: Row(4, 4, "d") :: Nil) + checkAnswer( + spark.sql("SELECT * FROM dv_test ORDER BY c1 DESC NULLS LAST LIMIT 3"), + Row(5, 5, "e") :: Row(4, 4, "d") :: Row(3, 3, "c") :: Nil) + + spark.sql("delete from dv_test where id IN (1, 5)") + + checkAnswer( + spark.sql("SELECT * FROM dv_test ORDER BY c1 ASC NULLS FIRST LIMIT 3"), + Row(6, null, "f") :: Row(2, 2, "b") :: Row(3, 3, "c") :: Nil) + checkAnswer( + spark.sql("SELECT * FROM dv_test ORDER BY c1 ASC NULLS LAST LIMIT 3"), + Row(2, 2, "b") :: Row(3, 3, "c") :: Row(4, 4, "d") :: Nil) + checkAnswer( + spark.sql("SELECT * FROM dv_test ORDER BY c1 DESC NULLS FIRST LIMIT 3"), + Row(6, null, "f") :: Row(4, 4, "d") :: Row(3, 3, "c") :: Nil) + checkAnswer( + spark.sql("SELECT * FROM dv_test ORDER BY c1 DESC NULLS LAST LIMIT 3"), + Row(4, 4, "d") :: Row(3, 3, "c") :: Row(2, 2, "b") :: Nil) + } + } + + test("Paimon pushDown: topN for append-only tables with deletion vector") { + assume(gteqSpark3_3) + withTable("dv_test") { + spark.sql(""" + |CREATE TABLE dv_test (c1 INT, c2 STRING) TBLPROPERTIES ( + |'deletion-vectors.enabled' = 'true', + |'file-index.range-bitmap.columns'='c1' + |) + |""".stripMargin) + + spark.sql( + "insert into table dv_test values(1, 'a'),(2, 'b'),(3, 'c'),(4, 'd'),(5, 'e'),(NULL, 'f')") + checkAnswer( + spark.sql("SELECT * FROM dv_test ORDER BY c1 ASC NULLS FIRST LIMIT 3"), + Row(null, "f") :: Row(1, "a") :: Row(2, "b") :: Nil) + checkAnswer( + spark.sql("SELECT * FROM dv_test ORDER BY c1 ASC NULLS LAST LIMIT 3"), + Row(1, "a") :: Row(2, "b") :: Row(3, "c") :: Nil) + checkAnswer( + spark.sql("SELECT * FROM dv_test ORDER BY c1 DESC NULLS FIRST LIMIT 3"), + Row(null, "f") :: Row(5, "e") :: Row(4, "d") :: Nil) + checkAnswer( + spark.sql("SELECT * FROM dv_test ORDER BY c1 DESC NULLS LAST LIMIT 3"), + Row(5, "e") :: Row(4, "d") :: Row(3, "c") :: Nil) + + spark.sql("delete from dv_test where c1 IN (1, 5)") + + checkAnswer( + spark.sql("SELECT * FROM dv_test ORDER BY c1 ASC NULLS FIRST LIMIT 3"), + Row(null, "f") :: Row(2, "b") :: Row(3, "c") :: Nil) + checkAnswer( + spark.sql("SELECT * FROM dv_test ORDER BY c1 ASC NULLS LAST LIMIT 3"), + Row(2, "b") :: Row(3, "c") :: Row(4, "d") :: Nil) + checkAnswer( + spark.sql("SELECT * FROM dv_test ORDER BY c1 DESC NULLS FIRST LIMIT 3"), + Row(null, "f") :: Row(4, "d") :: Row(3, "c") :: Nil) + checkAnswer( + spark.sql("SELECT * FROM dv_test ORDER BY c1 DESC NULLS LAST LIMIT 3"), + Row(4, "d") :: Row(3, "c") :: Row(2, "b") :: Nil) + } + } + test(s"Paimon pushdown: parquet in-filter") { withTable("T") { spark.sql(s"""