MehulBatra commented on code in PR #2525:
URL: https://github.com/apache/fluss/pull/2525#discussion_r2749715812


##########
fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java:
##########
@@ -932,6 +942,65 @@ void testGetChangelogVirtualTable() throws Exception {
         
assertThat(logChangelogTable.getUnresolvedSchema()).isEqualTo(expectedLogSchema);
     }
 
+    @Test
+    void testGetBinlogVirtualTable() throws Exception {
+        // Create a primary key table with partition
+        tEnv.executeSql(
+                "CREATE TABLE pk_table_for_binlog ("
+                        + "  id INT NOT NULL,"
+                        + "  name STRING NOT NULL,"
+                        + "  amount BIGINT,"
+                        + "  PRIMARY KEY (id, name) NOT ENFORCED"
+                        + ") PARTITIONED BY (name) "
+                        + "WITH ('bucket.num' = '1')");
+
+        // Get the $binlog virtual table via catalog API
+        CatalogTable binlogTable =
+                (CatalogTable)
+                        catalog.getTable(new ObjectPath(DEFAULT_DB, 
"pk_table_for_binlog$binlog"));
+
+        // Verify binlog schema has 5 columns: _change_type, _log_offset, 
_commit_timestamp,
+        // before, after
+        Schema binlogSchema = binlogTable.getUnresolvedSchema();
+        List<String> columnNames =
+                binlogSchema.getColumns().stream()
+                        .map(Schema.UnresolvedColumn::getName)
+                        .collect(Collectors.toList());
+        assertThat(columnNames)
+                .containsExactly(
+                        "_change_type", "_log_offset", "_commit_timestamp", 
"before", "after");
+
+        // Verify before and after columns are ROW types containing the 
original columns
+        String schemaString = binlogSchema.toString();
+        assertThat(schemaString).contains("before");
+        assertThat(schemaString).contains("after");
+        assertThat(schemaString).contains("ROW<");
+        // Verify nested columns exist in the ROW type
+        assertThat(schemaString).contains("id INT NOT NULL");
+        assertThat(schemaString).contains("name STRING NOT NULL");
+        assertThat(schemaString).contains("amount BIGINT");

Review Comment:
   oh shit, I missed it again. I made sure to cover this in the binlog UT; I will 
fix this in the Catalog UT as well.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to