luoyuxia commented on code in PR #2368:
URL: https://github.com/apache/fluss/pull/2368#discussion_r2690167915
##########
fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadPrimaryKeyTableITCase.java:
##########
@@ -845,6 +846,74 @@ void testUnionReadPrimaryKeyTableFailover(boolean isPartitioned) throws Exceptio
         jobClient.cancel().get();
     }
+
+    @Test
+    void testUnionReadWithAddColumn() throws Exception {
+        TablePath tablePath = TablePath.of(DEFAULT_DB, "unionReadAddColumnPKTable");
+
+        // 1. Create PK Table (Lake Enabled)
+        Schema schema =
+                Schema.newBuilder()
+                        .column("c1", DataTypes.INT())
+                        .column("c2", DataTypes.STRING())
+                        .primaryKey("c1")
+                        .build();
+
+        TableDescriptor tableDescriptor =
+                TableDescriptor.builder()
+                        .schema(schema)
+                        .distributedBy(1)
+                        .property(ConfigOptions.TABLE_DATALAKE_ENABLED, true)
+                        .property(ConfigOptions.TABLE_DATALAKE_FRESHNESS, Duration.ofMillis(500))
+                        .build();
+
+        long tableId = createTable(tablePath, tableDescriptor);
+        TableBucket tableBucket = new TableBucket(tableId, 0);
+
+        // 2. Write initial data
+        List<InternalRow> initialRows = Arrays.asList(row(1, "v1"), row(2, "v2"));
+        writeRows(tablePath, initialRows, false);
+
+        // 3. Start tiering job
+        JobClient jobClient = buildTieringJob(execEnv);
+
+        try {
+            // 4. Wait for data to snapshot to Paimon
+            assertReplicaStatus(tableBucket, 2);
+
+            // 5. Add Column "c3"
+            List<TableChange> addColumnChanges =
+                    Collections.singletonList(
+                            TableChange.addColumn(
+                                    "c3",
+                                    DataTypes.INT(),
+                                    "new column",
+                                    TableChange.ColumnPosition.last()));
+            admin.alterTable(tablePath, addColumnChanges, false).get();
+
+            // 6. Write new data (Update Key 2, Insert Key 3)
+            // Updating key 2 validates that union read correctly merges
+            // the new schema data from log with old schema data from Paimon
+            List<InternalRow> newRows = Arrays.asList(row(2, "v2_updated", 20), row(3, "v3", 30));
+            writeRows(tablePath, newRows, false);
+
+            // 7. Query via Flink SQL
+            CloseableIterator<Row> iterator =
+                    batchTEnv.executeSql("SELECT * FROM " + tablePath.getTableName()).collect();
+
+            // 8. Verify union read correctly handles schema evolution with PK updates:
+            //    - Key 1: from Paimon snapshot (old schema, c3 should be null)
+            //    - Key 2: from Fluss log (updated value, new schema)
+            //    - Key 3: from Fluss log (new insert, new schema)
+            List<String> actualRows = collectRowsWithTimeout(iterator, 3, true);
+
+            assertThat(actualRows)
+                    .containsExactlyInAnyOrder(
+                            "+I[1, v1, null]", "+I[2, v2_updated, 20]", "+I[3, v3, 30]");
Review Comment:
Then, could you please continue by also testing that merging Fluss change-log entries with different schema ids works? We can do it like the following, which I have already verified.
```
// stop tiering to make sure we can merge change-log entries with different schema ids
jobClient.cancel().get();
addColumnChanges =
        Collections.singletonList(
                TableChange.addColumn(
                        "c4",
                        DataTypes.INT(),
                        "new column",
                        TableChange.ColumnPosition.last()));
admin.alterTable(tablePath, addColumnChanges, false).get();
newRows = Arrays.asList(row(2, "v2_updated_again", 20, 30), row(3, "v3_update", 30, 40));
writeRows(tablePath, newRows, false);
iterator = batchTEnv.executeSql("SELECT * FROM " + tablePath.getTableName()).collect();
actualRows = collectRowsWithTimeout(iterator, 3, true);
assertThat(actualRows)
        .containsExactlyInAnyOrder(
                "+I[1, v1, null, null]",
                "+I[2, v2_updated_again, 20, 30]",
                "+I[3, v3_update, 30, 40]");
```
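Stopping the tiering job first keeps the step-6 writes (made under the 3-column schema) in the Fluss log instead of being tiered into Paimon, so after adding `c4` the union read has to merge change-log records carrying two different schema ids.

For anyone skimming the expected output: rows written under an older schema surface with trailing nulls because they are padded up to the latest schema's arity at read time. A toy sketch of that padding in plain Java (illustrative only, not the Fluss API; `padToLatest` is a made-up helper):
```
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SchemaPadSketch {
    // Pad a row written under an older schema with nulls so it matches the
    // latest schema's column count; columns added after the row was written
    // read as null.
    static List<Object> padToLatest(List<Object> row, int latestArity) {
        List<Object> padded = new ArrayList<>(row);
        while (padded.size() < latestArity) {
            padded.add(null);
        }
        return padded;
    }

    public static void main(String[] args) {
        // key 1 was written under the original 2-column schema (c1, c2);
        // read against the 4-column schema it becomes [1, v1, null, null]
        System.out.println(padToLatest(Arrays.asList(1, "v1"), 4));
    }
}
```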
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]