This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new d121dd3a57 [core] fix stats evolution for data evolution tables (#7797)
d121dd3a57 is described below
commit d121dd3a57475a88caff56eb8651c2652de52831
Author: Faiz <[email protected]>
AuthorDate: Mon May 11 15:26:21 2026 +0800
[core] fix stats evolution for data evolution tables (#7797)
---
.../apache/paimon/stats/SimpleStatsEvolutions.java | 66 +++++++++++++--
.../org/apache/paimon/table/system/FilesTable.java | 3 +-
.../apache/paimon/table/system/FilesTableTest.java | 93 ++++++++++++++++++++++
3 files changed, 154 insertions(+), 8 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/stats/SimpleStatsEvolutions.java b/paimon-core/src/main/java/org/apache/paimon/stats/SimpleStatsEvolutions.java
index 682728f9a0..0c8af93713 100644
--- a/paimon-core/src/main/java/org/apache/paimon/stats/SimpleStatsEvolutions.java
+++ b/paimon-core/src/main/java/org/apache/paimon/stats/SimpleStatsEvolutions.java
@@ -27,11 +27,14 @@ import org.apache.paimon.types.RowType;
import javax.annotation.Nullable;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
+import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
+import java.util.stream.Collectors;
import static java.util.Collections.singletonList;
import static org.apache.paimon.schema.SchemaEvolutionUtil.createIndexCastMapping;
@@ -44,7 +47,7 @@ public class SimpleStatsEvolutions {
private final long tableSchemaId;
private final List<DataField> tableDataFields;
private final AtomicReference<List<DataField>> tableFields;
- private final ConcurrentMap<Long, SimpleStatsEvolution> evolutions;
+ private final ConcurrentMap<EvolutionKey, SimpleStatsEvolution> evolutions;
public SimpleStatsEvolutions(Function<Long, List<DataField>> schemaFields, long tableSchemaId) {
this.schemaFields = schemaFields;
@@ -55,20 +58,37 @@ public class SimpleStatsEvolutions {
}
public SimpleStatsEvolution getOrCreate(long dataSchemaId) {
+ return getOrCreate(dataSchemaId, null);
+ }
+
+ public SimpleStatsEvolution getOrCreate(long dataSchemaId, @Nullable List<String> writeCols) {
+ EvolutionKey key = new EvolutionKey(dataSchemaId, writeCols);
return evolutions.computeIfAbsent(
- dataSchemaId,
- id -> {
- if (tableSchemaId == id) {
+ key,
+ k -> {
+ if (tableSchemaId == k.schemaId && k.writeCols == null) {
return new SimpleStatsEvolution(
- new RowType(schemaFields.apply(id)), null, null);
+ new RowType(schemaFields.apply(k.schemaId)), null, null);
}
// Get atomic schema fields.
List<DataField> schemaTableFields =
tableFields.updateAndGet(v -> v == null ? tableDataFields : v);
- List<DataField> dataFields = schemaFields.apply(id);
+ List<DataField> dataFields = schemaFields.apply(k.schemaId);
+
+ // Project data fields to write cols for data evolution table
+ if (k.writeCols != null) {
+ RowType rowType = new RowType(dataFields);
+ // writeCols may contain some metadata fields i.e. row_id & max_seq
+ dataFields =
+ rowType.project(
+ k.writeCols.stream()
+ .filter(rowType::containsField)
+ .collect(Collectors.toList()))
+ .getFields();
+ }
IndexCastMapping indexCastMapping =
- createIndexCastMapping(schemaTableFields, schemaFields.apply(id));
+ createIndexCastMapping(schemaTableFields, dataFields);
@Nullable int[] indexMapping =
indexCastMapping.getIndexMapping();
// Create col stats array serializer with schema evolution
return new SimpleStatsEvolution(
@@ -127,4 +147,36 @@ public class SimpleStatsEvolutions {
public List<DataField> tableDataFields() {
return tableDataFields;
}
+
+ /** Immutable key for StatsEvolution. */
+ private static class EvolutionKey {
+
+ private final long schemaId;
+ @Nullable private final List<String> writeCols;
+
+ private EvolutionKey(long schemaId, @Nullable List<String> writeCols) {
+ this.schemaId = schemaId;
+ this.writeCols =
+ writeCols == null
+ ? null
+ : Collections.unmodifiableList(new ArrayList<>(writeCols));
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ EvolutionKey that = (EvolutionKey) o;
+ return schemaId == that.schemaId && Objects.equals(writeCols, that.writeCols);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(schemaId, writeCols);
+ }
+ }
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java
index 0c7eadb7ae..56d3e0836b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java
@@ -462,7 +462,8 @@ public class FilesTable implements ReadonlyTable {
}
private void initialize() {
- SimpleStatsEvolution evolution = simpleStatsEvolutions.getOrCreate(file.schemaId());
+ SimpleStatsEvolution evolution =
+ simpleStatsEvolutions.getOrCreate(file.schemaId(), file.writeCols());
// Create value stats
SimpleStatsEvolution.Result result =
evolution.evolution(file.valueStats(), file.rowCount(),
file.valueStatsCols());
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/system/FilesTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/system/FilesTableTest.java
index 2a8818d498..7baf4ae617 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/system/FilesTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/system/FilesTableTest.java
@@ -35,12 +35,18 @@ import org.apache.paimon.predicate.LeafPredicate;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.SchemaUtils;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.FileStoreTableFactory;
import org.apache.paimon.table.TableTestBase;
+import org.apache.paimon.table.sink.BatchTableCommit;
+import org.apache.paimon.table.sink.BatchTableWrite;
+import org.apache.paimon.table.sink.BatchWriteBuilder;
+import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.table.sink.CommitMessageImpl;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataTypes;
@@ -54,6 +60,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+import java.util.stream.Collectors;
import static org.apache.paimon.SnapshotTest.newSnapshotManager;
import static org.apache.paimon.catalog.Identifier.SYSTEM_TABLE_SPLITTER;
@@ -223,6 +230,92 @@ public class FilesTableTest extends TableTestBase {
.satisfies(anyCauseMatches(IllegalArgumentException.class));
}
+ @Test
+ public void testReadStatsWithDataEvolutionWriteCols() throws Exception {
+ String tableName = "DataEvolutionFilesTable";
+ Identifier identifier = identifier(tableName);
+ Schema schema =
+ Schema.newBuilder()
+ .column("f0", DataTypes.INT())
+ .column("f1", DataTypes.STRING())
+ .option(CoreOptions.ROW_TRACKING_ENABLED.key(), "true")
+ .option(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true")
+ .build();
+ catalog.createTable(identifier, schema, true);
+
+ // Write a data-evolution table.
+ FileStoreTable dataEvolutionTable = getTable(identifier);
+ BatchWriteBuilder writeBuilder = dataEvolutionTable.newBatchWriteBuilder();
+ try (BatchTableWrite write =
+ writeBuilder
+ .newWrite()
+ .withWriteType(
+ schema.rowType().project(Collections.singletonList("f1")));
+ BatchTableCommit commit = writeBuilder.newCommit()) {
+ write.write(GenericRow.of(BinaryString.fromString("a")));
+ commit.commit(write.prepareCommit());
+ }
+
+ catalog.alterTable(identifier, SchemaChange.addColumn("f2", DataTypes.INT()), false);
+ dataEvolutionTable = getTable(identifier);
+ writeBuilder = dataEvolutionTable.newBatchWriteBuilder();
+ try (BatchTableWrite write =
+ writeBuilder
+ .newWrite()
+ .withWriteType(
+ dataEvolutionTable
+ .schema()
+ .logicalRowType()
+ .project(Collections.singletonList("f2")));
+ BatchTableCommit commit = writeBuilder.newCommit()) {
+ write.write(GenericRow.of(1));
+ List<CommitMessage> commitables = write.prepareCommit();
+ setFirstRowId(commitables, 0L);
+ commit.commit(commitables);
+ }
+
+ FilesTable dataEvolutionFilesTable =
+ (FilesTable)
+ catalog.getTable(
+ identifier(tableName + SYSTEM_TABLE_SPLITTER + FilesTable.FILES));
+ List<InternalRow> result = read(dataEvolutionFilesTable);
+
+ // Each file only contain partial data columns.
+ assertThat(result).hasSize(2);
+ assertThat(
+ result.stream()
+ .map(row -> row.getString(10).toString())
+ .collect(Collectors.toList()))
+ .containsExactlyInAnyOrder("{f0=1, f1=0, f2=1}", "{f0=1, f1=1, f2=0}");
+ assertThat(
+ result.stream()
+ .map(row -> row.getString(11).toString())
+ .collect(Collectors.toList()))
+ .containsExactlyInAnyOrder("{f0=null, f1=a, f2=null}", "{f0=null, f1=null, f2=1}");
+ assertThat(
+ result.stream()
+ .map(row -> row.getString(12).toString())
+ .collect(Collectors.toList()))
+ .containsExactlyInAnyOrder("{f0=null, f1=a, f2=null}", "{f0=null, f1=null, f2=1}");
+ }
+
+ private void setFirstRowId(List<CommitMessage> commitables, long firstRowId) {
+ commitables.forEach(
+ c -> {
+ CommitMessageImpl commitMessage = (CommitMessageImpl) c;
+ List<DataFileMeta> newFiles =
+ new ArrayList<>(commitMessage.newFilesIncrement().newFiles());
+ commitMessage.newFilesIncrement().newFiles().clear();
+ commitMessage
+ .newFilesIncrement()
+ .newFiles()
+ .addAll(
+ newFiles.stream()
+ .map(s -> s.assignFirstRowId(firstRowId))
+ .collect(Collectors.toList()));
+ });
+ }
+
private List<InternalRow> getExpectedResult(long snapshotId) {
if (!snapshotManager.snapshotExists(snapshotId)) {
return Collections.emptyList();