ZuebeyirEser commented on code in PR #2368:
URL: https://github.com/apache/fluss/pull/2368#discussion_r2690259726


##########
fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadPrimaryKeyTableITCase.java:
##########
@@ -845,6 +846,74 @@ void testUnionReadPrimaryKeyTableFailover(boolean isPartitioned) throws Exceptio
         jobClient.cancel().get();
     }
 
+    @Test
+    void testUnionReadWithAddColumn() throws Exception {
+        TablePath tablePath = TablePath.of(DEFAULT_DB, "unionReadAddColumnPKTable");
+
+        // 1. Create PK Table (Lake Enabled)
+        Schema schema =
+                Schema.newBuilder()
+                        .column("c1", DataTypes.INT())
+                        .column("c2", DataTypes.STRING())
+                        .primaryKey("c1")
+                        .build();
+
+        TableDescriptor tableDescriptor =
+                TableDescriptor.builder()
+                        .schema(schema)
+                        .distributedBy(1)
+                        .property(ConfigOptions.TABLE_DATALAKE_ENABLED, true)
+                        .property(ConfigOptions.TABLE_DATALAKE_FRESHNESS, Duration.ofMillis(500))
+                        .build();
+
+        long tableId = createTable(tablePath, tableDescriptor);
+        TableBucket tableBucket = new TableBucket(tableId, 0);
+
+        // 2. Write initial data
+        List<InternalRow> initialRows = Arrays.asList(row(1, "v1"), row(2, "v2"));
+        writeRows(tablePath, initialRows, false);
+
+        // 3. Start tiering job
+        JobClient jobClient = buildTieringJob(execEnv);
+
+        try {
+            // 4. Wait for data to snapshot to Paimon
+            assertReplicaStatus(tableBucket, 2);
+
+            // 5. Add Column "c3"
+            List<TableChange> addColumnChanges =
+                    Collections.singletonList(
+                            TableChange.addColumn(
+                                    "c3",
+                                    DataTypes.INT(),
+                                    "new column",
+                                    TableChange.ColumnPosition.last()));
+            admin.alterTable(tablePath, addColumnChanges, false).get();
+
+            // 6. Write new data (Update Key 2, Insert Key 3)
+            // Updating key 2 validates that union read correctly merges
+            // the new schema data from log with old schema data from Paimon
+            List<InternalRow> newRows = Arrays.asList(row(2, "v2_updated", 20), row(3, "v3", 30));
+            writeRows(tablePath, newRows, false);
+
+            // 7. Query via Flink SQL
+            CloseableIterator<Row> iterator =
+                    batchTEnv.executeSql("SELECT * FROM " + tablePath.getTableName()).collect();
+
+            // 8. Verify union read correctly handles schema evolution with PK updates:
+            // - Key 1: from Paimon snapshot (old schema, c3 should be null)
+            // - Key 2: from Fluss log (updated value, new schema)
+            // - Key 3: from Fluss log (new insert, new schema)
+            List<String> actualRows = collectRowsWithTimeout(iterator, 3, true);
+
+            assertThat(actualRows)
+                    .containsExactlyInAnyOrder(
+                            "+I[1, v1, null]", "+I[2, v2_updated, 20]", "+I[3, v3, 30]");

Review Comment:
   Done. I've expanded the test to cover multiple schema changes.
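
   For reference, the merge behaviour that the assertion in the diff above expects can be modelled in plain Java. This is only an illustrative sketch, not Fluss or Paimon code: the class `UnionReadSketch` and the methods `padToSchema` and `unionRead` are hypothetical names, rows are modelled as `Object[]` with the primary key in column 0, and it assumes that added columns are appended last and read as null for older rows.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Illustrative model of the expected result; hypothetical names, not Fluss or Paimon code. */
public class UnionReadSketch {

    /** Pads a row written under an older schema with nulls up to the current column count. */
    static Object[] padToSchema(Object[] row, int columnCount) {
        // Arrays.copyOf fills the newly added trailing slots with null.
        return Arrays.copyOf(row, columnCount);
    }

    /**
     * Merges lake snapshot rows with newer log rows by primary key (column 0).
     * Log rows are applied second, so they win when a key appears in both inputs.
     */
    static List<String> unionRead(
            List<Object[]> snapshotRows, List<Object[]> logRows, int columnCount) {
        Map<Integer, Object[]> merged = new TreeMap<>();
        for (Object[] row : snapshotRows) {
            merged.put((Integer) row[0], padToSchema(row, columnCount));
        }
        for (Object[] row : logRows) {
            merged.put((Integer) row[0], padToSchema(row, columnCount));
        }
        List<String> result = new ArrayList<>();
        for (Object[] row : merged.values()) {
            // Mimic the "+I[...]" rendering used in the test's expected strings.
            result.add("+I" + Arrays.toString(row));
        }
        return result;
    }

    public static void main(String[] args) {
        // Rows tiered to the lake before c3 was added (two columns).
        List<Object[]> snapshot =
                Arrays.asList(new Object[] {1, "v1"}, new Object[] {2, "v2"});
        // Rows written to the log after c3 was added (three columns).
        List<Object[]> log =
                Arrays.asList(new Object[] {2, "v2_updated", 20}, new Object[] {3, "v3", 30});
        System.out.println(unionRead(snapshot, log, 3));
        // prints [+I[1, v1, null], +I[2, v2_updated, 20], +I[3, v3, 30]]
    }
}
```

   Key 1 exists only in the snapshot and is padded with a null `c3`, key 2 is overridden by the newer log row, and key 3 comes from the log alone. These are the three rows the test asserts.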


