This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new d121dd3a57 [core] fix stats evolution for data evolution tables (#7797)
d121dd3a57 is described below

commit d121dd3a57475a88caff56eb8651c2652de52831
Author: Faiz <[email protected]>
AuthorDate: Mon May 11 15:26:21 2026 +0800

    [core] fix stats evolution for data evolution tables (#7797)
---
 .../apache/paimon/stats/SimpleStatsEvolutions.java | 66 +++++++++++++--
 .../org/apache/paimon/table/system/FilesTable.java |  3 +-
 .../apache/paimon/table/system/FilesTableTest.java | 93 ++++++++++++++++++++++
 3 files changed, 154 insertions(+), 8 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/stats/SimpleStatsEvolutions.java b/paimon-core/src/main/java/org/apache/paimon/stats/SimpleStatsEvolutions.java
index 682728f9a0..0c8af93713 100644
--- a/paimon-core/src/main/java/org/apache/paimon/stats/SimpleStatsEvolutions.java
+++ b/paimon-core/src/main/java/org/apache/paimon/stats/SimpleStatsEvolutions.java
@@ -27,11 +27,14 @@ import org.apache.paimon.types.RowType;
 import javax.annotation.Nullable;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
+import java.util.Objects;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;
+import java.util.stream.Collectors;
 
 import static java.util.Collections.singletonList;
 import static 
org.apache.paimon.schema.SchemaEvolutionUtil.createIndexCastMapping;
@@ -44,7 +47,7 @@ public class SimpleStatsEvolutions {
     private final long tableSchemaId;
     private final List<DataField> tableDataFields;
     private final AtomicReference<List<DataField>> tableFields;
-    private final ConcurrentMap<Long, SimpleStatsEvolution> evolutions;
+    private final ConcurrentMap<EvolutionKey, SimpleStatsEvolution> evolutions;
 
     public SimpleStatsEvolutions(Function<Long, List<DataField>> schemaFields, 
long tableSchemaId) {
         this.schemaFields = schemaFields;
@@ -55,20 +58,37 @@ public class SimpleStatsEvolutions {
     }
 
     public SimpleStatsEvolution getOrCreate(long dataSchemaId) {
+        return getOrCreate(dataSchemaId, null);
+    }
+
+    public SimpleStatsEvolution getOrCreate(long dataSchemaId, @Nullable 
List<String> writeCols) {
+        EvolutionKey key = new EvolutionKey(dataSchemaId, writeCols);
         return evolutions.computeIfAbsent(
-                dataSchemaId,
-                id -> {
-                    if (tableSchemaId == id) {
+                key,
+                k -> {
+                    if (tableSchemaId == k.schemaId && k.writeCols == null) {
                         return new SimpleStatsEvolution(
-                                new RowType(schemaFields.apply(id)), null, 
null);
+                                new RowType(schemaFields.apply(k.schemaId)), 
null, null);
                     }
 
                     // Get atomic schema fields.
                     List<DataField> schemaTableFields =
                             tableFields.updateAndGet(v -> v == null ? 
tableDataFields : v);
-                    List<DataField> dataFields = schemaFields.apply(id);
+                    List<DataField> dataFields = 
schemaFields.apply(k.schemaId);
+
+                    // Project data fields to write cols for data evolution 
table
+                    if (k.writeCols != null) {
+                        RowType rowType = new RowType(dataFields);
+                        // writeCols may contain some metadata fields i.e. 
row_id & max_seq
+                        dataFields =
+                                rowType.project(
+                                                k.writeCols.stream()
+                                                        
.filter(rowType::containsField)
+                                                        
.collect(Collectors.toList()))
+                                        .getFields();
+                    }
                     IndexCastMapping indexCastMapping =
-                            createIndexCastMapping(schemaTableFields, 
schemaFields.apply(id));
+                            createIndexCastMapping(schemaTableFields, 
dataFields);
                     @Nullable int[] indexMapping = 
indexCastMapping.getIndexMapping();
                     // Create col stats array serializer with schema evolution
                     return new SimpleStatsEvolution(
@@ -127,4 +147,36 @@ public class SimpleStatsEvolutions {
     public List<DataField> tableDataFields() {
         return tableDataFields;
     }
+
+    /**
+     * Immutable cache key for the {@link SimpleStatsEvolution} instances held in {@code
+     * evolutions}: a data schema id plus an optional list of write-column names. Two keys are
+     * equal only when the schema ids match and the write-column lists are equal element-by-element
+     * in the same order; a {@code null} list is never equal to a non-null one.
+     */
+    private static class EvolutionKey {
+
+        private final long schemaId;
+        // Null when no write columns were supplied (e.g. the single-argument getOrCreate overload).
+        @Nullable private final List<String> writeCols;
+
+        private EvolutionKey(long schemaId, @Nullable List<String> writeCols) {
+            this.schemaId = schemaId;
+            // Defensive, unmodifiable copy: the key's equals/hashCode must not change while it sits
+            // in the map, even if the caller later mutates the list it passed in.
+            this.writeCols =
+                    writeCols == null
+                            ? null
+                            : Collections.unmodifiableList(new 
ArrayList<>(writeCols));
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            EvolutionKey that = (EvolutionKey) o;
+            // List.equals is order-sensitive, so [a, b] and [b, a] produce distinct keys.
+            return schemaId == that.schemaId && Objects.equals(writeCols, 
that.writeCols);
+        }
+
+        @Override
+        public int hashCode() {
+            // Hashes exactly the fields compared in equals, keeping the two consistent.
+            return Objects.hash(schemaId, writeCols);
+        }
+    }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java
index 0c7eadb7ae..56d3e0836b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java
@@ -462,7 +462,8 @@ public class FilesTable implements ReadonlyTable {
         }
 
         private void initialize() {
-            SimpleStatsEvolution evolution = 
simpleStatsEvolutions.getOrCreate(file.schemaId());
+            SimpleStatsEvolution evolution =
+                    simpleStatsEvolutions.getOrCreate(file.schemaId(), 
file.writeCols());
             // Create value stats
             SimpleStatsEvolution.Result result =
                     evolution.evolution(file.valueStats(), file.rowCount(), 
file.valueStatsCols());
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/system/FilesTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/system/FilesTableTest.java
index 2a8818d498..7baf4ae617 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/system/FilesTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/system/FilesTableTest.java
@@ -35,12 +35,18 @@ import org.apache.paimon.predicate.LeafPredicate;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.predicate.PredicateBuilder;
 import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaChange;
 import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.schema.SchemaUtils;
 import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.FileStoreTableFactory;
 import org.apache.paimon.table.TableTestBase;
+import org.apache.paimon.table.sink.BatchTableCommit;
+import org.apache.paimon.table.sink.BatchTableWrite;
+import org.apache.paimon.table.sink.BatchWriteBuilder;
+import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.table.sink.CommitMessageImpl;
 import org.apache.paimon.table.source.ReadBuilder;
 import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DataTypes;
@@ -54,6 +60,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.stream.Collectors;
 
 import static org.apache.paimon.SnapshotTest.newSnapshotManager;
 import static org.apache.paimon.catalog.Identifier.SYSTEM_TABLE_SPLITTER;
@@ -223,6 +230,92 @@ public class FilesTableTest extends TableTestBase {
                 .satisfies(anyCauseMatches(IllegalArgumentException.class));
     }
 
+    /**
+     * Checks the per-file stats reported by the files system table for a data-evolution table
+     * whose two data files each hold only part of the columns: the first file writes only {@code
+     * f1} (before {@code f2} exists), the second writes only {@code f2} after it has been added.
+     */
+    @Test
+    public void testReadStatsWithDataEvolutionWriteCols() throws Exception {
+        String tableName = "DataEvolutionFilesTable";
+        Identifier identifier = identifier(tableName);
+        Schema schema =
+                Schema.newBuilder()
+                        .column("f0", DataTypes.INT())
+                        .column("f1", DataTypes.STRING())
+                        .option(CoreOptions.ROW_TRACKING_ENABLED.key(), "true")
+                        .option(CoreOptions.DATA_EVOLUTION_ENABLED.key(), 
"true")
+                        .build();
+        catalog.createTable(identifier, schema, true);
+
+        // First file: write one row that sets only column f1 (f1 = "a").
+        FileStoreTable dataEvolutionTable = getTable(identifier);
+        BatchWriteBuilder writeBuilder = 
dataEvolutionTable.newBatchWriteBuilder();
+        try (BatchTableWrite write =
+                        writeBuilder
+                                .newWrite()
+                                .withWriteType(
+                                        
schema.rowType().project(Collections.singletonList("f1")));
+                BatchTableCommit commit = writeBuilder.newCommit()) {
+            write.write(GenericRow.of(BinaryString.fromString("a")));
+            commit.commit(write.prepareCommit());
+        }
+
+        // Add column f2, then write a second file containing one row that sets only f2 (f2 = 1).
+        catalog.alterTable(identifier, SchemaChange.addColumn("f2", 
DataTypes.INT()), false);
+        dataEvolutionTable = getTable(identifier);
+        writeBuilder = dataEvolutionTable.newBatchWriteBuilder();
+        try (BatchTableWrite write =
+                        writeBuilder
+                                .newWrite()
+                                .withWriteType(
+                                        dataEvolutionTable
+                                                .schema()
+                                                .logicalRowType()
+                                                
.project(Collections.singletonList("f2")));
+                BatchTableCommit commit = writeBuilder.newCommit()) {
+            write.write(GenericRow.of(1));
+            List<CommitMessage> commitables = write.prepareCommit();
+            // NOTE(review): presumably re-bases the new file to first row id 0 so it describes the
+            // same row as the f1 file written above; confirm against row-tracking semantics.
+            setFirstRowId(commitables, 0L);
+            commit.commit(commitables);
+        }
+
+        FilesTable dataEvolutionFilesTable =
+                (FilesTable)
+                        catalog.getTable(
+                                identifier(tableName + SYSTEM_TABLE_SPLITTER + 
FilesTable.FILES));
+        List<InternalRow> result = read(dataEvolutionFilesTable);
+
+        // Each file contains only some of the data columns. Columns a file did not write are
+        // expected to report a null count of 1 (its single row) and null min/max values; min and
+        // max coincide because each file holds a single row.
+        // NOTE(review): result columns 10/11/12 are assumed to be the null-count, min and max
+        // value-stats columns of the files table; confirm against FilesTable's row type.
+        assertThat(result).hasSize(2);
+        assertThat(
+                        result.stream()
+                                .map(row -> row.getString(10).toString())
+                                .collect(Collectors.toList()))
+                .containsExactlyInAnyOrder("{f0=1, f1=0, f2=1}", "{f0=1, f1=1, 
f2=0}");
+        assertThat(
+                        result.stream()
+                                .map(row -> row.getString(11).toString())
+                                .collect(Collectors.toList()))
+                .containsExactlyInAnyOrder("{f0=null, f1=a, f2=null}", 
"{f0=null, f1=null, f2=1}");
+        assertThat(
+                        result.stream()
+                                .map(row -> row.getString(12).toString())
+                                .collect(Collectors.toList()))
+                .containsExactlyInAnyOrder("{f0=null, f1=a, f2=null}", 
"{f0=null, f1=null, f2=1}");
+    }
+
+    /**
+     * Replaces every new data file in {@code commitables} with the result of {@code
+     * file.assignFirstRowId(firstRowId)}, rewriting each message's new-file list in place. Every
+     * message is cast to {@link CommitMessageImpl}, and the list returned by {@code newFiles()}
+     * must be mutable because it is cleared and refilled.
+     */
+    private void setFirstRowId(List<CommitMessage> commitables, long 
firstRowId) {
+        commitables.forEach(
+                c -> {
+                    CommitMessageImpl commitMessage = (CommitMessageImpl) c;
+                    // Copy the current files first, because the list returned by newFiles() is
+                    // cleared on the next line.
+                    List<DataFileMeta> newFiles =
+                            new 
ArrayList<>(commitMessage.newFilesIncrement().newFiles());
+                    commitMessage.newFilesIncrement().newFiles().clear();
+                    commitMessage
+                            .newFilesIncrement()
+                            .newFiles()
+                            .addAll(
+                                    newFiles.stream()
+                                            .map(s -> 
s.assignFirstRowId(firstRowId))
+                                            .collect(Collectors.toList()));
+                });
+    }
+
     private List<InternalRow> getExpectedResult(long snapshotId) {
         if (!snapshotManager.snapshotExists(snapshotId)) {
             return Collections.emptyList();

Reply via email to