This is an automated email from the ASF dual-hosted git repository.

mehulbatra pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new 604bd2d02 [flink] changelog read support for pk table without pushdown 
optimizations (#2347)
604bd2d02 is described below

commit 604bd2d026265f795e6a1ed6985229dd135f21ef
Author: MehulBatra <[email protected]>
AuthorDate: Sat Jan 24 00:42:34 2026 +0530

    [flink] changelog read support for pk table without pushdown optimizations 
(#2347)
    
    * changelog support for pk table
    
    * address Jark comments
    
    * fix imports
    
    * fix ut
    
    * resolve conflicts
    
    * Jarks' comments.
    
    * fix ordering in IT
    
    ---------
    
    Co-authored-by: Mehul Batra <[email protected]>
    Co-authored-by: Jark Wu <[email protected]>
---
 .gitignore                                         |   3 +
 .../fluss/client/admin/FlussAdminITCase.java       |  27 ++
 .../org/apache/fluss/metadata/TableDescriptor.java |   5 +
 .../Flink118ChangelogVirtualTableITCase.java       |  21 ++
 .../Flink119ChangelogVirtualTableITCase.java       |  21 ++
 .../Flink120ChangelogVirtualTableITCase.java       |  21 ++
 .../source/Flink22ChangelogVirtualTableITCase.java |  21 ++
 .../apache/fluss/flink/catalog/FlinkCatalog.java   | 103 +++++
 .../fluss/flink/catalog/FlinkTableFactory.java     |  88 ++++-
 .../flink/source/ChangelogFlinkTableSource.java    | 220 +++++++++++
 .../ChangelogDeserializationSchema.java            |  75 ++++
 .../fluss/flink/utils/ChangelogRowConverter.java   | 120 ++++++
 .../flink/utils/RecordToFlinkRowConverter.java     |  34 ++
 .../fluss/flink/catalog/FlinkCatalogITCase.java    |  49 +++
 .../flink/source/ChangelogVirtualTableITCase.java  | 419 +++++++++++++++++++++
 .../flink/utils/ChangelogRowConverterTest.java     | 209 ++++++++++
 .../server/utils/TableDescriptorValidation.java    |   8 +-
 17 files changed, 1437 insertions(+), 7 deletions(-)

diff --git a/.gitignore b/.gitignore
index f0c5b601e..845fa7ff9 100644
--- a/.gitignore
+++ b/.gitignore
@@ -22,6 +22,9 @@ dependency-reduced-pom.xml
 ### VS Code ###
 .vscode/
 
+### claude code ###
+.claude/
+
 ### Mac OS ###
 .DS_Store
 
diff --git 
a/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java
 
b/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java
index 64cd85e9f..5560e14ee 100644
--- 
a/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java
+++ 
b/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java
@@ -1495,6 +1495,33 @@ class FlussAdminITCase extends ClientToServerITCaseBase {
                                 + "because they are reserved system columns in 
Fluss. "
                                 + "Please use other names for these columns. "
                                 + "The reserved system columns are: __offset, 
__timestamp, __bucket");
+
+        // Test changelog virtual table metadata columns are also reserved
+        TableDescriptor changelogColumnsDescriptor =
+                TableDescriptor.builder()
+                        .schema(
+                                Schema.newBuilder()
+                                        .column("f0", DataTypes.STRING())
+                                        .column("_change_type", 
DataTypes.STRING())
+                                        .column("_log_offset", 
DataTypes.BIGINT())
+                                        .column("_commit_timestamp", 
DataTypes.TIMESTAMP())
+                                        .build())
+                        .distributedBy(1)
+                        .build();
+
+        TablePath changelogTablePath = TablePath.of(dbName, 
"test_changelog_columns");
+
+        assertThatThrownBy(
+                        () ->
+                                admin.createTable(
+                                                changelogTablePath,
+                                                changelogColumnsDescriptor,
+                                                false)
+                                        .get())
+                .cause()
+                .isInstanceOf(InvalidTableException.class)
+                .hasMessageContaining(
+                        "_change_type, _log_offset, _commit_timestamp cannot 
be used as column names");
     }
 
     @Test
diff --git 
a/fluss-common/src/main/java/org/apache/fluss/metadata/TableDescriptor.java 
b/fluss-common/src/main/java/org/apache/fluss/metadata/TableDescriptor.java
index e3e4fc931..4c8ecbd79 100644
--- a/fluss-common/src/main/java/org/apache/fluss/metadata/TableDescriptor.java
+++ b/fluss-common/src/main/java/org/apache/fluss/metadata/TableDescriptor.java
@@ -60,6 +60,11 @@ public final class TableDescriptor implements Serializable {
     public static final String TIMESTAMP_COLUMN_NAME = "__timestamp";
     public static final String BUCKET_COLUMN_NAME = "__bucket";
 
+    // Reserved column names for virtual table metadata ($changelog and 
$binlog)
+    public static final String CHANGE_TYPE_COLUMN = "_change_type";
+    public static final String LOG_OFFSET_COLUMN = "_log_offset";
+    public static final String COMMIT_TIMESTAMP_COLUMN = "_commit_timestamp";
+
     private final Schema schema;
     private final @Nullable String comment;
     private final List<String> partitionKeys;
diff --git 
a/fluss-flink/fluss-flink-1.18/src/test/java/org/apache/fluss/flink/source/Flink118ChangelogVirtualTableITCase.java
 
b/fluss-flink/fluss-flink-1.18/src/test/java/org/apache/fluss/flink/source/Flink118ChangelogVirtualTableITCase.java
new file mode 100644
index 000000000..10aa76d70
--- /dev/null
+++ 
b/fluss-flink/fluss-flink-1.18/src/test/java/org/apache/fluss/flink/source/Flink118ChangelogVirtualTableITCase.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.source;
+
+/** IT case for {@link ChangelogVirtualTableITCase} in Flink 1.18. */
+public class Flink118ChangelogVirtualTableITCase extends 
ChangelogVirtualTableITCase {}
diff --git 
a/fluss-flink/fluss-flink-1.19/src/test/java/org/apache/fluss/flink/source/Flink119ChangelogVirtualTableITCase.java
 
b/fluss-flink/fluss-flink-1.19/src/test/java/org/apache/fluss/flink/source/Flink119ChangelogVirtualTableITCase.java
new file mode 100644
index 000000000..265fb80a2
--- /dev/null
+++ 
b/fluss-flink/fluss-flink-1.19/src/test/java/org/apache/fluss/flink/source/Flink119ChangelogVirtualTableITCase.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.source;
+
+/** IT case for {@link ChangelogVirtualTableITCase} in Flink 1.19. */
+public class Flink119ChangelogVirtualTableITCase extends 
ChangelogVirtualTableITCase {}
diff --git 
a/fluss-flink/fluss-flink-1.20/src/test/java/org/apache/fluss/flink/source/Flink120ChangelogVirtualTableITCase.java
 
b/fluss-flink/fluss-flink-1.20/src/test/java/org/apache/fluss/flink/source/Flink120ChangelogVirtualTableITCase.java
new file mode 100644
index 000000000..b0e4a1151
--- /dev/null
+++ 
b/fluss-flink/fluss-flink-1.20/src/test/java/org/apache/fluss/flink/source/Flink120ChangelogVirtualTableITCase.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.source;
+
+/** IT case for {@link ChangelogVirtualTableITCase} in Flink 1.20. */
+public class Flink120ChangelogVirtualTableITCase extends 
ChangelogVirtualTableITCase {}
diff --git 
a/fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/source/Flink22ChangelogVirtualTableITCase.java
 
b/fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/source/Flink22ChangelogVirtualTableITCase.java
new file mode 100644
index 000000000..8a193c0bd
--- /dev/null
+++ 
b/fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/source/Flink22ChangelogVirtualTableITCase.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.source;
+
+/** IT case for {@link ChangelogVirtualTableITCase} in Flink 2.2. */
+public class Flink22ChangelogVirtualTableITCase extends 
ChangelogVirtualTableITCase {}
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java
index 9d0151133..e39880add 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java
@@ -85,6 +85,9 @@ import java.util.Optional;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
+import static org.apache.flink.table.api.DataTypes.BIGINT;
+import static org.apache.flink.table.api.DataTypes.STRING;
+import static org.apache.flink.table.api.DataTypes.TIMESTAMP_LTZ;
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.fluss.config.ConfigOptions.BOOTSTRAP_SERVERS;
 import static 
org.apache.fluss.flink.FlinkConnectorOptions.ALTER_DISALLOW_OPTIONS;
@@ -112,6 +115,8 @@ import static 
org.apache.fluss.flink.utils.FlinkConversions.toFlussDatabase;
 public class FlinkCatalog extends AbstractCatalog {
 
     public static final String LAKE_TABLE_SPLITTER = "$lake";
+    public static final String CHANGELOG_TABLE_SUFFIX = "$changelog";
+    public static final String BINLOG_TABLE_SUFFIX = "$binlog";
 
     protected final ClassLoader classLoader;
 
@@ -303,6 +308,20 @@ public class FlinkCatalog extends AbstractCatalog {
             throws TableNotExistException, CatalogException {
         // may be should be as a datalake table
         String tableName = objectPath.getObjectName();
+
+        // Check if this is a virtual table ($changelog or $binlog)
+        if (tableName.endsWith(CHANGELOG_TABLE_SUFFIX)
+                && !tableName.contains(LAKE_TABLE_SPLITTER)) {
+            return getVirtualChangelogTable(objectPath);
+        } else if (tableName.endsWith(BINLOG_TABLE_SUFFIX)
+                && !tableName.contains(LAKE_TABLE_SPLITTER)) {
+            // TODO: Implement binlog virtual table in future
+            throw new UnsupportedOperationException(
+                    String.format(
+                            "$binlog virtual tables are not yet supported for 
table %s",
+                            objectPath));
+        }
+
         TablePath tablePath = toTablePath(objectPath);
         try {
             TableInfo tableInfo;
@@ -862,4 +881,88 @@ public class FlinkCatalog extends AbstractCatalog {
         }
         return true;
     }
+
+    /**
+     * Creates a virtual $changelog table by modifying the base table's schema to 
include metadata columns.
+     */
+    private CatalogBaseTable getVirtualChangelogTable(ObjectPath objectPath)
+            throws TableNotExistException, CatalogException {
+        // Extract the base table name (remove $changelog suffix)
+        String virtualTableName = objectPath.getObjectName();
+        String baseTableName =
+                virtualTableName.substring(
+                        0, virtualTableName.length() - 
CHANGELOG_TABLE_SUFFIX.length());
+
+        // Get the base table
+        ObjectPath baseObjectPath = new 
ObjectPath(objectPath.getDatabaseName(), baseTableName);
+        TablePath baseTablePath = toTablePath(baseObjectPath);
+
+        try {
+            // Retrieve base table info
+            TableInfo tableInfo = admin.getTableInfo(baseTablePath).get();
+
+            // Validate that this is a primary key table
+            if (tableInfo.getPhysicalPrimaryKeys().isEmpty()) {
+                throw new UnsupportedOperationException(
+                        String.format(
+                                "Virtual $changelog tables are only supported 
for primary key tables. "
+                                        + "Table %s does not have a primary 
key.",
+                                baseTablePath));
+            }
+
+            // Convert to Flink table
+            CatalogBaseTable catalogBaseTable = 
FlinkConversions.toFlinkTable(tableInfo);
+
+            if (!(catalogBaseTable instanceof CatalogTable)) {
+                throw new UnsupportedOperationException(
+                        "Virtual $changelog tables are only supported for 
regular tables");
+            }
+
+            CatalogTable baseTable = (CatalogTable) catalogBaseTable;
+
+            // Build the changelog schema by adding metadata columns
+            Schema originalSchema = baseTable.getUnresolvedSchema();
+            Schema changelogSchema = buildChangelogSchema(originalSchema);
+
+            // Copy options from base table
+            Map<String, String> newOptions = new 
HashMap<>(baseTable.getOptions());
+            newOptions.put(BOOTSTRAP_SERVERS.key(), bootstrapServers);
+            newOptions.putAll(securityConfigs);
+
+            // Create a new CatalogTable with the modified schema
+            return CatalogTableAdapter.toCatalogTable(
+                    changelogSchema,
+                    baseTable.getComment(),
+                    baseTable.getPartitionKeys(),
+                    newOptions);
+
+        } catch (Exception e) {
+            Throwable t = ExceptionUtils.stripExecutionException(e);
+            if (isTableNotExist(t)) {
+                throw new TableNotExistException(getName(), baseObjectPath);
+            } else {
+                throw new CatalogException(
+                        String.format(
+                                "Failed to get virtual changelog table %s in 
%s",
+                                objectPath, getName()),
+                        t);
+            }
+        }
+    }
+
+    private Schema buildChangelogSchema(Schema originalSchema) {
+        Schema.Builder builder = Schema.newBuilder();
+
+        // Add metadata columns first
+        builder.column("_change_type", STRING().notNull());
+        builder.column("_log_offset", BIGINT().notNull());
+        builder.column("_commit_timestamp", TIMESTAMP_LTZ(3).notNull());
+
+        // Add all original columns (preserves all column attributes including 
comments)
+        builder.fromColumns(originalSchema.getColumns());
+
+        // Note: We don't copy primary keys or watermarks for virtual tables
+
+        return builder.build();
+    }
 }
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java
index 48d74e712..5a5c0278e 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java
@@ -24,6 +24,7 @@ import org.apache.fluss.flink.lake.LakeFlinkCatalog;
 import org.apache.fluss.flink.lake.LakeTableFactory;
 import org.apache.fluss.flink.sink.FlinkTableSink;
 import org.apache.fluss.flink.sink.shuffle.DistributionMode;
+import org.apache.fluss.flink.source.ChangelogFlinkTableSource;
 import org.apache.fluss.flink.source.FlinkTableSource;
 import org.apache.fluss.flink.utils.FlinkConnectorOptionsUtils;
 import org.apache.fluss.metadata.DataLakeFormat;
@@ -90,20 +91,20 @@ public class FlinkTableFactory implements 
DynamicTableSourceFactory, DynamicTabl
             return lakeTableFactory.createDynamicTableSource(context, 
lakeTableName);
         }
 
+        // Check if this is a $changelog suffix in table name
+        if (tableName.endsWith(FlinkCatalog.CHANGELOG_TABLE_SUFFIX)) {
+            return createChangelogTableSource(context, tableIdentifier, 
tableName);
+        }
+
         FactoryUtil.TableFactoryHelper helper = 
FactoryUtil.createTableFactoryHelper(this, context);
         final ReadableConfig tableOptions = helper.getOptions();
-        Optional<DataLakeFormat> datalakeFormat = 
getDatalakeFormat(tableOptions);
-        List<String> prefixesToSkip =
-                new ArrayList<>(Arrays.asList("table.", "client.", "fields."));
-        datalakeFormat.ifPresent(dataLakeFormat -> 
prefixesToSkip.add(dataLakeFormat + "."));
-        helper.validateExcept(prefixesToSkip.toArray(new String[0]));
+        validateSourceOptions(helper, tableOptions);
 
         boolean isStreamingMode =
                 context.getConfiguration().get(ExecutionOptions.RUNTIME_MODE)
                         == RuntimeExecutionMode.STREAMING;
 
         RowType tableOutputType = (RowType) 
context.getPhysicalRowDataType().getLogicalType();
-        FlinkConnectorOptionsUtils.validateTableSourceOptions(tableOptions);
 
         ZoneId timeZone =
                 FlinkConnectorOptionsUtils.getLocalTimeZone(
@@ -267,4 +268,79 @@ public class FlinkTableFactory implements 
DynamicTableSourceFactory, DynamicTabl
         }
         return lakeTableFactory;
     }
+
+    /**
+     * Validates table source options using the standard validation pattern.
+     *
+     * @param helper the factory helper for option validation
+     * @param tableOptions the table options to validate
+     */
+    private static void validateSourceOptions(
+            FactoryUtil.TableFactoryHelper helper, ReadableConfig 
tableOptions) {
+        Optional<DataLakeFormat> datalakeFormat = 
getDatalakeFormat(tableOptions);
+        List<String> prefixesToSkip =
+                new ArrayList<>(Arrays.asList("table.", "client.", "fields."));
+        datalakeFormat.ifPresent(dataLakeFormat -> 
prefixesToSkip.add(dataLakeFormat + "."));
+        helper.validateExcept(prefixesToSkip.toArray(new String[0]));
+        FlinkConnectorOptionsUtils.validateTableSourceOptions(tableOptions);
+    }
+
+    /** Creates a ChangelogFlinkTableSource for $changelog virtual tables. */
+    private DynamicTableSource createChangelogTableSource(
+            Context context, ObjectIdentifier tableIdentifier, String 
tableName) {
+        // Extract the base table name by removing the $changelog suffix
+        String baseTableName =
+                tableName.substring(
+                        0, tableName.length() - 
FlinkCatalog.CHANGELOG_TABLE_SUFFIX.length());
+
+        boolean isStreamingMode =
+                context.getConfiguration().get(ExecutionOptions.RUNTIME_MODE)
+                        == RuntimeExecutionMode.STREAMING;
+
+        // tableOutputType includes metadata columns: [_change_type, 
_log_offset, _commit_timestamp,
+        // data_cols...]
+        RowType tableOutputType = (RowType) 
context.getPhysicalRowDataType().getLogicalType();
+
+        // Extract data columns type (skip the 3 metadata columns) for index 
calculations
+        int numMetadataColumns = 3;
+        List<RowType.RowField> dataFields =
+                tableOutputType
+                        .getFields()
+                        .subList(numMetadataColumns, 
tableOutputType.getFieldCount());
+        RowType dataColumnsType = new RowType(new ArrayList<>(dataFields));
+
+        Map<String, String> catalogTableOptions = 
context.getCatalogTable().getOptions();
+        FactoryUtil.TableFactoryHelper helper = 
FactoryUtil.createTableFactoryHelper(this, context);
+        final ReadableConfig tableOptions = helper.getOptions();
+        validateSourceOptions(helper, tableOptions);
+
+        ZoneId timeZone =
+                FlinkConnectorOptionsUtils.getLocalTimeZone(
+                        
context.getConfiguration().get(TableConfigOptions.LOCAL_TIME_ZONE));
+        final FlinkConnectorOptionsUtils.StartupOptions startupOptions =
+                FlinkConnectorOptionsUtils.getStartupOptions(tableOptions, 
timeZone);
+
+        ResolvedCatalogTable resolvedCatalogTable = context.getCatalogTable();
+
+        // Partition key indexes based on data columns
+        int[] partitionKeyIndexes =
+                resolvedCatalogTable.getPartitionKeys().stream()
+                        .mapToInt(dataColumnsType::getFieldIndex)
+                        .toArray();
+
+        long partitionDiscoveryIntervalMs =
+                tableOptions
+                        
.get(FlinkConnectorOptions.SCAN_PARTITION_DISCOVERY_INTERVAL)
+                        .toMillis();
+
+        return new ChangelogFlinkTableSource(
+                TablePath.of(tableIdentifier.getDatabaseName(), baseTableName),
+                toFlussClientConfig(catalogTableOptions, 
context.getConfiguration()),
+                tableOutputType,
+                partitionKeyIndexes,
+                isStreamingMode,
+                startupOptions,
+                partitionDiscoveryIntervalMs,
+                catalogTableOptions);
+    }
 }
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/ChangelogFlinkTableSource.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/ChangelogFlinkTableSource.java
new file mode 100644
index 000000000..3883c4308
--- /dev/null
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/ChangelogFlinkTableSource.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.source;
+
+import org.apache.fluss.client.initializer.OffsetsInitializer;
+import org.apache.fluss.config.Configuration;
+import 
org.apache.fluss.flink.source.deserializer.ChangelogDeserializationSchema;
+import org.apache.fluss.flink.utils.FlinkConnectorOptionsUtils;
+import org.apache.fluss.flink.utils.FlinkConversions;
+import org.apache.fluss.metadata.TableDescriptor;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.predicate.Predicate;
+import org.apache.fluss.types.RowType;
+
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.connector.source.SourceProvider;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.LogicalType;
+
+import javax.annotation.Nullable;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/** A Flink table source for the $changelog virtual table. */
+public class ChangelogFlinkTableSource implements ScanTableSource {
+
+    private final TablePath tablePath;
+    private final Configuration flussConfig;
+    // The changelog output type (includes metadata columns: _change_type, 
_log_offset,
+    // _commit_timestamp)
+    private final org.apache.flink.table.types.logical.RowType 
changelogOutputType;
+    private final org.apache.flink.table.types.logical.RowType dataColumnsType;
+    private final int[] partitionKeyIndexes;
+    private final boolean streaming;
+    private final FlinkConnectorOptionsUtils.StartupOptions startupOptions;
+    private final long scanPartitionDiscoveryIntervalMs;
+    private final Map<String, String> tableOptions;
+
+    // Projection pushdown
+    @Nullable private int[] projectedFields;
+    private LogicalType producedDataType;
+
+    @Nullable private Predicate partitionFilters;
+
+    private static final Set<String> METADATA_COLUMN_NAMES =
+            new HashSet<>(
+                    Arrays.asList(
+                            TableDescriptor.CHANGE_TYPE_COLUMN,
+                            TableDescriptor.LOG_OFFSET_COLUMN,
+                            TableDescriptor.COMMIT_TIMESTAMP_COLUMN));
+
+    public ChangelogFlinkTableSource(
+            TablePath tablePath,
+            Configuration flussConfig,
+            org.apache.flink.table.types.logical.RowType changelogOutputType,
+            int[] partitionKeyIndexes,
+            boolean streaming,
+            FlinkConnectorOptionsUtils.StartupOptions startupOptions,
+            long scanPartitionDiscoveryIntervalMs,
+            Map<String, String> tableOptions) {
+        this.tablePath = tablePath;
+        this.flussConfig = flussConfig;
+        // The changelogOutputType already includes metadata columns from 
FlinkCatalog
+        this.changelogOutputType = changelogOutputType;
+        this.partitionKeyIndexes = partitionKeyIndexes;
+        this.streaming = streaming;
+        this.startupOptions = startupOptions;
+        this.scanPartitionDiscoveryIntervalMs = 
scanPartitionDiscoveryIntervalMs;
+        this.tableOptions = tableOptions;
+
+        // Extract data columns by filtering out metadata columns by name
+        this.dataColumnsType = extractDataColumnsType(changelogOutputType);
+        this.producedDataType = changelogOutputType;
+    }
+
+    /**
+     * Extracts the data columns type by removing the metadata columns from 
the changelog output
+     * type.
+     */
+    private org.apache.flink.table.types.logical.RowType 
extractDataColumnsType(
+            org.apache.flink.table.types.logical.RowType changelogType) {
+        // Filter out metadata columns by name
+        List<org.apache.flink.table.types.logical.RowType.RowField> dataFields 
=
+                changelogType.getFields().stream()
+                        .filter(field -> 
!METADATA_COLUMN_NAMES.contains(field.getName()))
+                        .collect(Collectors.toList());
+
+        return new org.apache.flink.table.types.logical.RowType(dataFields);
+    }
+
+    @Override
+    public ChangelogMode getChangelogMode() {
+        // The $changelog virtual table always produces INSERT-only records.
+        // All change types (+I, -U, +U, -D) are flattened into regular rows
+        return ChangelogMode.insertOnly();
+    }
+
+    @Override
+    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) 
{
+        // Create the Fluss row type for the data columns (without metadata)
+        RowType flussRowType = 
FlinkConversions.toFlussRowType(dataColumnsType);
+        if (projectedFields != null) {
+            // Adjust projection to account for metadata columns
+            // TODO: Handle projection properly with metadata columns
+            flussRowType = flussRowType.project(projectedFields);
+        }
+        // Read from the log to capture all change types (+I, -U, +U, -D).
+        // FULL mode reads snapshot first (no change types), so we use 
EARLIEST for log-only
+        // reading.
+        // LATEST mode is supported for real-time changelog streaming from 
current position.
+        OffsetsInitializer offsetsInitializer;
+        switch (startupOptions.startupMode) {
+            case EARLIEST:
+            case FULL:
+                // For changelog, FULL mode should read all log records from 
beginning
+                offsetsInitializer = OffsetsInitializer.earliest();
+                break;
+            case LATEST:
+                offsetsInitializer = OffsetsInitializer.latest();
+                break;
+            case TIMESTAMP:
+                offsetsInitializer =
+                        
OffsetsInitializer.timestamp(startupOptions.startupTimestampMs);
+                break;
+            default:
+                throw new IllegalArgumentException(
+                        "Unsupported startup mode: " + 
startupOptions.startupMode);
+        }
+
+        // Create the source with the changelog deserialization schema
+        FlinkSource<RowData> source =
+                new FlinkSource<>(
+                        flussConfig,
+                        tablePath,
+                        // Changelog/binlog virtual tables are purely 
log-based and don't have a
+                        // primary key, setting hasPrimaryKey "false" to 
ensure the enumerator
+                        // fetches log-only splits (not snapshot splits), 
which is the correct
+                        // behavior for virtual tables.
+                        false,
+                        isPartitioned(),
+                        flussRowType,
+                        projectedFields,
+                        offsetsInitializer,
+                        scanPartitionDiscoveryIntervalMs,
+                        new ChangelogDeserializationSchema(),
+                        streaming,
+                        partitionFilters,
+                        null); // Lake source not supported
+
+        if (!streaming) {
+            // Batch mode - changelog virtual tables read from log, not data 
lake
+            return new SourceProvider() {
+                @Override
+                public boolean isBounded() {
+                    return true;
+                }
+
+                @Override
+                public Source<RowData, ?, ?> createSource() {
+                    return source;
+                }
+            };
+        } else {
+            return SourceProvider.of(source);
+        }
+    }
+
+    @Override
+    public DynamicTableSource copy() {
+        ChangelogFlinkTableSource copy =
+                new ChangelogFlinkTableSource(
+                        tablePath,
+                        flussConfig,
+                        changelogOutputType,
+                        partitionKeyIndexes,
+                        streaming,
+                        startupOptions,
+                        scanPartitionDiscoveryIntervalMs,
+                        tableOptions);
+        copy.producedDataType = producedDataType;
+        copy.projectedFields = projectedFields;
+        copy.partitionFilters = partitionFilters;
+        return copy;
+    }
+
+    @Override
+    public String asSummaryString() {
+        return "FlussChangelogTableSource";
+    }
+
+    // TODO: Implement projection pushdown handling for metadata columns
+    // TODO: Implement filter pushdown
+
+    private boolean isPartitioned() {
+        return partitionKeyIndexes.length > 0;
+    }
+}
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/deserializer/ChangelogDeserializationSchema.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/deserializer/ChangelogDeserializationSchema.java
new file mode 100644
index 000000000..c81141d34
--- /dev/null
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/deserializer/ChangelogDeserializationSchema.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.source.deserializer;
+
+import org.apache.fluss.flink.utils.ChangelogRowConverter;
+import org.apache.fluss.record.LogRecord;
+import org.apache.fluss.types.RowType;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+
+import static org.apache.fluss.flink.utils.FlinkConversions.toFlinkRowType;
+
+/**
+ * A {@link FlussDeserializationSchema} that turns each Fluss {@link LogRecord} into a Flink {@link
+ * RowData} carrying the changelog metadata columns in addition to the record's own columns.
+ */
+public class ChangelogDeserializationSchema implements FlussDeserializationSchema<RowData> {
+
+    /**
+     * Performs the actual record-to-row conversion. The field is transient and is created in {@link
+     * #open(InitializationContext)}.
+     */
+    private transient ChangelogRowConverter converter;
+
+    /** Creates a new ChangelogDeserializationSchema. */
+    public ChangelogDeserializationSchema() {}
+
+    /**
+     * Creates the row converter from the row schema exposed by the given context. An already
+     * existing converter is kept as is.
+     */
+    @Override
+    public void open(InitializationContext context) throws Exception {
+        if (converter != null) {
+            return;
+        }
+        converter = new ChangelogRowConverter(context.getRowSchema());
+    }
+
+    /**
+     * Converts the given {@link LogRecord} into a {@link RowData} that includes the metadata
+     * columns.
+     *
+     * @throws IllegalStateException if no converter has been created yet
+     */
+    @Override
+    public RowData deserialize(LogRecord record) throws Exception {
+        final ChangelogRowConverter rowConverter = converter;
+        if (rowConverter != null) {
+            return rowConverter.toChangelogRowData(record);
+        }
+        throw new IllegalStateException(
+                "Converter not initialized. The open() method must be called before deserializing records.");
+    }
+
+    /**
+     * Returns the type information of the produced rows: the given row schema translated to a Flink
+     * row type with the metadata columns added by {@link ChangelogRowConverter}.
+     */
+    @Override
+    public TypeInformation<RowData> getProducedType(RowType rowSchema) {
+        return InternalTypeInfo.of(
+                ChangelogRowConverter.buildChangelogRowType(toFlinkRowType(rowSchema)));
+    }
+}
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/ChangelogRowConverter.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/ChangelogRowConverter.java
new file mode 100644
index 000000000..6b8081a6d
--- /dev/null
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/ChangelogRowConverter.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.utils;
+
+import org.apache.fluss.metadata.TableDescriptor;
+import org.apache.fluss.record.ChangeType;
+import org.apache.fluss.record.LogRecord;
+import org.apache.fluss.types.RowType;
+
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.data.utils.JoinedRowData;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.LocalZonedTimestampType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.types.RowKind;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * A converter that transforms Fluss's {@link LogRecord} to Flink's {@link RowData} with additional
+ * metadata columns for the $changelog virtual table.
+ *
+ * <p>Every produced row starts with three metadata fields (change type, log offset, commit
+ * timestamp) followed by the fields of the record's own row.
+ */
+public class ChangelogRowConverter implements RecordToFlinkRowConverter {
+
+    // RecordToFlinkRowConverter extends Serializable, so the serialized form is pinned explicitly.
+    private static final long serialVersionUID = 1L;
+
+    /** Number of metadata fields placed in front of the physical columns. */
+    private static final int METADATA_FIELD_COUNT = 3;
+
+    private final FlussRowToFlinkRowConverter baseConverter;
+    private final org.apache.flink.table.types.logical.RowType producedType;
+
+    /**
+     * Creates a new ChangelogRowConverter.
+     *
+     * @param rowType the Fluss row type of the records to convert
+     */
+    public ChangelogRowConverter(RowType rowType) {
+        this.baseConverter = new FlussRowToFlinkRowConverter(rowType);
+        this.producedType = buildChangelogRowType(FlinkConversions.toFlinkRowType(rowType));
+    }
+
+    /**
+     * Converts a LogRecord to a Flink RowData with metadata columns.
+     *
+     * <p>The returned row always has {@link RowKind#INSERT}; the record's change type is exposed
+     * through the first metadata field instead.
+     *
+     * @param record the log record to convert
+     * @return a joined row of the metadata fields followed by the converted physical row
+     */
+    public RowData toChangelogRowData(LogRecord record) {
+        RowData physicalRowData = baseConverter.toFlinkRowData(record.getRow());
+
+        GenericRowData metadataRow = new GenericRowData(METADATA_FIELD_COUNT);
+        // 1. _change_type
+        metadataRow.setField(
+                0, StringData.fromString(convertChangeTypeToString(record.getChangeType())));
+        // 2. _log_offset
+        metadataRow.setField(1, record.logOffset());
+        // 3. _commit_timestamp (convert long to TimestampData)
+        metadataRow.setField(2, TimestampData.fromEpochMillis(record.timestamp()));
+
+        // JoinedRowData presents both rows as one without copying their fields
+        JoinedRowData joinedRow = new JoinedRowData(metadataRow, physicalRowData);
+        joinedRow.setRowKind(RowKind.INSERT);
+        return joinedRow;
+    }
+
+    @Override
+    public RowData convert(LogRecord record) {
+        return toChangelogRowData(record);
+    }
+
+    @Override
+    public org.apache.flink.table.types.logical.RowType getProducedType() {
+        return producedType;
+    }
+
+    /** Converts a Fluss ChangeType to its string representation for the changelog virtual table. */
+    private static String convertChangeTypeToString(ChangeType changeType) {
+        // Use the short string representation from ChangeType
+        return changeType.shortString();
+    }
+
+    /**
+     * Builds the Flink RowType for the changelog virtual table by adding metadata columns.
+     *
+     * @param originalType The original table's row type
+     * @return The row type with metadata columns prepended
+     */
+    public static org.apache.flink.table.types.logical.RowType buildChangelogRowType(
+            org.apache.flink.table.types.logical.RowType originalType) {
+        List<org.apache.flink.table.types.logical.RowType.RowField> originalFields =
+                originalType.getFields();
+        List<org.apache.flink.table.types.logical.RowType.RowField> fields =
+                new ArrayList<>(METADATA_FIELD_COUNT + originalFields.size());
+
+        // Add metadata columns first (using centralized constants from TableDescriptor)
+        fields.add(
+                new org.apache.flink.table.types.logical.RowType.RowField(
+                        TableDescriptor.CHANGE_TYPE_COLUMN, new VarCharType(false, 2)));
+        fields.add(
+                new org.apache.flink.table.types.logical.RowType.RowField(
+                        TableDescriptor.LOG_OFFSET_COLUMN, new BigIntType(false)));
+        fields.add(
+                new org.apache.flink.table.types.logical.RowType.RowField(
+                        TableDescriptor.COMMIT_TIMESTAMP_COLUMN,
+                        new LocalZonedTimestampType(false, 3)));
+
+        // Add all original fields
+        fields.addAll(originalFields);
+
+        return new org.apache.flink.table.types.logical.RowType(fields);
+    }
+}
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/RecordToFlinkRowConverter.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/RecordToFlinkRowConverter.java
new file mode 100644
index 000000000..ec561a5a5
--- /dev/null
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/RecordToFlinkRowConverter.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.utils;
+
+import org.apache.fluss.record.LogRecord;
+
+import org.apache.flink.table.data.RowData;
+
+import java.io.Serializable;
+
+/**
+ * Interface for converting Fluss {@link LogRecord} to Flink {@link RowData}.
+ *
+ * <p>The interface extends {@link Serializable}.
+ */
+public interface RecordToFlinkRowConverter extends Serializable {
+
+    /**
+     * Converts a LogRecord to a Flink RowData.
+     *
+     * @param record the Fluss log record to convert
+     * @return the Flink row produced for the record
+     */
+    RowData convert(LogRecord record);
+
+    /**
+     * Gets the output Flink row type produced by this converter.
+     *
+     * @return the Flink logical row type of the converter output
+     */
+    org.apache.flink.table.types.logical.RowType getProducedType();
+}
diff --git 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java
 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java
index f42ba30ae..5381b4672 100644
--- 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java
+++ 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java
@@ -885,6 +885,55 @@ abstract class FlinkCatalogITCase {
                 .containsEntry("table.datalake.paimon.jdbc.password", "pass");
     }
 
+    @Test
+    void testGetChangelogVirtualTable() throws Exception {
+        // A partitioned primary key table serves as the base of the virtual table
+        String createPkTable =
+                "CREATE TABLE pk_table_for_changelog ("
+                        + "  id INT NOT NULL,"
+                        + "  name STRING,"
+                        + "  amount BIGINT,"
+                        + "  PRIMARY KEY (id, name) NOT ENFORCED"
+                        + ") PARTITIONED BY (name) "
+                        + "WITH ('bucket.num' = '1')";
+        tEnv.executeSql(createPkTable);
+
+        // Resolve the $changelog virtual table through the catalog API
+        ObjectPath virtualTablePath = new ObjectPath(DEFAULT_DB, "pk_table_for_changelog$changelog");
+        CatalogTable virtualTable = (CatalogTable) catalog.getTable(virtualTablePath);
+
+        // Expected layout: the three metadata columns, then the columns of the base table
+        Schema.Builder expected = Schema.newBuilder();
+        expected.column("_change_type", DataTypes.STRING().notNull());
+        expected.column("_log_offset", DataTypes.BIGINT().notNull());
+        expected.column("_commit_timestamp", DataTypes.TIMESTAMP_LTZ(3).notNull());
+        expected.column("id", DataTypes.INT().notNull());
+        expected.column("name", DataTypes.STRING().notNull());
+        expected.column("amount", DataTypes.BIGINT());
+
+        assertThat(virtualTable.getUnresolvedSchema()).isEqualTo(expected.build());
+        assertThat(virtualTable.getPartitionKeys()).isEqualTo(Collections.singletonList("name"));
+
+        // Options of the base table must be visible on the virtual table
+        assertThat(virtualTable.getOptions()).containsEntry("bucket.num", "1");
+
+        // A table without primary key must be rejected with a descriptive error
+        tEnv.executeSql("CREATE TABLE log_table_for_changelog (id INT, name STRING)");
+
+        String expectedMessage =
+                "Virtual $changelog tables are only supported for primary key tables. "
+                        + "Table fluss.log_table_for_changelog does not have a primary key.";
+        assertThatThrownBy(
+                        () ->
+                                catalog.getTable(
+                                        new ObjectPath(
+                                                DEFAULT_DB, "log_table_for_changelog$changelog")))
+                .isInstanceOf(CatalogException.class)
+                .hasRootCauseMessage(expectedMessage);
+    }
+
     /**
      * Before Flink 2.1, the {@link Schema} did not include an index field. 
Starting from Flink 2.1,
      * Flink introduced the concept of an index, and in Fluss, the primary key 
is considered as an
diff --git 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/ChangelogVirtualTableITCase.java
 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/ChangelogVirtualTableITCase.java
new file mode 100644
index 000000000..0135d3349
--- /dev/null
+++ 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/ChangelogVirtualTableITCase.java
@@ -0,0 +1,419 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.source;
+
+import org.apache.fluss.client.Connection;
+import org.apache.fluss.client.ConnectionFactory;
+import org.apache.fluss.client.admin.Admin;
+import org.apache.fluss.client.table.Table;
+import org.apache.fluss.client.table.writer.UpsertWriter;
+import org.apache.fluss.config.ConfigOptions;
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.server.testutils.FlussClusterExtension;
+import org.apache.fluss.utils.clock.ManualClock;
+
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.fluss.flink.FlinkConnectorOptions.BOOTSTRAP_SERVERS;
+import static org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.collectRowsWithTimeout;
+import static org.apache.fluss.flink.utils.FlinkTestBase.writeRows;
+import static org.apache.fluss.server.testutils.FlussClusterExtension.BUILTIN_DATABASE;
+import static org.apache.fluss.testutils.DataTestUtils.row;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Integration test for $changelog virtual table functionality. */
+abstract class ChangelogVirtualTableITCase extends AbstractTestBase {
+
+    // Manually driven clock handed to the test cluster below; tests advance it explicitly.
+    protected static final ManualClock CLOCK = new ManualClock();
+
+    // Fluss test cluster with a default configuration, one tablet server and the manual clock.
+    @RegisterExtension
+    public static final FlussClusterExtension FLUSS_CLUSTER_EXTENSION =
+            FlussClusterExtension.builder()
+                    .setClusterConf(new Configuration())
+                    .setNumOfTabletServers(1)
+                    .setClock(CLOCK)
+                    .build();
+
+    // Names of the Flink catalog and database used by the tests.
+    static final String CATALOG_NAME = "testcatalog";
+    static final String DEFAULT_DB = "test_changelog_db";
+    // Flink environments, re-created before every test.
+    protected StreamExecutionEnvironment execEnv;
+    protected StreamTableEnvironment tEnv;
+    // Fluss client handles, created once in beforeAll().
+    protected static Connection conn;
+    protected static Admin admin;
+
+    // Client configuration obtained from the test cluster in beforeAll().
+    protected static Configuration clientConf;
+
+    /** Opens the Fluss connection and admin client shared by all tests of this class. */
+    @BeforeAll
+    protected static void beforeAll() {
+        clientConf = FLUSS_CLUSTER_EXTENSION.getClientConfig();
+        conn = ConnectionFactory.createConnection(clientConf);
+        admin = conn.getAdmin();
+    }
+
+    /**
+     * Closes the admin client and the connection opened in {@link #beforeAll()} so they do not
+     * outlive the test class.
+     */
+    @AfterAll
+    protected static void afterAll() throws Exception {
+        if (admin != null) {
+            admin.close();
+            admin = null;
+        }
+        if (conn != null) {
+            conn.close();
+            conn = null;
+        }
+    }
+
+    /** Creates fresh Flink environments, registers the Fluss catalog and resets the clock. */
+    @BeforeEach
+    void before() {
+        // Flink streaming environment
+        execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
+        tEnv = StreamTableEnvironment.create(execEnv, EnvironmentSettings.inStreamingMode());
+
+        // Catalog pointing at the test cluster, plus the database used by the tests
+        String servers = String.join(",", clientConf.get(ConfigOptions.BOOTSTRAP_SERVERS));
+        String createCatalog =
+                String.format(
+                        "create catalog %s with ('type' = 'fluss', '%s' = '%s')",
+                        CATALOG_NAME, BOOTSTRAP_SERVERS.key(), servers);
+        tEnv.executeSql(createCatalog);
+        tEnv.executeSql("use catalog " + CATALOG_NAME);
+        tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 2);
+        tEnv.executeSql("create database " + DEFAULT_DB);
+        tEnv.useDatabase(DEFAULT_DB);
+
+        // Rewind the manual clock to zero so every test starts from the same time
+        long elapsedMillis = CLOCK.milliseconds();
+        CLOCK.advanceTime(-elapsedMillis, TimeUnit.MILLISECONDS);
+    }
+
+    /** Switches to the builtin database and drops the test database with its tables. */
+    @AfterEach
+    void after() {
+        tEnv.useDatabase(BUILTIN_DATABASE);
+        String dropDatabase = "drop database " + DEFAULT_DB + " cascade";
+        tEnv.executeSql(dropDatabase);
+    }
+
+    /** Deletes rows from a primary key table using the proper delete API. */
+    protected static void deleteRows(
+            Connection connection, TablePath tablePath, List<InternalRow> 
rows) throws Exception {
+        try (Table table = connection.getTable(tablePath)) {
+            UpsertWriter writer = table.newUpsert().createWriter();
+            for (InternalRow row : rows) {
+                writer.delete(row);
+            }
+            writer.flush();
+        }
+    }
+
+    @Test
+    public void testDescribeChangelogTable() throws Exception {
+        // Create a table with various data types to test complex schema
+        tEnv.executeSql(
+                "CREATE TABLE complex_table ("
+                        + "  id INT NOT NULL,"
+                        + "  name STRING,"
+                        + "  score DOUBLE,"
+                        + "  is_active BOOLEAN,"
+                        + "  created_date DATE,"
+                        + "  metadata MAP<STRING, STRING>,"
+                        + "  tags ARRAY<STRING>,"
+                        + "  PRIMARY KEY (id) NOT ENFORCED"
+                        + ")");
+
+        // Test DESCRIBE on changelog virtual table
+        CloseableIterator<Row> describeResult =
+                tEnv.executeSql("DESCRIBE complex_table$changelog").collect();
+
+        List<String> schemaRows = new ArrayList<>();
+        while (describeResult.hasNext()) {
+            schemaRows.add(describeResult.next().toString());
+        }
+
+        // Should have 3 metadata columns + 7 data columns = 10 total
+        assertThat(schemaRows).hasSize(10);
+
+        // Verify metadata columns are listed first
+        // DESCRIBE format: +I[name, type, null, key, extras, watermark]
+        assertThat(schemaRows.get(0))
+                .isEqualTo("+I[_change_type, STRING, false, null, null, 
null]");
+        assertThat(schemaRows.get(1)).isEqualTo("+I[_log_offset, BIGINT, 
false, null, null, null]");
+        assertThat(schemaRows.get(2))
+                .isEqualTo("+I[_commit_timestamp, TIMESTAMP_LTZ(3), false, 
null, null, null]");
+
+        // Verify data columns maintain their types
+        // Note: Primary key info is not preserved in $changelog virtual table
+        assertThat(schemaRows.get(3)).isEqualTo("+I[id, INT, false, null, 
null, null]");
+        assertThat(schemaRows.get(4)).isEqualTo("+I[name, STRING, true, null, 
null, null]");
+        assertThat(schemaRows.get(5)).isEqualTo("+I[score, DOUBLE, true, null, 
null, null]");
+        assertThat(schemaRows.get(6)).isEqualTo("+I[is_active, BOOLEAN, true, 
null, null, null]");
+        assertThat(schemaRows.get(7)).isEqualTo("+I[created_date, DATE, true, 
null, null, null]");
+        assertThat(schemaRows.get(8))
+                .isEqualTo("+I[metadata, MAP<STRING NOT NULL, STRING>, true, 
null, null, null]");
+        assertThat(schemaRows.get(9)).isEqualTo("+I[tags, ARRAY<STRING>, true, 
null, null, null]");
+
+        // Test SHOW CREATE TABLE on changelog virtual table
+        CloseableIterator<Row> showCreateResult =
+                tEnv.executeSql("SHOW CREATE TABLE 
complex_table$changelog").collect();
+
+        StringBuilder createTableStatement = new StringBuilder();
+        while (showCreateResult.hasNext()) {
+            createTableStatement.append(showCreateResult.next().toString());
+        }
+
+        String createStatement = createTableStatement.toString();
+        // Verify metadata columns are included in the CREATE TABLE statement
+        assertThat(createStatement)
+                .contains(
+                        "CREATE TABLE 
`testcatalog`.`test_changelog_db`.`complex_table$changelog` (\n"
+                                + "  `_change_type` VARCHAR(2147483647) NOT 
NULL,\n"
+                                + "  `_log_offset` BIGINT NOT NULL,\n"
+                                + "  `_commit_timestamp` TIMESTAMP(3) WITH 
LOCAL TIME ZONE NOT NULL,\n"
+                                + "  `id` INT NOT NULL,\n"
+                                + "  `name` VARCHAR(2147483647),\n"
+                                + "  `score` DOUBLE,\n"
+                                + "  `is_active` BOOLEAN,\n"
+                                + "  `created_date` DATE,\n"
+                                + "  `metadata` MAP<VARCHAR(2147483647) NOT 
NULL, VARCHAR(2147483647)>,\n"
+                                + "  `tags` ARRAY<VARCHAR(2147483647)>\n"
+                                // with options contains random properties, 
skip checking
+                                + ")");
+    }
+
+    @Test
+    public void testChangelogVirtualTableWithNonPrimaryKeyTable() {
+        // A log table, i.e. a table declared without primary key
+        String createLogTable =
+                "CREATE TABLE events ("
+                        + "  event_id INT,"
+                        + "  event_type STRING,"
+                        + "  event_time TIMESTAMP"
+                        + ")";
+        tEnv.executeSql(createLogTable);
+
+        // The failure surfaces wrapped in other exceptions, hence the check on the root cause
+        String expectedRootCause =
+                "Virtual $changelog tables are only supported for primary key tables. "
+                        + "Table test_changelog_db.events does not have a primary key.";
+
+        // Reading the changelog virtual table of such a table must fail
+        assertThatThrownBy(() -> tEnv.executeSql("SELECT * FROM events$changelog").await())
+                .hasRootCauseMessage(expectedRootCause);
+    }
+
+    @Test
+    public void testProjectionOnChangelogTable() throws Exception {
+        // Single-bucket primary key table with more columns than the query selects
+        String createTable =
+                "CREATE TABLE projection_test ("
+                        + "  id INT NOT NULL,"
+                        + "  name STRING,"
+                        + "  amount BIGINT,"
+                        + "  description STRING,"
+                        + "  PRIMARY KEY (id) NOT ENFORCED"
+                        + ") WITH ('bucket.num' = '1')";
+        tEnv.executeSql(createTable);
+
+        TablePath path = TablePath.of(DEFAULT_DB, "projection_test");
+
+        // amount and description are deliberately left out of the projection
+        CloseableIterator<Row> changes =
+                tEnv.executeSql("SELECT _change_type, id, name FROM projection_test$changelog")
+                        .collect();
+
+        // INSERT produces a single +I entry
+        CLOCK.advanceTime(Duration.ofMillis(100));
+        List<InternalRow> inserted = Arrays.asList(row(1, "Item-1", 100L, "Desc-1"));
+        writeRows(conn, path, inserted, false);
+        List<String> insertRows = collectRowsWithTimeout(changes, 1, false);
+        assertThat(insertRows.get(0)).isEqualTo("+I[+I, 1, Item-1]");
+
+        // UPDATE produces the old image (-U) followed by the new image (+U)
+        CLOCK.advanceTime(Duration.ofMillis(100));
+        List<InternalRow> updated =
+                Arrays.asList(row(1, "Item-1-Updated", 150L, "Desc-1-Updated"));
+        writeRows(conn, path, updated, false);
+        List<String> updateRows = collectRowsWithTimeout(changes, 2, false);
+        assertThat(updateRows.get(0)).isEqualTo("+I[-U, 1, Item-1]");
+        assertThat(updateRows.get(1)).isEqualTo("+I[+U, 1, Item-1-Updated]");
+
+        // DELETE produces a single -D entry
+        CLOCK.advanceTime(Duration.ofMillis(100));
+        List<InternalRow> deleted =
+                Arrays.asList(row(1, "Item-1-Updated", 150L, "Desc-1-Updated"));
+        deleteRows(conn, path, deleted);
+        List<String> deleteRows = collectRowsWithTimeout(changes, 1, true);
+        assertThat(deleteRows.get(0)).isEqualTo("+I[-D, 1, Item-1-Updated]");
+    }
+
+    @Test
+    public void testChangelogScanWithAllChangeTypes() throws Exception {
+        // One bucket keeps the log offsets of all records in a single predictable sequence
+        String createTable =
+                "CREATE TABLE scan_test ("
+                        + "  id INT NOT NULL,"
+                        + "  name STRING,"
+                        + "  amount BIGINT,"
+                        + "  PRIMARY KEY (id) NOT ENFORCED"
+                        + ") WITH ('bucket.num' = '1')";
+        tEnv.executeSql(createTable);
+
+        TablePath path = TablePath.of(DEFAULT_DB, "scan_test");
+
+        // Subscribe to the changelog before any data is written
+        CloseableIterator<Row> changes =
+                tEnv.executeSql("SELECT * FROM scan_test$changelog").collect();
+
+        // Two inserts at clock time 1000 ms
+        CLOCK.advanceTime(Duration.ofMillis(1000));
+        List<InternalRow> firstBatch = Arrays.asList(row(1, "Item-1", 100L), row(2, "Item-2", 200L));
+        writeRows(conn, path, firstBatch, false);
+
+        // Row format: +I[_change_type, _log_offset, _commit_timestamp, id, name, amount]
+        // The manual clock and the single bucket make offsets and timestamps exact
+        List<String> insertRows = collectRowsWithTimeout(changes, 2, false);
+        assertThat(insertRows).hasSize(2);
+        assertThat(insertRows.get(0)).isEqualTo("+I[+I, 0, 1970-01-01T00:00:01Z, 1, Item-1, 100]");
+        assertThat(insertRows.get(1)).isEqualTo("+I[+I, 1, 1970-01-01T00:00:01Z, 2, Item-2, 200]");
+
+        // Update of key 1 at clock time 2000 ms yields -U followed by +U
+        CLOCK.advanceTime(Duration.ofMillis(1000));
+        List<InternalRow> update = Arrays.asList(row(1, "Item-1-Updated", 150L));
+        writeRows(conn, path, update, false);
+
+        List<String> updateRows = collectRowsWithTimeout(changes, 2, false);
+        assertThat(updateRows).hasSize(2);
+        assertThat(updateRows.get(0)).isEqualTo("+I[-U, 2, 1970-01-01T00:00:02Z, 1, Item-1, 100]");
+        assertThat(updateRows.get(1))
+                .isEqualTo("+I[+U, 3, 1970-01-01T00:00:02Z, 1, Item-1-Updated, 150]");
+
+        // Delete of key 2 at clock time 3000 ms yields a single -D
+        CLOCK.advanceTime(Duration.ofMillis(1000));
+        List<InternalRow> removal = Arrays.asList(row(2, "Item-2", 200L));
+        deleteRows(conn, path, removal);
+
+        List<String> deleteRows = collectRowsWithTimeout(changes, 1, true);
+        assertThat(deleteRows).hasSize(1);
+        assertThat(deleteRows.get(0)).isEqualTo("+I[-D, 4, 1970-01-01T00:00:03Z, 2, Item-2, 200]");
+    }
+
+    @Test
+    public void testChangelogWithScanStartupMode() throws Exception {
+        // One bucket keeps the log offsets of all records in a single predictable sequence
+        String createTable =
+                "CREATE TABLE startup_mode_test ("
+                        + "  id INT NOT NULL,"
+                        + "  name STRING,"
+                        + "  PRIMARY KEY (id) NOT ENFORCED"
+                        + ") WITH ('bucket.num' = '1')";
+        tEnv.executeSql(createTable);
+
+        TablePath path = TablePath.of(DEFAULT_DB, "startup_mode_test");
+
+        // First batch, written at clock time 100 ms
+        CLOCK.advanceTime(Duration.ofMillis(100));
+        List<InternalRow> firstBatch = Arrays.asList(row(1, "v1"), row(2, "v2"), row(3, "v3"));
+        writeRows(conn, path, firstBatch, false);
+
+        // Second batch, written at clock time 200 ms
+        CLOCK.advanceTime(Duration.ofMillis(100));
+        List<InternalRow> secondBatch = Arrays.asList(row(4, "v4"), row(5, "v5"));
+        writeRows(conn, path, secondBatch, false);
+
+        String projectedSelect = "SELECT _change_type, id, name FROM startup_mode_test$changelog";
+
+        // 1. 'earliest': every record written so far is read from the beginning
+        String earliestHint = " /*+ OPTIONS('scan.startup.mode' = 'earliest') */";
+        CloseableIterator<Row> earliestIter =
+                tEnv.executeSql(projectedSelect + earliestHint).collect();
+        List<String> earliestRows = collectRowsWithTimeout(earliestIter, 5, true);
+        assertThat(earliestRows).hasSize(5);
+        // Nothing but inserts has been written so far
+        earliestRows.forEach(line -> assertThat(line).startsWith("+I[+I,"));
+
+        // 2. 'latest': only records written after the subscription are read
+        String latestHint = " /*+ OPTIONS('scan.startup.mode' = 'latest') */";
+        CloseableIterator<Row> latestIter = tEnv.executeSql(projectedSelect + latestHint).collect();
+
+        CLOCK.advanceTime(Duration.ofMillis(100));
+        List<InternalRow> lateRow = Arrays.asList(row(6, "v6"));
+        writeRows(conn, path, lateRow, false);
+        List<String> latestRows = collectRowsWithTimeout(latestIter, 1, true);
+        assertThat(latestRows).hasSize(1);
+        assertThat(latestRows.get(0)).isEqualTo("+I[+I, 6, v6]");
+
+        // 3. 'timestamp': start at 150 ms, which lies between the first and the second batch
+        String timestampHint =
+                " /*+ OPTIONS('scan.startup.mode' = 'timestamp', 'scan.startup.timestamp' = '150') */";
+        String timestampQuery = "SELECT * FROM startup_mode_test$changelog " + timestampHint;
+        CloseableIterator<Row> timestampIter = tEnv.executeSql(timestampQuery).collect();
+        List<String> timestampRows = collectRowsWithTimeout(timestampIter, 2, true);
+        assertThat(timestampRows).hasSize(2);
+        // Only the two records of the second batch are expected
+        assertThat(timestampRows)
+                .containsExactlyInAnyOrder(
+                        "+I[+I, 3, 1970-01-01T00:00:00.200Z, 4, v4]",
+                        "+I[+I, 4, 1970-01-01T00:00:00.200Z, 5, v5]");
+    }
+
+    @Test
+    public void testChangelogWithPartitionedTable() throws Exception {
+        // Create a partitioned primary key table with 1 bucket per partition
+        tEnv.executeSql(
+                "CREATE TABLE partitioned_test ("
+                        + "  id INT NOT NULL,"
+                        + "  name STRING,"
+                        + "  region STRING NOT NULL,"
+                        + "  PRIMARY KEY (id, region) NOT ENFORCED"
+                        + ") PARTITIONED BY (region) WITH ('bucket.num' = '1')");
+
+        // Insert data into different partitions using Flink SQL
+        CLOCK.advanceTime(Duration.ofMillis(100));
+        tEnv.executeSql(
+                        "INSERT INTO partitioned_test VALUES "
+                                + "(1, 'Item-1', 'us'), "
+                                + "(2, 'Item-2', 'us'), "
+                                + "(3, 'Item-3', 'eu')")
+                .await();
+
+        // Query the changelog virtual table for all partitions
+        String query = "SELECT _change_type, id, name, region FROM partitioned_test$changelog";
+
+        // try-with-resources closes the iterator even when one of the assertions below fails
+        try (CloseableIterator<Row> rowIter = tEnv.executeSql(query).collect()) {
+            // Collect initial inserts
+            List<String> results = collectRowsWithTimeout(rowIter, 3, false);
+            assertThat(results)
+                    .containsExactlyInAnyOrder(
+                            "+I[+I, 1, Item-1, us]",
+                            "+I[+I, 2, Item-2, us]",
+                            "+I[+I, 3, Item-3, eu]");
+
+            // Update a record in a specific partition
+            CLOCK.advanceTime(Duration.ofMillis(100));
+            tEnv.executeSql("INSERT INTO partitioned_test VALUES (1, 'Item-1-Updated', 'us')")
+                    .await();
+            List<String> updateResults = collectRowsWithTimeout(rowIter, 2, false);
+            assertThat(updateResults)
+                    .containsExactly("+I[-U, 1, Item-1, us]", "+I[+U, 1, Item-1-Updated, us]");
+        }
+    }
+}
diff --git 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/ChangelogRowConverterTest.java
 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/ChangelogRowConverterTest.java
new file mode 100644
index 000000000..c569ba86d
--- /dev/null
+++ 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/ChangelogRowConverterTest.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.utils;
+
+import org.apache.fluss.record.ChangeType;
+import org.apache.fluss.record.GenericRecord;
+import org.apache.fluss.record.LogRecord;
+import org.apache.fluss.row.BinaryString;
+import org.apache.fluss.row.indexed.IndexedRow;
+import org.apache.fluss.row.indexed.IndexedRowWriter;
+import org.apache.fluss.types.DataType;
+import org.apache.fluss.types.DataTypes;
+import org.apache.fluss.types.RowType;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.utils.JoinedRowData;
+import org.apache.flink.types.RowKind;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Unit test for {@link ChangelogRowConverter}. */
+class ChangelogRowConverterTest {
+
+    private RowType testRowType;
+    private ChangelogRowConverter converter;
+
+    @BeforeEach
+    void setUp() {
+        // Schema shared by every test in this class: (id INT, name STRING, amount BIGINT).
+        RowType schema =
+                RowType.builder()
+                        .field("id", DataTypes.INT())
+                        .field("name", DataTypes.STRING())
+                        .field("amount", DataTypes.BIGINT())
+                        .build();
+
+        // A fresh converter is built for each test from that schema.
+        this.testRowType = schema;
+        this.converter = new ChangelogRowConverter(schema);
+    }
+
+    @Test
+    void testConvertInsertRecord() throws Exception {
+        LogRecord insertRecord = createLogRecord(ChangeType.INSERT, 100L, 1, "Alice", 5000L);
+
+        RowData converted = converter.convert(insertRecord);
+
+        // Row kind of the produced row.
+        assertThat(converted.getRowKind()).isEqualTo(RowKind.INSERT);
+
+        // Metadata columns sit at positions 0..2: change type, log offset, commit timestamp.
+        assertThat(converted.getString(0)).isEqualTo(StringData.fromString("+I"));
+        assertThat(converted.getLong(1)).isEqualTo(100L);
+        assertThat(converted.getTimestamp(2, 3)).isNotNull();
+
+        // Physical columns follow at positions 3..5: id, name, amount.
+        assertThat(converted.getInt(3)).isEqualTo(1);
+        assertThat(converted.getString(4).toString()).isEqualTo("Alice");
+        assertThat(converted.getLong(5)).isEqualTo(5000L);
+
+        // The result is expected to be a JoinedRowData instance.
+        assertThat(converted).isInstanceOf(JoinedRowData.class);
+    }
+
+    @Test
+    void testConvertUpdateBeforeRecord() throws Exception {
+        LogRecord updateBefore = createLogRecord(ChangeType.UPDATE_BEFORE, 200L, 2, "Bob", 3000L);
+
+        RowData converted = converter.convert(updateBefore);
+
+        // The virtual table always emits INSERT rows; the real change type travels as metadata.
+        assertThat(converted.getRowKind()).isEqualTo(RowKind.INSERT);
+
+        // Metadata: change type marker and log offset.
+        assertThat(converted.getString(0)).isEqualTo(StringData.fromString("-U"));
+        assertThat(converted.getLong(1)).isEqualTo(200L);
+
+        // Physical columns: id, name, amount.
+        assertThat(converted.getInt(3)).isEqualTo(2);
+        assertThat(converted.getString(4).toString()).isEqualTo("Bob");
+        assertThat(converted.getLong(5)).isEqualTo(3000L);
+    }
+
+    @Test
+    void testConvertUpdateAfterRecord() throws Exception {
+        LogRecord record = createLogRecord(ChangeType.UPDATE_AFTER, 201L, 2, "Bob", 4000L);
+
+        RowData result = converter.convert(record);
+
+        // Verify row kind (always INSERT for virtual table), consistent with the other
+        // per-change-type tests in this class
+        assertThat(result.getRowKind()).isEqualTo(RowKind.INSERT);
+
+        // Verify change type metadata and log offset
+        assertThat(result.getString(0)).isEqualTo(StringData.fromString("+U"));
+        assertThat(result.getLong(1)).isEqualTo(201L);
+
+        // Verify physical columns
+        assertThat(result.getInt(3)).isEqualTo(2);
+        assertThat(result.getString(4).toString()).isEqualTo("Bob");
+        assertThat(result.getLong(5)).isEqualTo(4000L);
+    }
+
+    @Test
+    void testConvertDeleteRecord() throws Exception {
+        LogRecord record = createLogRecord(ChangeType.DELETE, 300L, 3, "Charlie", 1000L);
+
+        RowData result = converter.convert(record);
+
+        // Verify row kind (always INSERT for virtual table): a delete must surface as an
+        // INSERT row whose change type column carries "-D"
+        assertThat(result.getRowKind()).isEqualTo(RowKind.INSERT);
+
+        // Verify change type metadata and log offset
+        assertThat(result.getString(0)).isEqualTo(StringData.fromString("-D"));
+        assertThat(result.getLong(1)).isEqualTo(300L);
+
+        // Verify physical columns
+        assertThat(result.getInt(3)).isEqualTo(3);
+        assertThat(result.getString(4).toString()).isEqualTo("Charlie");
+        assertThat(result.getLong(5)).isEqualTo(1000L);
+    }
+
+    @Test
+    void testProducedTypeHasMetadataColumns() {
+        // Fully qualified because the Fluss RowType is already imported under the same name.
+        org.apache.flink.table.types.logical.RowType flinkRowType = converter.getProducedType();
+
+        // Three metadata columns are expected ahead of the three physical columns.
+        assertThat(flinkRowType.getFieldCount()).isEqualTo(6);
+
+        // Field names, metadata first and then the physical schema in declaration order.
+        assertThat(flinkRowType.getFieldNames())
+                .containsExactly(
+                        "_change_type",
+                        "_log_offset",
+                        "_commit_timestamp",
+                        "id",
+                        "name",
+                        "amount");
+
+        // Logical types of the metadata columns.
+        assertThat(flinkRowType.getTypeAt(0))
+                .isInstanceOf(org.apache.flink.table.types.logical.VarCharType.class);
+        assertThat(flinkRowType.getTypeAt(1))
+                .isInstanceOf(org.apache.flink.table.types.logical.BigIntType.class);
+        assertThat(flinkRowType.getTypeAt(2))
+                .isInstanceOf(org.apache.flink.table.types.logical.LocalZonedTimestampType.class);
+    }
+
+    @Test
+    void testAllChangeTypes() throws Exception {
+        // Every change type paired (by index) with the marker expected in the _change_type
+        // column. APPEND_ONLY is the change type used for log (append-only) tables.
+        ChangeType[] changeTypes = {
+            ChangeType.INSERT,
+            ChangeType.UPDATE_BEFORE,
+            ChangeType.UPDATE_AFTER,
+            ChangeType.DELETE,
+            ChangeType.APPEND_ONLY
+        };
+        String[] expectedMarkers = {"+I", "-U", "+U", "-D", "+A"};
+
+        for (int i = 0; i < changeTypes.length; i++) {
+            // Log offsets run 1..5; the payload is the same for every record.
+            LogRecord record = createLogRecord(changeTypes[i], i + 1L, 1, "Test", 100L);
+
+            assertThat(converter.convert(record).getString(0))
+                    .isEqualTo(StringData.fromString(expectedMarkers[i]));
+        }
+    }
+
+    /**
+     * Builds a {@link GenericRecord} whose row holds {@code (id, name, amount)} encoded as an
+     * {@link IndexedRow} matching {@code testRowType}; the record timestamp is the wall-clock
+     * time at creation.
+     */
+    private LogRecord createLogRecord(
+            ChangeType changeType, long offset, int id, String name, long amount) throws Exception {
+        // Field types are shared by the row view and the writer that fills it.
+        DataType[] fieldTypes = testRowType.getChildren().toArray(new DataType[0]);
+        IndexedRow indexedRow = new IndexedRow(fieldTypes);
+
+        try (IndexedRowWriter rowWriter = new IndexedRowWriter(fieldTypes)) {
+            // Write the fields in schema order: id, name, amount.
+            rowWriter.writeInt(id);
+            rowWriter.writeString(BinaryString.fromString(name));
+            rowWriter.writeLong(amount);
+            rowWriter.complete();
+
+            // Point the row at the bytes just written, from offset 0 up to the writer position.
+            indexedRow.pointTo(rowWriter.segment(), 0, rowWriter.position());
+
+            long timestamp = System.currentTimeMillis();
+            return new GenericRecord(offset, timestamp, changeType, indexedRow);
+        }
+    }
+}
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java
index 8aedc194b..96282c1b6 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java
@@ -53,6 +53,9 @@ import static 
org.apache.fluss.config.FlussConfigUtils.TABLE_OPTIONS;
 import static org.apache.fluss.config.FlussConfigUtils.isAlterableTableOption;
 import static org.apache.fluss.config.FlussConfigUtils.isTableStorageConfig;
 import static org.apache.fluss.metadata.TableDescriptor.BUCKET_COLUMN_NAME;
+import static org.apache.fluss.metadata.TableDescriptor.CHANGE_TYPE_COLUMN;
+import static 
org.apache.fluss.metadata.TableDescriptor.COMMIT_TIMESTAMP_COLUMN;
+import static org.apache.fluss.metadata.TableDescriptor.LOG_OFFSET_COLUMN;
 import static org.apache.fluss.metadata.TableDescriptor.OFFSET_COLUMN_NAME;
 import static org.apache.fluss.metadata.TableDescriptor.TIMESTAMP_COLUMN_NAME;
 import static 
org.apache.fluss.utils.PartitionUtils.PARTITION_KEY_SUPPORTED_TYPES;
@@ -66,7 +69,10 @@ public class TableDescriptorValidation {
                             Arrays.asList(
                                     OFFSET_COLUMN_NAME,
                                     TIMESTAMP_COLUMN_NAME,
-                                    BUCKET_COLUMN_NAME)));
+                                    BUCKET_COLUMN_NAME,
+                                    CHANGE_TYPE_COLUMN,
+                                    LOG_OFFSET_COLUMN,
+                                    COMMIT_TIMESTAMP_COLUMN)));
 
     private static final List<DataTypeRoot> KEY_UNSUPPORTED_TYPES =
             Arrays.asList(DataTypeRoot.ARRAY, DataTypeRoot.MAP, 
DataTypeRoot.ROW);

Reply via email to