This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new 7af1a1cf8 [flink] changelog read support for log table without
pushdown optimizations (#2467)
7af1a1cf8 is described below
commit 7af1a1cf8609c257ff55bf45c63d8eb5cdac24d1
Author: MehulBatra <[email protected]>
AuthorDate: Sun Jan 25 13:54:43 2026 +0530
[flink] changelog read support for log table without pushdown optimizations
(#2467)
---
.../apache/fluss/flink/catalog/FlinkCatalog.java | 11 ++----
.../fluss/flink/catalog/FlinkCatalogITCase.java | 27 +++++++++------
.../flink/source/ChangelogVirtualTableITCase.java | 39 +++++++++++++++-------
3 files changed, 47 insertions(+), 30 deletions(-)
diff --git
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java
index e39880add..c6c434889 100644
---
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java
+++
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java
@@ -901,14 +901,9 @@ public class FlinkCatalog extends AbstractCatalog {
// Retrieve base table info
TableInfo tableInfo = admin.getTableInfo(baseTablePath).get();
- // Validate that this is a primary key table
- if (tableInfo.getPhysicalPrimaryKeys().isEmpty()) {
- throw new UnsupportedOperationException(
- String.format(
- "Virtual $changelog tables are only supported
for primary key tables. "
- + "Table %s does not have a primary
key.",
- baseTablePath));
- }
+ // $changelog is supported for both PK tables and log tables:
+ // - PK tables: have change types +I, -U, +U, -D
+ // - Log tables: only have +A (append-only)
// Convert to Flink table
CatalogBaseTable catalogBaseTable =
FlinkConversions.toFlinkTable(tableInfo);
diff --git
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java
index 5381b4672..d7ab5cfab 100644
---
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java
+++
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java
@@ -920,18 +920,25 @@ abstract class FlinkCatalogITCase {
// Verify options are inherited from base table
assertThat(changelogTable.getOptions()).containsEntry("bucket.num",
"1");
- // Verify $changelog on non-PK table throws appropriate error
+ // Verify $changelog log tables (append-only with +A change type)
tEnv.executeSql("CREATE TABLE log_table_for_changelog (id INT, name
STRING)");
- assertThatThrownBy(
- () ->
- catalog.getTable(
- new ObjectPath(
- DEFAULT_DB,
"log_table_for_changelog$changelog")))
- .isInstanceOf(CatalogException.class)
- .hasRootCauseMessage(
- "Virtual $changelog tables are only supported for
primary key tables. "
- + "Table fluss.log_table_for_changelog does
not have a primary key.");
+ CatalogTable logChangelogTable =
+ (CatalogTable)
+ catalog.getTable(
+ new ObjectPath(DEFAULT_DB,
"log_table_for_changelog$changelog"));
+
+ // Log table changelog should have same metadata columns
+ Schema expectedLogSchema =
+ Schema.newBuilder()
+ .column("_change_type", DataTypes.STRING().notNull())
+ .column("_log_offset", DataTypes.BIGINT().notNull())
+ .column("_commit_timestamp",
DataTypes.TIMESTAMP_LTZ(3).notNull())
+ .column("id", DataTypes.INT())
+ .column("name", DataTypes.STRING())
+ .build();
+
+
assertThat(logChangelogTable.getUnresolvedSchema()).isEqualTo(expectedLogSchema);
}
/**
diff --git
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/ChangelogVirtualTableITCase.java
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/ChangelogVirtualTableITCase.java
index a1f3653ee..2b9111af8 100644
---
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/ChangelogVirtualTableITCase.java
+++
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/ChangelogVirtualTableITCase.java
@@ -54,7 +54,6 @@ import static
org.apache.fluss.flink.utils.FlinkTestBase.writeRows;
import static
org.apache.fluss.server.testutils.FlussClusterExtension.BUILTIN_DATABASE;
import static org.apache.fluss.testutils.DataTestUtils.row;
import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.assertThatThrownBy;
/** Integration test for $changelog virtual table functionality. */
abstract class ChangelogVirtualTableITCase extends AbstractTestBase {
@@ -198,23 +197,39 @@ abstract class ChangelogVirtualTableITCase extends
AbstractTestBase {
}
@Test
- public void testChangelogVirtualTableWithNonPrimaryKeyTable() {
- // Create a non-primary key table (log table)
+ public void testChangelogVirtualTableWithLogTable() throws Exception {
+ // Create a log table (no primary key) with 1 bucket for predictable
offsets
tEnv.executeSql(
"CREATE TABLE events ("
+ " event_id INT,"
- + " event_type STRING,"
- + " event_time TIMESTAMP"
- + ")");
+ + " event_type STRING"
+ + ") WITH ('bucket.num' = '1')");
+
+ TablePath tablePath = TablePath.of(DEFAULT_DB, "events");
- // Attempt to query changelog virtual table should fail
+ // Query the changelog virtual table
String query = "SELECT * FROM events$changelog";
+ CloseableIterator<Row> rowIter = tEnv.executeSql(query).collect();
+
+ // Insert data into log table - log tables only have APPEND_ONLY (+A)
change type
+ CLOCK.advanceTime(Duration.ofMillis(1000));
+ writeRows(conn, tablePath, Arrays.asList(row(1, "click"), row(2,
"view")), true);
+
+ // Collect and validate - log table changelog should have +A change
type
+ List<String> results = collectRowsWithTimeout(rowIter, 2, false);
+ assertThat(results).hasSize(2);
+
+ // Format: +I[_change_type, _log_offset, _commit_timestamp, event_id,
event_type]
+ // Log tables use +A (append-only) change type
+ assertThat(results.get(0)).isEqualTo("+I[+A, 0, 1970-01-01T00:00:01Z,
1, click]");
+ assertThat(results.get(1)).isEqualTo("+I[+A, 1, 1970-01-01T00:00:01Z,
2, view]");
+
+ // Insert more data with new timestamp
+ CLOCK.advanceTime(Duration.ofMillis(1000));
+ writeRows(conn, tablePath, Arrays.asList(row(3, "purchase")), true);
- // The error message is wrapped in a CatalogException, so we check for
the root cause
- assertThatThrownBy(() -> tEnv.executeSql(query).await())
- .hasRootCauseMessage(
- "Virtual $changelog tables are only supported for
primary key tables. "
- + "Table test_changelog_db.events does not
have a primary key.");
+ List<String> moreResults = collectRowsWithTimeout(rowIter, 1, true);
+ assertThat(moreResults.get(0)).isEqualTo("+I[+A, 2,
1970-01-01T00:00:02Z, 3, purchase]");
}
@Test