This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new d2ac42281 [paimon] Add IT case for union read with evolved schema for Paimon (#2368)
d2ac42281 is described below

commit d2ac42281f4d46bc0d275ea706fc3c4c85e09f03
Author: Zübeyir Eser <[email protected]>
AuthorDate: Thu Jan 15 04:59:21 2026 +0100

    [paimon] Add IT case for union read with evolved schema for Paimon (#2368)
---
 .../flink/FlinkUnionReadPrimaryKeyTableITCase.java | 100 +++++++++++++++++++++
 1 file changed, 100 insertions(+)

diff --git 
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadPrimaryKeyTableITCase.java
 
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadPrimaryKeyTableITCase.java
index cb69684e3..41f18b8c6 100644
--- 
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadPrimaryKeyTableITCase.java
+++ 
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadPrimaryKeyTableITCase.java
@@ -23,6 +23,7 @@ import org.apache.fluss.metadata.PartitionInfo;
 import org.apache.fluss.metadata.PartitionSpec;
 import org.apache.fluss.metadata.Schema;
 import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TableChange;
 import org.apache.fluss.metadata.TableDescriptor;
 import org.apache.fluss.metadata.TablePath;
 import org.apache.fluss.row.BinaryString;
@@ -845,6 +846,105 @@ class FlinkUnionReadPrimaryKeyTableITCase extends 
FlinkUnionReadTestBase {
         jobClient.cancel().get();
     }
 
+    @Test
+    void testUnionReadWithAddColumn() throws Exception {
+        TablePath tablePath = TablePath.of(DEFAULT_DB, 
"unionReadAddColumnPKTable");
+
+        // 1. Create a single-bucket PK table (c1 INT key, c2 STRING), lake enabled
+        Schema schema =
+                Schema.newBuilder()
+                        .column("c1", DataTypes.INT())
+                        .column("c2", DataTypes.STRING())
+                        .primaryKey("c1")
+                        .build();
+
+        TableDescriptor tableDescriptor =
+                TableDescriptor.builder()
+                        .schema(schema)
+                        .distributedBy(1)
+                        .property(ConfigOptions.TABLE_DATALAKE_ENABLED, true)
+                        .property(ConfigOptions.TABLE_DATALAKE_FRESHNESS, 
Duration.ofMillis(500))
+                        .build();
+
+        long tableId = createTable(tablePath, tableDescriptor);
+        TableBucket tableBucket = new TableBucket(tableId, 0);
+
+        // 2. Write initial rows for keys 1 and 2 (Schema V1: c1, c2)
+        List<InternalRow> initialRows = Arrays.asList(row(1, "v1"), row(2, 
"v2"));
+        writeRows(tablePath, initialRows, false);
+
+        // 3. Start tiering job (always cancelled in the finally block)
+        JobClient jobClient = buildTieringJob(execEnv);
+
+        try {
+            // 4. Wait for data to snapshot to Paimon
+            assertReplicaStatus(tableBucket, 2);
+
+            // 5. Append INT column "c3" at the last position (Schema V2)
+            List<TableChange> addColumnChanges =
+                    Collections.singletonList(
+                            TableChange.addColumn(
+                                    "c3",
+                                    DataTypes.INT(),
+                                    "new column",
+                                    TableChange.ColumnPosition.last()));
+            admin.alterTable(tablePath, addColumnChanges, false).get();
+
+            // 6. Write new data (Update Key 2, Insert Key 3)
+            // Updating key 2 validates that union read correctly merges
+            // the new schema data from log with old schema data from Paimon
+            List<InternalRow> newRows = Arrays.asList(row(2, "v2_updated", 
20), row(3, "v3", 30));
+            writeRows(tablePath, newRows, false);
+
+            // 7. Query via Flink SQL
+            CloseableIterator<Row> iterator =
+                    batchTEnv.executeSql("SELECT * FROM " + 
tablePath.getTableName()).collect();
+
+            // 8. Verify union read correctly handles schema evolution with PK 
updates:
+            // - Key 1: from Paimon snapshot (old schema, c3 should be null)
+            // - Key 2: from Fluss log (updated value, new schema)
+            // - Key 3: from Fluss log (new insert, new schema)
+            List<String> actualRows = collectRowsWithTimeout(iterator, 3, 
true);
+
+            assertThat(actualRows)
+                    .containsExactlyInAnyOrder(
+                            "+I[1, v1, null]", "+I[2, v2_updated, 20]", "+I[3, 
v3, 30]");
+
+            // 9. Append INT column "c4" at the last position (Schema V3)
+            // Verify union read reconciles tiered data (V1) with a fluss log
+            // containing multiple schema versions (V2 and V3).
+            jobClient.cancel().get(); // stop tiering first; presumably keeps V2/V3 rows in the log only
+            addColumnChanges =
+                    Collections.singletonList(
+                            TableChange.addColumn(
+                                    "c4",
+                                    DataTypes.INT(),
+                                    "another new column",
+                                    TableChange.ColumnPosition.last()));
+            admin.alterTable(tablePath, addColumnChanges, false).get();
+
+            // 10. Write data for Schema V3 (Update Key 2 and Key 3)
+            newRows =
+                    Arrays.asList(row(2, "v2_updated_again", 20, 30), row(3, 
"v3_update", 30, 40));
+            writeRows(tablePath, newRows, false);
+
+            // 11. Final Query Verify (Paimon V1 + Log V2 + Log V3)
+            // - Key 1: from Paimon snapshot (oldest schema, c3/c4 should be 
null)
+            // - Key 2: from Fluss log (latest update, newest schema)
+            // - Key 3: from Fluss log (latest update, newest schema)
+            iterator = batchTEnv.executeSql("SELECT * FROM " + 
tablePath.getTableName()).collect();
+            actualRows = collectRowsWithTimeout(iterator, 3, true);
+
+            assertThat(actualRows)
+                    .containsExactlyInAnyOrder(
+                            "+I[1, v1, null, null]",
+                            "+I[2, v2_updated_again, 20, 30]",
+                            "+I[3, v3_update, 30, 40]");
+        } finally {
+            jobClient.cancel().get(); // NOTE(review): already cancelled at step 9 on the success path - confirm a second cancel is harmless
+        }
+    }
+
     @Test
     void testUnionReadPartitionsExistInPaimonButExpiredInFluss() throws 
Exception {
         // first of all, start tiering

Reply via email to