This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push: new d5723fc9a0 [core] should not push topN with file schema evolution (#6085) d5723fc9a0 is described below commit d5723fc9a02bd3b5d256efa045021137bb5c8532 Author: Jingsong Lee <jingsongl...@gmail.com> AuthorDate: Mon Aug 18 15:38:04 2025 +0800 [core] should not push topN with file schema evolution (#6085) --- .../java/org/apache/paimon/schema/TableSchema.java | 26 ++++-- .../org/apache/paimon/AppendOnlyFileStore.java | 3 +- .../java/org/apache/paimon/KeyValueFileStore.java | 3 +- .../org/apache/paimon/io/FileIndexEvaluator.java | 4 +- .../apache/paimon/operation/RawFileSplitRead.java | 9 +- .../apache/paimon/utils/FormatReaderMapping.java | 22 ++++- .../paimon/table/AppendOnlySimpleTableTest.java | 95 ++++++++++++++++++---- .../flink/source/TestChangelogDataReadWrite.java | 1 + 8 files changed, 131 insertions(+), 32 deletions(-) diff --git a/paimon-api/src/main/java/org/apache/paimon/schema/TableSchema.java b/paimon-api/src/main/java/org/apache/paimon/schema/TableSchema.java index 6e012c016b..b4baf20cb1 100644 --- a/paimon-api/src/main/java/org/apache/paimon/schema/TableSchema.java +++ b/paimon-api/src/main/java/org/apache/paimon/schema/TableSchema.java @@ -143,6 +143,16 @@ public class TableSchema implements Serializable { return fields.stream().map(DataField::name).collect(Collectors.toList()); } + public Map<String, DataField> nameToFieldMap() { + return fields.stream() + .collect(Collectors.toMap(DataField::name, field -> field, (a, b) -> b)); + } + + public Map<Integer, DataField> idToFieldMap() { + return fields.stream() + .collect(Collectors.toMap(DataField::id, field -> field, (a, b) -> b)); + } + public int highestFieldId() { return highestFieldId; } @@ -156,14 +166,14 @@ public class TableSchema implements Serializable { } public List<String> trimmedPrimaryKeys() { - if (primaryKeys.size() > 0) { + if (!primaryKeys.isEmpty()) { List<String> adjusted = primaryKeys.stream() .filter(pk -> !partitionKeys.contains(pk)) .collect(Collectors.toList()); Preconditions.checkState( - adjusted.size() > 0, + !adjusted.isEmpty(), String.format( "Primary key constraint %s should not be same with partition fields %s," + " this will result in only one record in a partition", @@ -192,7 +202,7 @@ public class TableSchema implements Serializable { return false; } - return !primaryKeys.containsAll(partitionKeys); + return notContainsAll(primaryKeys, partitionKeys); } /** Original bucket keys, maybe empty. */ @@ -202,7 +212,7 @@ public class TableSchema implements Serializable { return Collections.emptyList(); } List<String> bucketKeys = Arrays.asList(key.split(",")); - if (!containsAll(fieldNames(), bucketKeys)) { + if (notContainsAll(fieldNames(), bucketKeys)) { throw new RuntimeException( String.format( "Field names %s should contains all bucket keys %s.", @@ -214,8 +224,8 @@ public class TableSchema implements Serializable { "Bucket keys %s should not in partition keys %s.", bucketKeys, partitionKeys)); } - if (primaryKeys.size() > 0) { - if (!containsAll(primaryKeys, bucketKeys)) { + if (!primaryKeys.isEmpty()) { + if (notContainsAll(primaryKeys, bucketKeys)) { throw new RuntimeException( String.format( "Primary keys %s should contains all bucket keys %s.", @@ -225,8 +235,8 @@ public class TableSchema implements Serializable { return bucketKeys; } - private boolean containsAll(List<String> all, List<String> contains) { - return new HashSet<>(all).containsAll(new HashSet<>(contains)); + private boolean notContainsAll(List<String> all, List<String> contains) { + return !new HashSet<>(all).containsAll(new HashSet<>(contains)); } public @Nullable String comment() { diff --git a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java index 0d9ad3b121..d7fa9eb66b 100644 --- a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java +++ b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java @@ -83,7 +83,8 @@ public class AppendOnlyFileStore extends AbstractFileStore<InternalRow> { FileFormatDiscover.of(options), pathFactory(), options.fileIndexReadEnabled(), - options.rowTrackingEnabled()); + options.rowTrackingEnabled(), + options.deletionVectorsEnabled()); } public DataEvolutionSplitRead newDataEvolutionRead() { diff --git a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java index f7a743ace1..48d0bf872b 100644 --- a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java +++ b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java @@ -132,7 +132,8 @@ public class KeyValueFileStore extends AbstractFileStore<KeyValue> { FileFormatDiscover.of(options), pathFactory(), options.fileIndexReadEnabled(), - false); + false, + options.deletionVectorsEnabled()); } public KeyValueFileReaderFactory.Builder newReaderFactoryBuilder() { diff --git a/paimon-core/src/main/java/org/apache/paimon/io/FileIndexEvaluator.java b/paimon-core/src/main/java/org/apache/paimon/io/FileIndexEvaluator.java index b58907de1c..c71af3f2bc 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/FileIndexEvaluator.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/FileIndexEvaluator.java @@ -27,6 +27,8 @@ import org.apache.paimon.predicate.TopN; import org.apache.paimon.schema.TableSchema; import org.apache.paimon.utils.ListUtils; +import javax.annotation.Nullable; + import java.io.IOException; import java.util.List; import java.util.stream.Collectors; @@ -38,7 +40,7 @@ public class FileIndexEvaluator { FileIO fileIO, TableSchema dataSchema, List<Predicate> dataFilter, - TopN topN, + @Nullable TopN topN, DataFilePathFactory dataFilePathFactory, DataFileMeta file) throws IOException { diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java b/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java index 44374a503a..27211a26bd 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java @@ -77,6 +77,7 @@ public class RawFileSplitRead implements SplitRead<InternalRow> { private final TableSchema schema; private final FileFormatDiscover formatDiscover; private final FileStorePathFactory pathFactory; + private final boolean deletionVectorsEnabled; private final Map<FormatKey, FormatReaderMapping> formatReaderMappings; private final boolean fileIndexReadEnabled; private final boolean rowTrackingEnabled; @@ -93,12 +94,14 @@ public class RawFileSplitRead implements SplitRead<InternalRow> { FileFormatDiscover formatDiscover, FileStorePathFactory pathFactory, boolean fileIndexReadEnabled, - boolean rowTrackingEnabled) { + boolean rowTrackingEnabled, + boolean deletionVectorsEnabled) { this.fileIO = fileIO; this.schemaManager = schemaManager; this.schema = schema; this.formatDiscover = formatDiscover; this.pathFactory = pathFactory; + this.deletionVectorsEnabled = deletionVectorsEnabled; this.formatReaderMappings = new HashMap<>(); this.fileIndexReadEnabled = fileIndexReadEnabled; this.rowTrackingEnabled = rowTrackingEnabled; @@ -131,7 +134,9 @@ public class RawFileSplitRead implements SplitRead<InternalRow> { @Override public SplitRead<InternalRow> withTopN(@Nullable TopN topN) { - this.topN = topN; + if (!deletionVectorsEnabled) { + this.topN = topN; + } return this; } diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/FormatReaderMapping.java b/paimon-core/src/main/java/org/apache/paimon/utils/FormatReaderMapping.java index 8422ed8e78..4f867b5ac1 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/FormatReaderMapping.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/FormatReaderMapping.java @@ -23,6 +23,7 @@ import org.apache.paimon.format.FileFormatDiscover; import org.apache.paimon.format.FormatReaderFactory; import org.apache.paimon.partition.PartitionUtils; import org.apache.paimon.predicate.Predicate; +import org.apache.paimon.predicate.SortValue; import org.apache.paimon.predicate.TopN; import org.apache.paimon.schema.IndexCastMapping; import org.apache.paimon.schema.SchemaEvolutionUtil; @@ -41,6 +42,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.function.Function; import static org.apache.paimon.predicate.PredicateBuilder.excludePredicateWithFields; @@ -221,7 +223,25 @@ public class FormatReaderMapping { dataSchema, readFilters, systemFields, - topN); + evolutionTopN(tableSchema, dataSchema)); + } + + @Nullable + private TopN evolutionTopN(TableSchema tableSchema, TableSchema dataSchema) { + TopN pushTopN = topN; + if (pushTopN != null) { + Map<String, DataField> tableFields = tableSchema.nameToFieldMap(); + Map<Integer, DataField> dataFields = dataSchema.idToFieldMap(); + for (SortValue value : pushTopN.orders()) { + DataField tableField = tableFields.get(value.field().name()); + DataField dataField = dataFields.get(tableField.id()); + if (!Objects.equals(tableField, dataField)) { + pushTopN = null; + break; + } + } + } + return pushTopN; } public FormatReaderMapping build( diff --git a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlySimpleTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlySimpleTableTest.java index 3188f3223c..babb62a995 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlySimpleTableTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlySimpleTableTest.java @@ -46,6 +46,7 @@ import org.apache.paimon.predicate.SortValue; import org.apache.paimon.predicate.TopN; import org.apache.paimon.reader.RecordReader; import org.apache.paimon.schema.Schema; +import org.apache.paimon.schema.SchemaChange; import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.schema.SchemaUtils; import org.apache.paimon.schema.TableSchema; @@ -91,6 +92,7 @@ import java.util.stream.Collectors; import static org.apache.paimon.CoreOptions.BUCKET; import static org.apache.paimon.CoreOptions.BUCKET_KEY; import static org.apache.paimon.CoreOptions.DATA_FILE_PATH_DIRECTORY; +import static org.apache.paimon.CoreOptions.DELETION_VECTORS_ENABLED; import static org.apache.paimon.CoreOptions.FILE_FORMAT; import static org.apache.paimon.CoreOptions.FILE_FORMAT_PARQUET; import static org.apache.paimon.CoreOptions.FILE_INDEX_IN_MANIFEST_THRESHOLD; @@ -831,25 +833,23 @@ public class AppendOnlySimpleTableTest extends SimpleTableTestBase { .field("event", DataTypes.STRING()) .field("price", DataTypes.INT()) .build(); + Consumer<Options> configure = + options -> { + options.set(FILE_FORMAT, FILE_FORMAT_PARQUET); + options.set(WRITE_ONLY, true); + options.set( + FileIndexOptions.FILE_INDEX + + "." + + RangeBitmapFileIndexFactory.RANGE_BITMAP + + "." + + CoreOptions.COLUMNS, + "price"); + options.set(ParquetOutputFormat.BLOCK_SIZE, "1048576"); + options.set(ParquetOutputFormat.MIN_ROW_COUNT_FOR_PAGE_SIZE_CHECK, "100"); + options.set(ParquetOutputFormat.PAGE_ROW_COUNT_LIMIT, "300"); + }; // in unaware-bucket mode, we split files into splits all the time - FileStoreTable table = - createUnawareBucketFileStoreTable( - rowType, - options -> { - options.set(FILE_FORMAT, FILE_FORMAT_PARQUET); - options.set(WRITE_ONLY, true); - options.set( - FileIndexOptions.FILE_INDEX - + "." - + RangeBitmapFileIndexFactory.RANGE_BITMAP - + "." - + CoreOptions.COLUMNS, - "price"); - options.set(ParquetOutputFormat.BLOCK_SIZE, "1048576"); - options.set( - ParquetOutputFormat.MIN_ROW_COUNT_FOR_PAGE_SIZE_CHECK, "100"); - options.set(ParquetOutputFormat.PAGE_ROW_COUNT_LIMIT, "300"); - }); + FileStoreTable table = createUnawareBucketFileStoreTable(rowType, configure); int bound = 300000; int rowCount = 1000000; @@ -918,6 +918,65 @@ public class AppendOnlySimpleTableTest extends SimpleTableTestBase { assertThat(cnt.get()).isEqualTo(rowCount); reader.close(); } + + // test should not push topN with index and evolution + { + table.schemaManager() + .commitChanges(SchemaChange.updateColumnType("price", DataTypes.BIGINT())); + rowType = + RowType.builder() + .field("id", DataTypes.STRING()) + .field("event", DataTypes.STRING()) + .field("price", DataTypes.BIGINT()) + .build(); + table = createUnawareBucketFileStoreTable(rowType, configure); + DataField field = rowType.getField("price"); + SortValue sort = + new SortValue( + new FieldRef(field.id(), field.name(), field.type()), + SortValue.SortDirection.DESCENDING, + SortValue.NullOrdering.NULLS_LAST); + TopN topN = new TopN(Collections.singletonList(sort), k); + TableScan.Plan plan = table.newScan().plan(); + RecordReader<InternalRow> reader = + table.newRead().withTopN(topN).createReader(plan.splits()); + AtomicInteger cnt = new AtomicInteger(0); + reader.forEachRemaining(row -> cnt.incrementAndGet()); + assertThat(cnt.get()).isEqualTo(rowCount); + reader.close(); + } + + // test should not push topN with dv modes + { + table.schemaManager() + .commitChanges(SchemaChange.updateColumnType("price", DataTypes.INT())); + rowType = + RowType.builder() + .field("id", DataTypes.STRING()) + .field("event", DataTypes.STRING()) + .field("price", DataTypes.INT()) + .build(); + Consumer<Options> newConfigure = + options -> { + configure.accept(options); + options.set(DELETION_VECTORS_ENABLED, true); + }; + table = createUnawareBucketFileStoreTable(rowType, newConfigure); + DataField field = rowType.getField("price"); + SortValue sort = + new SortValue( + new FieldRef(field.id(), field.name(), field.type()), + SortValue.SortDirection.DESCENDING, + SortValue.NullOrdering.NULLS_LAST); + TopN topN = new TopN(Collections.singletonList(sort), k); + TableScan.Plan plan = table.newScan().plan(); + RecordReader<InternalRow> reader = + table.newRead().withTopN(topN).createReader(plan.splits()); + AtomicInteger cnt = new AtomicInteger(0); + reader.forEachRemaining(row -> cnt.incrementAndGet()); + assertThat(cnt.get()).isEqualTo(rowCount); + reader.close(); + } } @Test diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java index f234ae980a..2e9695a525 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java @@ -147,6 +147,7 @@ public class TestChangelogDataReadWrite { FileFormatDiscover.of(options), pathFactory, options.fileIndexReadEnabled(), + false, false); return new KeyValueTableRead(() -> read, () -> rawFileRead, null); }