This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new d5723fc9a0 [core] should not push topN with file schema evolution 
(#6085)
d5723fc9a0 is described below

commit d5723fc9a02bd3b5d256efa045021137bb5c8532
Author: Jingsong Lee <jingsongl...@gmail.com>
AuthorDate: Mon Aug 18 15:38:04 2025 +0800

    [core] should not push topN with file schema evolution (#6085)
---
 .../java/org/apache/paimon/schema/TableSchema.java | 26 ++++--
 .../org/apache/paimon/AppendOnlyFileStore.java     |  3 +-
 .../java/org/apache/paimon/KeyValueFileStore.java  |  3 +-
 .../org/apache/paimon/io/FileIndexEvaluator.java   |  4 +-
 .../apache/paimon/operation/RawFileSplitRead.java  |  9 +-
 .../apache/paimon/utils/FormatReaderMapping.java   | 22 ++++-
 .../paimon/table/AppendOnlySimpleTableTest.java    | 95 ++++++++++++++++++----
 .../flink/source/TestChangelogDataReadWrite.java   |  1 +
 8 files changed, 131 insertions(+), 32 deletions(-)

diff --git a/paimon-api/src/main/java/org/apache/paimon/schema/TableSchema.java 
b/paimon-api/src/main/java/org/apache/paimon/schema/TableSchema.java
index 6e012c016b..b4baf20cb1 100644
--- a/paimon-api/src/main/java/org/apache/paimon/schema/TableSchema.java
+++ b/paimon-api/src/main/java/org/apache/paimon/schema/TableSchema.java
@@ -143,6 +143,16 @@ public class TableSchema implements Serializable {
         return 
fields.stream().map(DataField::name).collect(Collectors.toList());
     }
 
+    public Map<String, DataField> nameToFieldMap() {
+        return fields.stream()
+                .collect(Collectors.toMap(DataField::name, field -> field, (a, 
b) -> b));
+    }
+
+    public Map<Integer, DataField> idToFieldMap() {
+        return fields.stream()
+                .collect(Collectors.toMap(DataField::id, field -> field, (a, 
b) -> b));
+    }
+
     public int highestFieldId() {
         return highestFieldId;
     }
@@ -156,14 +166,14 @@ public class TableSchema implements Serializable {
     }
 
     public List<String> trimmedPrimaryKeys() {
-        if (primaryKeys.size() > 0) {
+        if (!primaryKeys.isEmpty()) {
             List<String> adjusted =
                     primaryKeys.stream()
                             .filter(pk -> !partitionKeys.contains(pk))
                             .collect(Collectors.toList());
 
             Preconditions.checkState(
-                    adjusted.size() > 0,
+                    !adjusted.isEmpty(),
                     String.format(
                             "Primary key constraint %s should not be same with 
partition fields %s,"
                                     + " this will result in only one record in 
a partition",
@@ -192,7 +202,7 @@ public class TableSchema implements Serializable {
             return false;
         }
 
-        return !primaryKeys.containsAll(partitionKeys);
+        return notContainsAll(primaryKeys, partitionKeys);
     }
 
     /** Original bucket keys, maybe empty. */
@@ -202,7 +212,7 @@ public class TableSchema implements Serializable {
             return Collections.emptyList();
         }
         List<String> bucketKeys = Arrays.asList(key.split(","));
-        if (!containsAll(fieldNames(), bucketKeys)) {
+        if (notContainsAll(fieldNames(), bucketKeys)) {
             throw new RuntimeException(
                     String.format(
                             "Field names %s should contains all bucket keys 
%s.",
@@ -214,8 +224,8 @@ public class TableSchema implements Serializable {
                             "Bucket keys %s should not in partition keys %s.",
                             bucketKeys, partitionKeys));
         }
-        if (primaryKeys.size() > 0) {
-            if (!containsAll(primaryKeys, bucketKeys)) {
+        if (!primaryKeys.isEmpty()) {
+            if (notContainsAll(primaryKeys, bucketKeys)) {
                 throw new RuntimeException(
                         String.format(
                                 "Primary keys %s should contains all bucket 
keys %s.",
@@ -225,8 +235,8 @@ public class TableSchema implements Serializable {
         return bucketKeys;
     }
 
-    private boolean containsAll(List<String> all, List<String> contains) {
-        return new HashSet<>(all).containsAll(new HashSet<>(contains));
+    private boolean notContainsAll(List<String> all, List<String> contains) {
+        return !new HashSet<>(all).containsAll(new HashSet<>(contains));
     }
 
     public @Nullable String comment() {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java 
b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
index 0d9ad3b121..d7fa9eb66b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
@@ -83,7 +83,8 @@ public class AppendOnlyFileStore extends 
AbstractFileStore<InternalRow> {
                 FileFormatDiscover.of(options),
                 pathFactory(),
                 options.fileIndexReadEnabled(),
-                options.rowTrackingEnabled());
+                options.rowTrackingEnabled(),
+                options.deletionVectorsEnabled());
     }
 
     public DataEvolutionSplitRead newDataEvolutionRead() {
diff --git a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java 
b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
index f7a743ace1..48d0bf872b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
@@ -132,7 +132,8 @@ public class KeyValueFileStore extends 
AbstractFileStore<KeyValue> {
                 FileFormatDiscover.of(options),
                 pathFactory(),
                 options.fileIndexReadEnabled(),
-                false);
+                false,
+                options.deletionVectorsEnabled());
     }
 
     public KeyValueFileReaderFactory.Builder newReaderFactoryBuilder() {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/io/FileIndexEvaluator.java 
b/paimon-core/src/main/java/org/apache/paimon/io/FileIndexEvaluator.java
index b58907de1c..c71af3f2bc 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/FileIndexEvaluator.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/FileIndexEvaluator.java
@@ -27,6 +27,8 @@ import org.apache.paimon.predicate.TopN;
 import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.utils.ListUtils;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.util.List;
 import java.util.stream.Collectors;
@@ -38,7 +40,7 @@ public class FileIndexEvaluator {
             FileIO fileIO,
             TableSchema dataSchema,
             List<Predicate> dataFilter,
-            TopN topN,
+            @Nullable TopN topN,
             DataFilePathFactory dataFilePathFactory,
             DataFileMeta file)
             throws IOException {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java 
b/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java
index 44374a503a..27211a26bd 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java
@@ -77,6 +77,7 @@ public class RawFileSplitRead implements 
SplitRead<InternalRow> {
     private final TableSchema schema;
     private final FileFormatDiscover formatDiscover;
     private final FileStorePathFactory pathFactory;
+    private final boolean deletionVectorsEnabled;
     private final Map<FormatKey, FormatReaderMapping> formatReaderMappings;
     private final boolean fileIndexReadEnabled;
     private final boolean rowTrackingEnabled;
@@ -93,12 +94,14 @@ public class RawFileSplitRead implements 
SplitRead<InternalRow> {
             FileFormatDiscover formatDiscover,
             FileStorePathFactory pathFactory,
             boolean fileIndexReadEnabled,
-            boolean rowTrackingEnabled) {
+            boolean rowTrackingEnabled,
+            boolean deletionVectorsEnabled) {
         this.fileIO = fileIO;
         this.schemaManager = schemaManager;
         this.schema = schema;
         this.formatDiscover = formatDiscover;
         this.pathFactory = pathFactory;
+        this.deletionVectorsEnabled = deletionVectorsEnabled;
         this.formatReaderMappings = new HashMap<>();
         this.fileIndexReadEnabled = fileIndexReadEnabled;
         this.rowTrackingEnabled = rowTrackingEnabled;
@@ -131,7 +134,9 @@ public class RawFileSplitRead implements 
SplitRead<InternalRow> {
 
     @Override
     public SplitRead<InternalRow> withTopN(@Nullable TopN topN) {
-        this.topN = topN;
+        if (!deletionVectorsEnabled) {
+            this.topN = topN;
+        }
         return this;
     }
 
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/utils/FormatReaderMapping.java 
b/paimon-core/src/main/java/org/apache/paimon/utils/FormatReaderMapping.java
index 8422ed8e78..4f867b5ac1 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/FormatReaderMapping.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/FormatReaderMapping.java
@@ -23,6 +23,7 @@ import org.apache.paimon.format.FileFormatDiscover;
 import org.apache.paimon.format.FormatReaderFactory;
 import org.apache.paimon.partition.PartitionUtils;
 import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.SortValue;
 import org.apache.paimon.predicate.TopN;
 import org.apache.paimon.schema.IndexCastMapping;
 import org.apache.paimon.schema.SchemaEvolutionUtil;
@@ -41,6 +42,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.function.Function;
 
 import static 
org.apache.paimon.predicate.PredicateBuilder.excludePredicateWithFields;
@@ -221,7 +223,25 @@ public class FormatReaderMapping {
                     dataSchema,
                     readFilters,
                     systemFields,
-                    topN);
+                    evolutionTopN(tableSchema, dataSchema));
+        }
+
+        @Nullable
+        private TopN evolutionTopN(TableSchema tableSchema, TableSchema 
dataSchema) {
+            TopN pushTopN = topN;
+            if (pushTopN != null) {
+                Map<String, DataField> tableFields = 
tableSchema.nameToFieldMap();
+                Map<Integer, DataField> dataFields = dataSchema.idToFieldMap();
+                for (SortValue value : pushTopN.orders()) {
+                    DataField tableField = 
tableFields.get(value.field().name());
+                    DataField dataField = dataFields.get(tableField.id());
+                    if (!Objects.equals(tableField, dataField)) {
+                        pushTopN = null;
+                        break;
+                    }
+                }
+            }
+            return pushTopN;
         }
 
         public FormatReaderMapping build(
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlySimpleTableTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlySimpleTableTest.java
index 3188f3223c..babb62a995 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlySimpleTableTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlySimpleTableTest.java
@@ -46,6 +46,7 @@ import org.apache.paimon.predicate.SortValue;
 import org.apache.paimon.predicate.TopN;
 import org.apache.paimon.reader.RecordReader;
 import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaChange;
 import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.schema.SchemaUtils;
 import org.apache.paimon.schema.TableSchema;
@@ -91,6 +92,7 @@ import java.util.stream.Collectors;
 import static org.apache.paimon.CoreOptions.BUCKET;
 import static org.apache.paimon.CoreOptions.BUCKET_KEY;
 import static org.apache.paimon.CoreOptions.DATA_FILE_PATH_DIRECTORY;
+import static org.apache.paimon.CoreOptions.DELETION_VECTORS_ENABLED;
 import static org.apache.paimon.CoreOptions.FILE_FORMAT;
 import static org.apache.paimon.CoreOptions.FILE_FORMAT_PARQUET;
 import static org.apache.paimon.CoreOptions.FILE_INDEX_IN_MANIFEST_THRESHOLD;
@@ -831,25 +833,23 @@ public class AppendOnlySimpleTableTest extends 
SimpleTableTestBase {
                         .field("event", DataTypes.STRING())
                         .field("price", DataTypes.INT())
                         .build();
+        Consumer<Options> configure =
+                options -> {
+                    options.set(FILE_FORMAT, FILE_FORMAT_PARQUET);
+                    options.set(WRITE_ONLY, true);
+                    options.set(
+                            FileIndexOptions.FILE_INDEX
+                                    + "."
+                                    + RangeBitmapFileIndexFactory.RANGE_BITMAP
+                                    + "."
+                                    + CoreOptions.COLUMNS,
+                            "price");
+                    options.set(ParquetOutputFormat.BLOCK_SIZE, "1048576");
+                    
options.set(ParquetOutputFormat.MIN_ROW_COUNT_FOR_PAGE_SIZE_CHECK, "100");
+                    options.set(ParquetOutputFormat.PAGE_ROW_COUNT_LIMIT, 
"300");
+                };
         // in unaware-bucket mode, we split files into splits all the time
-        FileStoreTable table =
-                createUnawareBucketFileStoreTable(
-                        rowType,
-                        options -> {
-                            options.set(FILE_FORMAT, FILE_FORMAT_PARQUET);
-                            options.set(WRITE_ONLY, true);
-                            options.set(
-                                    FileIndexOptions.FILE_INDEX
-                                            + "."
-                                            + 
RangeBitmapFileIndexFactory.RANGE_BITMAP
-                                            + "."
-                                            + CoreOptions.COLUMNS,
-                                    "price");
-                            options.set(ParquetOutputFormat.BLOCK_SIZE, 
"1048576");
-                            options.set(
-                                    
ParquetOutputFormat.MIN_ROW_COUNT_FOR_PAGE_SIZE_CHECK, "100");
-                            
options.set(ParquetOutputFormat.PAGE_ROW_COUNT_LIMIT, "300");
-                        });
+        FileStoreTable table = createUnawareBucketFileStoreTable(rowType, 
configure);
 
         int bound = 300000;
         int rowCount = 1000000;
@@ -918,6 +918,65 @@ public class AppendOnlySimpleTableTest extends 
SimpleTableTestBase {
             assertThat(cnt.get()).isEqualTo(rowCount);
             reader.close();
         }
+
+        // test should not push topN with index and evolution
+        {
+            table.schemaManager()
+                    .commitChanges(SchemaChange.updateColumnType("price", 
DataTypes.BIGINT()));
+            rowType =
+                    RowType.builder()
+                            .field("id", DataTypes.STRING())
+                            .field("event", DataTypes.STRING())
+                            .field("price", DataTypes.BIGINT())
+                            .build();
+            table = createUnawareBucketFileStoreTable(rowType, configure);
+            DataField field = rowType.getField("price");
+            SortValue sort =
+                    new SortValue(
+                            new FieldRef(field.id(), field.name(), 
field.type()),
+                            SortValue.SortDirection.DESCENDING,
+                            SortValue.NullOrdering.NULLS_LAST);
+            TopN topN = new TopN(Collections.singletonList(sort), k);
+            TableScan.Plan plan = table.newScan().plan();
+            RecordReader<InternalRow> reader =
+                    table.newRead().withTopN(topN).createReader(plan.splits());
+            AtomicInteger cnt = new AtomicInteger(0);
+            reader.forEachRemaining(row -> cnt.incrementAndGet());
+            assertThat(cnt.get()).isEqualTo(rowCount);
+            reader.close();
+        }
+
+        // test should not push topN with dv modes
+        {
+            table.schemaManager()
+                    .commitChanges(SchemaChange.updateColumnType("price", 
DataTypes.INT()));
+            rowType =
+                    RowType.builder()
+                            .field("id", DataTypes.STRING())
+                            .field("event", DataTypes.STRING())
+                            .field("price", DataTypes.INT())
+                            .build();
+            Consumer<Options> newConfigure =
+                    options -> {
+                        configure.accept(options);
+                        options.set(DELETION_VECTORS_ENABLED, true);
+                    };
+            table = createUnawareBucketFileStoreTable(rowType, newConfigure);
+            DataField field = rowType.getField("price");
+            SortValue sort =
+                    new SortValue(
+                            new FieldRef(field.id(), field.name(), 
field.type()),
+                            SortValue.SortDirection.DESCENDING,
+                            SortValue.NullOrdering.NULLS_LAST);
+            TopN topN = new TopN(Collections.singletonList(sort), k);
+            TableScan.Plan plan = table.newScan().plan();
+            RecordReader<InternalRow> reader =
+                    table.newRead().withTopN(topN).createReader(plan.splits());
+            AtomicInteger cnt = new AtomicInteger(0);
+            reader.forEachRemaining(row -> cnt.incrementAndGet());
+            assertThat(cnt.get()).isEqualTo(rowCount);
+            reader.close();
+        }
     }
 
     @Test
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java
index f234ae980a..2e9695a525 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java
@@ -147,6 +147,7 @@ public class TestChangelogDataReadWrite {
                         FileFormatDiscover.of(options),
                         pathFactory,
                         options.fileIndexReadEnabled(),
+                        false,
                         false);
         return new KeyValueTableRead(() -> read, () -> rawFileRead, null);
     }

Reply via email to