This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new d2ac42281 [paimon] Add IT case for union read with evolved schema for
Paimon (#2368)
d2ac42281 is described below
commit d2ac42281f4d46bc0d275ea706fc3c4c85e09f03
Author: Zübeyir Eser <[email protected]>
AuthorDate: Thu Jan 15 04:59:21 2026 +0100
[paimon] Add IT case for union read with evolved schema for Paimon (#2368)
---
.../flink/FlinkUnionReadPrimaryKeyTableITCase.java | 100 +++++++++++++++++++++
1 file changed, 100 insertions(+)
diff --git a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadPrimaryKeyTableITCase.java b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadPrimaryKeyTableITCase.java
index cb69684e3..41f18b8c6 100644
--- a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadPrimaryKeyTableITCase.java
+++ b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadPrimaryKeyTableITCase.java
@@ -23,6 +23,7 @@ import org.apache.fluss.metadata.PartitionInfo;
import org.apache.fluss.metadata.PartitionSpec;
import org.apache.fluss.metadata.Schema;
import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TableChange;
import org.apache.fluss.metadata.TableDescriptor;
import org.apache.fluss.metadata.TablePath;
import org.apache.fluss.row.BinaryString;
@@ -845,6 +846,105 @@ class FlinkUnionReadPrimaryKeyTableITCase extends FlinkUnionReadTestBase {
jobClient.cancel().get();
}
+    /**
+     * Verifies that union read (Paimon snapshot + Fluss log) produces correct results for a
+     * primary-key table whose schema evolves via ADD COLUMN while part of the data is already
+     * tiered under an older schema. Older rows surface {@code null} for columns added later.
+     */
+    @Test
+    void testUnionReadWithAddColumn() throws Exception {
+        TablePath tablePath = TablePath.of(DEFAULT_DB, "unionReadAddColumnPKTable");
+
+        // 1. Create PK Table (Lake Enabled)
+        Schema schema =
+                Schema.newBuilder()
+                        .column("c1", DataTypes.INT())
+                        .column("c2", DataTypes.STRING())
+                        .primaryKey("c1")
+                        .build();
+
+        TableDescriptor tableDescriptor =
+                TableDescriptor.builder()
+                        .schema(schema)
+                        .distributedBy(1)
+                        .property(ConfigOptions.TABLE_DATALAKE_ENABLED, true)
+                        .property(ConfigOptions.TABLE_DATALAKE_FRESHNESS, Duration.ofMillis(500))
+                        .build();
+
+        long tableId = createTable(tablePath, tableDescriptor);
+        TableBucket tableBucket = new TableBucket(tableId, 0);
+
+        // 2. Write initial data
+        List<InternalRow> initialRows = Arrays.asList(row(1, "v1"), row(2, "v2"));
+        writeRows(tablePath, initialRows, false);
+
+        // 3. Start tiering job
+        JobClient jobClient = buildTieringJob(execEnv);
+
+        try {
+            // 4. Wait for data to snapshot to Paimon
+            assertReplicaStatus(tableBucket, 2);
+
+            // 5. Add Column "c3"
+            List<TableChange> addColumnChanges =
+                    Collections.singletonList(
+                            TableChange.addColumn(
+                                    "c3",
+                                    DataTypes.INT(),
+                                    "new column",
+                                    TableChange.ColumnPosition.last()));
+            admin.alterTable(tablePath, addColumnChanges, false).get();
+
+            // 6. Write new data (Update Key 2, Insert Key 3)
+            // Updating key 2 validates that union read correctly merges
+            // the new schema data from log with old schema data from Paimon
+            List<InternalRow> newRows = Arrays.asList(row(2, "v2_updated", 20), row(3, "v3", 30));
+            writeRows(tablePath, newRows, false);
+
+            // 7. Query via Flink SQL
+            CloseableIterator<Row> iterator =
+                    batchTEnv.executeSql("SELECT * FROM " + tablePath.getTableName()).collect();
+
+            // 8. Verify union read correctly handles schema evolution with PK updates:
+            // - Key 1: from Paimon snapshot (old schema, c3 should be null)
+            // - Key 2: from Fluss log (updated value, new schema)
+            // - Key 3: from Fluss log (new insert, new schema)
+            List<String> actualRows = collectRowsWithTimeout(iterator, 3, true);
+
+            assertThat(actualRows)
+                    .containsExactlyInAnyOrder(
+                            "+I[1, v1, null]", "+I[2, v2_updated, 20]", "+I[3, v3, 30]");
+
+            // 9. Add Column "c4" (Schema V3)
+            // Verify union read reconciles tiered data (V1) with a fluss log
+            // containing multiple schema versions (V2 and V3).
+            // Cancel tiering first so the V2/V3 writes below stay in the Fluss log only.
+            jobClient.cancel().get();
+            addColumnChanges =
+                    Collections.singletonList(
+                            TableChange.addColumn(
+                                    "c4",
+                                    DataTypes.INT(),
+                                    "another new column",
+                                    TableChange.ColumnPosition.last()));
+            admin.alterTable(tablePath, addColumnChanges, false).get();
+
+            // 10. Write data for Schema V3 (Update Key 2 and Key 3)
+            newRows =
+                    Arrays.asList(row(2, "v2_updated_again", 20, 30), row(3, "v3_update", 30, 40));
+            writeRows(tablePath, newRows, false);
+
+            // 11. Final Query Verify (Paimon V1 + Log V2 + Log V3)
+            // - Key 1: from Paimon snapshot (oldest schema, c3/c4 should be null)
+            // - Key 2: from Fluss log (latest update, newest schema)
+            // - Key 3: from Fluss log (latest update, newest schema)
+            iterator = batchTEnv.executeSql("SELECT * FROM " + tablePath.getTableName()).collect();
+            actualRows = collectRowsWithTimeout(iterator, 3, true);
+
+            assertThat(actualRows)
+                    .containsExactlyInAnyOrder(
+                            "+I[1, v1, null, null]",
+                            "+I[2, v2_updated_again, 20, 30]",
+                            "+I[3, v3_update, 30, 40]");
+        } finally {
+            // Best-effort cleanup: on the happy path the job was already cancelled at step 9,
+            // and cancelling a terminated job may complete exceptionally. Swallow such errors
+            // so they cannot mask a real assertion failure thrown from the try block.
+            try {
+                jobClient.cancel().get();
+            } catch (Exception ignored) {
+                // job already reached a terminal state; nothing left to clean up
+            }
+        }
+    }
+
@Test
void testUnionReadPartitionsExistInPaimonButExpiredInFluss() throws
Exception {
// first of all, start tiering