This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new 7af1a1cf8 [flink] changelog read support for log table without pushdown optimizations (#2467)
7af1a1cf8 is described below

commit 7af1a1cf8609c257ff55bf45c63d8eb5cdac24d1
Author: MehulBatra <[email protected]>
AuthorDate: Sun Jan 25 13:54:43 2026 +0530

    [flink] changelog read support for log table without pushdown optimizations (#2467)
---
 .../apache/fluss/flink/catalog/FlinkCatalog.java   | 11 ++----
 .../fluss/flink/catalog/FlinkCatalogITCase.java    | 27 +++++++++------
 .../flink/source/ChangelogVirtualTableITCase.java  | 39 +++++++++++++++-------
 3 files changed, 47 insertions(+), 30 deletions(-)

diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java
index e39880add..c6c434889 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java
@@ -901,14 +901,9 @@ public class FlinkCatalog extends AbstractCatalog {
             // Retrieve base table info
             TableInfo tableInfo = admin.getTableInfo(baseTablePath).get();
 
-            // Validate that this is a primary key table
-            if (tableInfo.getPhysicalPrimaryKeys().isEmpty()) {
-                throw new UnsupportedOperationException(
-                        String.format(
-                                "Virtual $changelog tables are only supported for primary key tables. "
-                                        + "Table %s does not have a primary key.",
-                                baseTablePath));
-            }
+            // $changelog is supported for both PK tables and log tables:
+            // - PK tables: have change types +I, -U, +U, -D
+            // - Log tables: only have +A (append-only)
 
             // Convert to Flink table
             CatalogBaseTable catalogBaseTable = FlinkConversions.toFlinkTable(tableInfo);
diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java
index 5381b4672..d7ab5cfab 100644
--- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java
+++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java
@@ -920,18 +920,25 @@ abstract class FlinkCatalogITCase {
         // Verify options are inherited from base table
         assertThat(changelogTable.getOptions()).containsEntry("bucket.num", "1");
 
-        // Verify $changelog on non-PK table throws appropriate error
+        // Verify $changelog log tables (append-only with +A change type)
         tEnv.executeSql("CREATE TABLE log_table_for_changelog (id INT, name STRING)");
 
-        assertThatThrownBy(
-                        () ->
-                                catalog.getTable(
-                                        new ObjectPath(
-                                                DEFAULT_DB, "log_table_for_changelog$changelog")))
-                .isInstanceOf(CatalogException.class)
-                .hasRootCauseMessage(
-                        "Virtual $changelog tables are only supported for primary key tables. "
-                                + "Table fluss.log_table_for_changelog does not have a primary key.");
+        CatalogTable logChangelogTable =
+                (CatalogTable)
+                        catalog.getTable(
+                                new ObjectPath(DEFAULT_DB, "log_table_for_changelog$changelog"));
+
+        // Log table changelog should have same metadata columns
+        Schema expectedLogSchema =
+                Schema.newBuilder()
+                        .column("_change_type", DataTypes.STRING().notNull())
+                        .column("_log_offset", DataTypes.BIGINT().notNull())
+                        .column("_commit_timestamp", DataTypes.TIMESTAMP_LTZ(3).notNull())
+                        .column("id", DataTypes.INT())
+                        .column("name", DataTypes.STRING())
+                        .build();
+
+        assertThat(logChangelogTable.getUnresolvedSchema()).isEqualTo(expectedLogSchema);
     }
 
     /**
diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/ChangelogVirtualTableITCase.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/ChangelogVirtualTableITCase.java
index a1f3653ee..2b9111af8 100644
--- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/ChangelogVirtualTableITCase.java
+++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/ChangelogVirtualTableITCase.java
@@ -54,7 +54,6 @@ import static org.apache.fluss.flink.utils.FlinkTestBase.writeRows;
 import static org.apache.fluss.server.testutils.FlussClusterExtension.BUILTIN_DATABASE;
 import static org.apache.fluss.testutils.DataTestUtils.row;
 import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Integration test for $changelog virtual table functionality. */
 abstract class ChangelogVirtualTableITCase extends AbstractTestBase {
@@ -198,23 +197,39 @@ abstract class ChangelogVirtualTableITCase extends AbstractTestBase {
     }
 
     @Test
-    public void testChangelogVirtualTableWithNonPrimaryKeyTable() {
-        // Create a non-primary key table (log table)
+    public void testChangelogVirtualTableWithLogTable() throws Exception {
+        // Create a log table (no primary key) with 1 bucket for predictable offsets
         tEnv.executeSql(
                 "CREATE TABLE events ("
                         + "  event_id INT,"
-                        + "  event_type STRING,"
-                        + "  event_time TIMESTAMP"
-                        + ")");
+                        + "  event_type STRING"
+                        + ") WITH ('bucket.num' = '1')");
+
+        TablePath tablePath = TablePath.of(DEFAULT_DB, "events");
 
-        // Attempt to query changelog virtual table should fail
+        // Query the changelog virtual table
         String query = "SELECT * FROM events$changelog";
+        CloseableIterator<Row> rowIter = tEnv.executeSql(query).collect();
+
+        // Insert data into log table - log tables only have APPEND_ONLY (+A) change type
+        CLOCK.advanceTime(Duration.ofMillis(1000));
+        writeRows(conn, tablePath, Arrays.asList(row(1, "click"), row(2, "view")), true);
+
+        // Collect and validate - log table changelog should have +A change type
+        List<String> results = collectRowsWithTimeout(rowIter, 2, false);
+        assertThat(results).hasSize(2);
+
+        // Format: +I[_change_type, _log_offset, _commit_timestamp, event_id, event_type]
+        // Log tables use +A (append-only) change type
+        assertThat(results.get(0)).isEqualTo("+I[+A, 0, 1970-01-01T00:00:01Z, 1, click]");
+        assertThat(results.get(1)).isEqualTo("+I[+A, 1, 1970-01-01T00:00:01Z, 2, view]");
+
+        // Insert more data with new timestamp
+        CLOCK.advanceTime(Duration.ofMillis(1000));
+        writeRows(conn, tablePath, Arrays.asList(row(3, "purchase")), true);
 
-        // The error message is wrapped in a CatalogException, so we check for the root cause
-        assertThatThrownBy(() -> tEnv.executeSql(query).await())
-                .hasRootCauseMessage(
-                        "Virtual $changelog tables are only supported for primary key tables. "
-                                + "Table test_changelog_db.events does not have a primary key.");
+        List<String> moreResults = collectRowsWithTimeout(rowIter, 1, true);
+        assertThat(moreResults.get(0)).isEqualTo("+I[+A, 2, 1970-01-01T00:00:02Z, 3, purchase]");
     }
 
     @Test

Reply via email to